[DTrace-devel] [PATCH 5/5] Use array-of-maps as storage for aggregations
Kris Van Hees
kris.van.hees at oracle.com
Fri Aug 19 17:26:13 UTC 2022
In preparation for indexed aggregations and the clear() and trunc()
operations, the storage for aggregations is moving from a per-CPU
array map to an array of maps, indexed by CPU id. This gives each
CPU its own storage for aggregation data.
Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
---
libdtrace/dt_aggregate.c | 62 ++++++++++++----------------------------
libdtrace/dt_bpf.c | 24 ++++++++++++++--
libdtrace/dt_cg.c | 53 ++++++++++++++++++++++++++++++++--
libdtrace/dt_impl.h | 1 -
4 files changed, 90 insertions(+), 50 deletions(-)
diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
index 44896fd2..cb47e442 100644
--- a/libdtrace/dt_aggregate.c
+++ b/libdtrace/dt_aggregate.c
@@ -412,8 +412,6 @@ typedef void (*agg_cpu_f)(dt_ident_t *aid, int64_t *dst, int64_t *src,
typedef struct dt_snapstate {
dtrace_hdl_t *dtp;
processorid_t cpu;
- char *buf;
- dt_aggregate_t *agp;
} dt_snapstate_t;
static void
@@ -444,7 +442,9 @@ dt_agg_one_agg(dt_ident_t *aid, int64_t *dst, int64_t *src, uint_t datasz)
static int
dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
{
- dt_ahash_t *agh = &st->agp->dtat_hash;
+ dtrace_hdl_t *dtp = st->dtp;
+ dt_aggregate_t *agp = &dtp->dt_aggregate;
+ dt_ahash_t *agh = &agp->dtat_hash;
dt_ahashent_t *h;
dtrace_aggdesc_t *agg;
dtrace_aggdata_t *agd;
@@ -454,12 +454,12 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
uint_t i, datasz;
int64_t *src;
- rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
+ rval = dt_aggid_lookup(dtp, aid->di_id, &agg);
if (rval != 0)
return rval;
/* point to the data counter */
- src = (int64_t *)(st->buf + aid->di_offset);
+ src = (int64_t *)(agp->dtat_buf + aid->di_offset);
/* skip it if data counter is 0 */
if (*src == 0)
@@ -506,7 +506,7 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
h->dtahe_hval = hval;
h->dtahe_size = datasz;
- if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
+ if (agp->dtat_flags & DTRACE_A_PERCPU) {
char **percpu = dt_calloc(st->dtp,
st->dtp->dt_conf.max_cpuid + 1,
sizeof(char *));
@@ -553,14 +553,15 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
static int
dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
{
- dt_aggregate_t *agp = &dtp->dt_aggregate;
- char *buf = agp->dtat_cpu_buf[cpu];
dt_snapstate_t st;
+ uint32_t key = 0;
st.dtp = dtp;
st.cpu = cpu;
- st.buf = buf;
- st.agp = agp;
+
+ if (dt_bpf_map_lookup_inner(dtp->dt_aggmap_fd, &cpu, &key,
+ dtp->dt_aggregate.dtat_buf) == -1)
+ return 0;
return dt_idhash_iter(dtp->dt_aggs,
(dt_idhash_f *)dt_aggregate_snap_one, &st);
@@ -573,22 +574,17 @@ int
dtrace_aggregate_snap(dtrace_hdl_t *dtp)
{
dt_aggregate_t *agp = &dtp->dt_aggregate;
- uint32_t key = 0;
int i, rval;
/*
* If we do not have a buffer initialized, we will not be processing
* aggregations, so there is nothing to be done here.
*/
- if (agp->dtat_cpu_buf == NULL)
+ if (agp->dtat_buf == NULL)
return 0;
dtrace_aggregate_clear(dtp);
- rval = dt_bpf_map_lookup(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
- if (rval != 0)
- return dt_set_errno(dtp, -rval);
-
for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus[i].cpu_id);
if (rval != 0)
@@ -999,41 +995,22 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
dt_aggregate_t *agp = &dtp->dt_aggregate;
dt_ahash_t *agh = &agp->dtat_hash;
int aggsz, i;
- uint32_t key = 0;
/* If there are no aggregations there is nothing to do. */
aggsz = dt_idhash_datasize(dtp->dt_aggs);
if (aggsz <= 0)
return 0;
- /*
- * Allocate a buffer to hold the aggregation data for all possible
- * CPUs, and initialize the per-CPU data pointers for CPUs that are
- * currently enabled.
- */
- agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
+ /* Allocate a buffer to hold the aggregation data for a CPU. */
+ agp->dtat_buf = dt_zalloc(dtp, aggsz);
if (agp->dtat_buf == NULL)
return dt_set_errno(dtp, EDT_NOMEM);
- agp->dtat_cpu_buf = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
- sizeof(char *));
- if (agp->dtat_cpu_buf == NULL) {
- dt_free(dtp, agp->dtat_buf);
- return dt_set_errno(dtp, EDT_NOMEM);
- }
-
- for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
- int cpu = dtp->dt_conf.cpus[i].cpu_id;
-
- agp->dtat_cpu_buf[cpu] = agp->dtat_buf + cpu * aggsz;
- }
-
/* Create the aggregation hash. */
agh->dtah_size = DTRACE_AHASHSIZE;
agh->dtah_hash = dt_zalloc(dtp,
agh->dtah_size * sizeof(dt_ahashent_t *));
if (agh->dtah_hash == NULL) {
- dt_free(dtp, agp->dtat_cpu_buf);
dt_free(dtp, agp->dtat_buf);
return dt_set_errno(dtp, EDT_NOMEM);
}
@@ -1045,15 +1022,13 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
return 0;
*(int64_t *)agp->dtat_buf = 0; /* clear the flag */
for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
- int cpu = dtp->dt_conf.cpus[i].cpu_id;
+ int cpu = dtp->dt_conf.cpus[i].cpu_id;
+ uint32_t key = 0;
- /* Data for CPU 0 was populated, so skip it. */
- if (cpu == 0)
+ if (dt_bpf_map_update_inner(dtp->dt_aggmap_fd, &cpu, &key,
+ dtp->dt_aggregate.dtat_buf) == -1)
continue;
-
- memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
}
- dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
return 0;
}
@@ -1820,6 +1795,5 @@ dt_aggregate_destroy(dtrace_hdl_t *dtp)
hash->dtah_size = 0;
}
- dt_free(dtp, agp->dtat_cpu_buf);
dt_free(dtp, agp->dtat_buf);
}
diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
index 66c76022..3207a416 100644
--- a/libdtrace/dt_bpf.c
+++ b/libdtrace/dt_bpf.c
@@ -469,19 +469,37 @@ gmap_create_state(dtrace_hdl_t *dtp)
* Create the 'aggs' BPF map.
*
* Aggregation data buffer map, associated with each CPU. The map is
- * implemented as a global per-CPU map with a singleton element (key 0).
+ * implemented as a global array-of-maps indexed by CPU id. The associated
+ * value is a map with a singleton element (key 0).
*/
static int
gmap_create_aggs(dtrace_hdl_t *dtp)
{
size_t sz = dt_idhash_datasize(dtp->dt_aggs);
+ int i;
/* Only create the map if it is used. */
if (sz == 0)
return 0;
- dtp->dt_aggmap_fd = create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
- sizeof(uint32_t), sz, 1);
+ dtp->dt_aggmap_fd = create_gmap_of_maps(dtp, "aggs",
+ BPF_MAP_TYPE_ARRAY_OF_MAPS,
+ sizeof(uint32_t),
+ dtp->dt_conf.num_online_cpus,
+ BPF_MAP_TYPE_ARRAY,
+ sizeof(uint32_t), sz, 1);
+
+ for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
+ int cpu = dtp->dt_conf.cpus[i].cpu_id;
+ int fd = dt_bpf_map_create(BPF_MAP_TYPE_ARRAY, NULL,
+ sizeof(uint32_t), sz, 1, 0);
+
+ if (fd < 0)
+ continue;
+
+ dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
+ }
+
return dtp->dt_aggmap_fd;
diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
index 0963f202..b6ade5a9 100644
--- a/libdtrace/dt_cg.c
+++ b/libdtrace/dt_cg.c
@@ -52,11 +52,13 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
{
dtrace_hdl_t *dtp = pcb->pcb_hdl;
dt_irlist_t *dlp = &pcb->pcb_ir;
+ dt_ident_t *aggs = dt_dlib_get_map(dtp, "aggs");
dt_ident_t *mem = dt_dlib_get_map(dtp, "mem");
dt_ident_t *state = dt_dlib_get_map(dtp, "state");
dt_ident_t *prid = dt_dlib_get_var(pcb->pcb_hdl, "PRID");
uint_t lbl_exit = pcb->pcb_exitlbl;
+ assert(aggs != NULL);
assert(mem != NULL);
assert(state != NULL);
assert(prid != NULL);
@@ -206,13 +208,60 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
DT_CG_STORE_MAP_PTR("strtab", DCTX_STRTAB);
if (dtp->dt_options[DTRACEOPT_SCRATCHSIZE] > 0)
DT_CG_STORE_MAP_PTR("scratchmem", DCTX_SCRATCHMEM);
- if (dt_idhash_datasize(dtp->dt_aggs) > 0)
- DT_CG_STORE_MAP_PTR("aggs", DCTX_AGG);
if (dt_idhash_datasize(dtp->dt_globals) > 0)
DT_CG_STORE_MAP_PTR("gvars", DCTX_GVARS);
if (dtp->dt_maxlvaralloc > 0)
DT_CG_STORE_MAP_PTR("lvars", DCTX_LVARS);
#undef DT_CG_STORE_MAP_PTR
+
+ /*
+ * Aggregation data is stored in a CPU-specific BPF map. Populate
+ * dctx->agg with the map for the current CPU.
+ *
+ * key = bpf_get_smp_processor_id()
+ * // call bpf_get_smp_processor_id
+ * // (%r1 ... %r5 clobbered)
+ * // (%r0 = CPU id)
+ * // stdw [%r9 + DCTX_AGG], %r0
+ * rc = bpf_map_lookup_elem(&aggs, &key);
+ * // lddw %r1, &aggs
+ * // mov %r2, %r9
+ * // add %r2, DCTX_AGG
+ * // call bpf_map_lookup_elem
+ * // (%r1 ... %r5 clobbered)
+ * // (%r0 = 'aggs' BPF map value)
+ * if (rc == 0) // jeq %r0, 0, lbl_exit
+ * goto exit;
+ *
+ * key = 0; // stdw [%r9 + DCTX_AGG], 0
+ * rc = bpf_map_lookup_elem(rc, &key);
+ * // mov %r1, %r0
+ * // mov %r2, %r9
+ * // add %r2, DCTX_AGG
+ * // call bpf_map_lookup_elem
+ * // (%r1 ... %r5 clobbered)
+ * // (%r0 = inner map value)
+ * if (rc == 0) // jeq %r0, 0, lbl_exit
+ * goto exit;
+ *
+ * dctx.aggs = rc; // stdw [%r9 + DCTX_AGG], %r0
+ */
+ if (dt_idhash_datasize(dtp->dt_aggs) > 0) {
+ emit(dlp, BPF_CALL_HELPER(BPF_FUNC_get_smp_processor_id));
+ emit(dlp, BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+ dt_cg_xsetx(dlp, aggs, DT_LBL_NONE, BPF_REG_1, aggs->di_id);
+ emit(dlp, BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+ emit(dlp, BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+ emit(dlp, BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+ emit(dlp, BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+ emit(dlp, BPF_STORE_IMM(BPF_DW, BPF_REG_9, DCTX_AGG, 0));
+ emit(dlp, BPF_MOV_REG(BPF_REG_1, BPF_REG_0));
+ emit(dlp, BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+ emit(dlp, BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+ emit(dlp, BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+ emit(dlp, BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+ emit(dlp, BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+ }
}
void
diff --git a/libdtrace/dt_impl.h b/libdtrace/dt_impl.h
index 85a1e7c9..e9b949ca 100644
--- a/libdtrace/dt_impl.h
+++ b/libdtrace/dt_impl.h
@@ -209,7 +209,6 @@ typedef struct dt_tstring {
} dt_tstring_t;
typedef struct dt_aggregate {
- char **dtat_cpu_buf; /* per-CPU agg snapshot buffers */
char *dtat_buf; /* aggregation snapshot buffer */
int dtat_flags; /* aggregate flags */
dt_ahash_t dtat_hash; /* aggregate hash table */
--
2.34.1
More information about the DTrace-devel
mailing list