[DTrace-devel] [PATCH 5/5] Use array-of-maps as storage for aggregations
Kris Van Hees
kris.van.hees at oracle.com
Tue Aug 23 00:21:22 UTC 2022
On Mon, Aug 22, 2022 at 03:28:08PM -0400, Eugene Loh wrote:
> With this patch, the ante goes up a lot, and it would really be nice to see
> where this whole patch series is going before reviewing. Again, what problems
> does one intend to address? How? (The commit message hints at the first
> question but really does not address the second.)
I am not sure what you mean by 'the ante goes up a lot'. And I think my earlier
email concerning the aggregation support and the differences from the legacy
version does outline where this is going: using a hash per CPU (rather than a
per-CPU hashmap), so that when aggregation data is produced in a non-uniform
fashion across CPUs (which is actually quite common) we keep storage space
available for other aggregations.
> Regarding the commit message:
>
> *) As before: s/indexed aggregations/aggregation keys/
See my comments in earlier emails.
> *) What do aggregation keys have to do with this patch? Heck, actually the
> same question goes for clear() and trunc(). I suppose this is the same "what's
> the roadmap" question.
If you follow the series, we go from a single map (a per-CPU array map) to a
hash of array maps. The next step will be to switch from a hash of array
maps to a hash of hashmaps. This patch is an intermediate step towards that,
and one that makes the switch-over incremental.
> *) s/it's/its/
>
> *) The last sentence of the commit message is, let's say, "puzzling." Just
> drop it?
What is puzzling about it?
> In dt_aggregate.c, in dt_aggregate_snap_one(), if you're going to introduce
> "dtp = st->dtp", then occurrences in this function of st->dtp should be
> replaced simply with dtp.
Thanks - I must have missed that.
> In dt_bpf.c, in gmap_create_aggs(), if a per-CPU map cannot be created, we
> don't care?
>
> In dt_cg.c, s/COU/CPU/.
Thanks.
> Also, the big comment block in dt_cg_tramp_prologue_act() keeps saying
> %r0 = 'mem' BPF map value
> but that looks wrong. In general, a ton of this comment detail should be
> pulled out. The comments are harder to understand (and less reliable) than the
> code itself. The comments are a bunch of detail that are wrong or hard to
> understand or both.
I am not sure I would characterize 3 instances of the same obvious cut'n'paste
error as 'keeps saying'. But I will fix the comments.
> ----------------------------------------------------------------------------
> From: Kris Van Hees via DTrace-devel <dtrace-devel at oss.oracle.com>
> Sent: Friday, August 19, 2022 10:26 AM
> To: dtrace-devel at oss.oracle.com <dtrace-devel at oss.oracle.com>
> Subject: [DTrace-devel] [PATCH 5/5] Use array-of-maps as storage for
> aggregations
>
> In preparation for indexed aggregations and the clear() and trunc()
> operations, the storage for aggregations is moving from a per-CPU
> array map to an array of maps, indexed by CPU id. This gives each
> CPU it's own storage to store aggregation data in.
>
> Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
> ---
> libdtrace/dt_aggregate.c | 62 ++++++++++++----------------------------
> libdtrace/dt_bpf.c | 24 ++++++++++++++--
> libdtrace/dt_cg.c | 53 ++++++++++++++++++++++++++++++++--
> libdtrace/dt_impl.h | 1 -
> 4 files changed, 90 insertions(+), 50 deletions(-)
>
> diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
> index 44896fd2..cb47e442 100644
> --- a/libdtrace/dt_aggregate.c
> +++ b/libdtrace/dt_aggregate.c
> @@ -412,8 +412,6 @@ typedef void (*agg_cpu_f)(dt_ident_t *aid, int64_t *dst,
> int64_t *src,
> typedef struct dt_snapstate {
> dtrace_hdl_t *dtp;
> processorid_t cpu;
> - char *buf;
> - dt_aggregate_t *agp;
> } dt_snapstate_t;
>
> static void
> @@ -444,7 +442,9 @@ dt_agg_one_agg(dt_ident_t *aid, int64_t *dst, int64_t *src,
> uint_t datasz)
> static int
> dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
> {
> - dt_ahash_t *agh = &st->agp->dtat_hash;
> + dtrace_hdl_t *dtp = st->dtp;
> + dt_aggregate_t *agp = &dtp->dt_aggregate;
> + dt_ahash_t *agh = &agp->dtat_hash;
> dt_ahashent_t *h;
> dtrace_aggdesc_t *agg;
> dtrace_aggdata_t *agd;
> @@ -454,12 +454,12 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid,
> dt_snapstate_t *st)
> uint_t i, datasz;
> int64_t *src;
>
> - rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
> + rval = dt_aggid_lookup(dtp, aid->di_id, &agg);
> if (rval != 0)
> return rval;
>
> /* point to the data counter */
> - src = (int64_t *)(st->buf + aid->di_offset);
> + src = (int64_t *)(agp->dtat_buf + aid->di_offset);
>
> /* skip it if data counter is 0 */
> if (*src == 0)
> @@ -506,7 +506,7 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid,
> dt_snapstate_t *st)
> h->dtahe_hval = hval;
> h->dtahe_size = datasz;
>
> - if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
> + if (agp->dtat_flags & DTRACE_A_PERCPU) {
> char **percpu = dt_calloc(st->dtp,
> st->dtp->dt_conf.max_cpuid + 1,
> sizeof(char *));
> @@ -553,14 +553,15 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid,
> dt_snapstate_t *st)
> static int
> dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
> {
> - dt_aggregate_t *agp = &dtp->dt_aggregate;
> - char *buf = agp->dtat_cpu_buf[cpu];
> dt_snapstate_t st;
> + uint32_t key = 0;
>
> st.dtp = dtp;
> st.cpu = cpu;
> - st.buf = buf;
> - st.agp = agp;
> +
> + if (dt_bpf_map_lookup_inner(dtp->dt_aggmap_fd, &cpu, &key,
> + dtp->dt_aggregate.dtat_buf) == -1)
> + return 0;
>
> return dt_idhash_iter(dtp->dt_aggs,
> (dt_idhash_f *)dt_aggregate_snap_one, &st);
> @@ -573,22 +574,17 @@ int
> dtrace_aggregate_snap(dtrace_hdl_t *dtp)
> {
> dt_aggregate_t *agp = &dtp->dt_aggregate;
> - uint32_t key = 0;
> int i, rval;
>
> /*
> * If we do not have a buffer initialized, we will not be processing
> * aggregations, so there is nothing to be done here.
> */
> - if (agp->dtat_cpu_buf == NULL)
> + if (agp->dtat_buf == NULL)
> return 0;
>
> dtrace_aggregate_clear(dtp);
>
> - rval = dt_bpf_map_lookup(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
> - if (rval != 0)
> - return dt_set_errno(dtp, -rval);
> -
> for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus
> [i].cpu_id);
> if (rval != 0)
> @@ -999,41 +995,22 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
> dt_aggregate_t *agp = &dtp->dt_aggregate;
> dt_ahash_t *agh = &agp->dtat_hash;
> int aggsz, i;
> - uint32_t key = 0;
>
> /* If there are no aggregations there is nothing to do. */
> aggsz = dt_idhash_datasize(dtp->dt_aggs);
> if (aggsz <= 0)
> return 0;
>
> - /*
> - * Allocate a buffer to hold the aggregation data for all possible
> - * CPUs, and initialize the per-CPU data pointers for CPUs that are
> - * currently enabled.
> - */
> - agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
> + /* Allocate a buffer to hold the aggregation data for a CPU. */
> + agp->dtat_buf = dt_zalloc(dtp, aggsz);
> if (agp->dtat_buf == NULL)
> return dt_set_errno(dtp, EDT_NOMEM);
>
> - agp->dtat_cpu_buf = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
> - sizeof(char *));
> - if (agp->dtat_cpu_buf == NULL) {
> - dt_free(dtp, agp->dtat_buf);
> - return dt_set_errno(dtp, EDT_NOMEM);
> - }
> -
> - for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> - int cpu = dtp->dt_conf.cpus[i].cpu_id;
> -
> - agp->dtat_cpu_buf[cpu] = agp->dtat_buf + cpu * aggsz;
> - }
> -
> /* Create the aggregation hash. */
> agh->dtah_size = DTRACE_AHASHSIZE;
> agh->dtah_hash = dt_zalloc(dtp,
> agh->dtah_size * sizeof(dt_ahashent_t *));
> if (agh->dtah_hash == NULL) {
> - dt_free(dtp, agp->dtat_cpu_buf);
> dt_free(dtp, agp->dtat_buf);
> return dt_set_errno(dtp, EDT_NOMEM);
> }
> @@ -1045,15 +1022,13 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
> return 0;
> *(int64_t *)agp->dtat_buf = 0; /* clear the flag */
> for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> - int cpu = dtp->dt_conf.cpus[i].cpu_id;
> + int cpu = dtp->dt_conf.cpus[i].cpu_id;
> + uint32_t key = 0;
>
> - /* Data for CPU 0 was populated, so skip it. */
> - if (cpu == 0)
> + if (dt_bpf_map_update_inner(dtp->dt_aggmap_fd, &cpu, &key,
> + dtp->dt_aggregate.dtat_buf) == -1)
> continue;
> -
> - memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
> }
> - dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
>
> return 0;
> }
> @@ -1820,6 +1795,5 @@ dt_aggregate_destroy(dtrace_hdl_t *dtp)
> hash->dtah_size = 0;
> }
>
> - dt_free(dtp, agp->dtat_cpu_buf);
> dt_free(dtp, agp->dtat_buf);
> }
> diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
> index 66c76022..3207a416 100644
> --- a/libdtrace/dt_bpf.c
> +++ b/libdtrace/dt_bpf.c
> @@ -469,19 +469,37 @@ gmap_create_state(dtrace_hdl_t *dtp)
> * Create the 'aggs' BPF map.
> *
> * Aggregation data buffer map, associated with each CPU. The map is
> - * implemented as a global per-CPU map with a singleton element (key 0).
> + * implemented as a global array-of-maps indexed by CPU id. The associated
> + * value is a map with a singleton element (key 0).
> */
> static int
> gmap_create_aggs(dtrace_hdl_t *dtp)
> {
> size_t sz = dt_idhash_datasize(dtp->dt_aggs);
> + int i;
>
> /* Only create the map if it is used. */
> if (sz == 0)
> return 0;
>
> - dtp->dt_aggmap_fd = create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
> - sizeof(uint32_t), sz, 1);
> + dtp->dt_aggmap_fd = create_gmap_of_maps(dtp, "aggs",
> + BPF_MAP_TYPE_ARRAY_OF_MAPS,
> + sizeof(uint32_t),
> + dtp->dt_conf.num_online_cpus,
> + BPF_MAP_TYPE_ARRAY,
> + sizeof(uint32_t), sz, 1);
> +
> + for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> + int cpu = dtp->dt_conf.cpus[i].cpu_id;
> + int fd = dt_bpf_map_create(BPF_MAP_TYPE_ARRAY, NULL,
> + sizeof(uint32_t), sz, 1, 0);
> +
> + if (fd < 0)
> + continue;
> +
> + dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
> + }
> +
>
> return dtp->dt_aggmap_fd;
>
> diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
> index 0963f202..b6ade5a9 100644
> --- a/libdtrace/dt_cg.c
> +++ b/libdtrace/dt_cg.c
> @@ -52,11 +52,13 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
> {
> dtrace_hdl_t *dtp = pcb->pcb_hdl;
> dt_irlist_t *dlp = &pcb->pcb_ir;
> + dt_ident_t *aggs = dt_dlib_get_map(dtp, "aggs");
> dt_ident_t *mem = dt_dlib_get_map(dtp, "mem");
> dt_ident_t *state = dt_dlib_get_map(dtp, "state");
> dt_ident_t *prid = dt_dlib_get_var(pcb->pcb_hdl, "PRID");
> uint_t lbl_exit = pcb->pcb_exitlbl;
>
> + assert(aggs != NULL);
> assert(mem != NULL);
> assert(state != NULL);
> assert(prid != NULL);
> @@ -206,13 +208,60 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t
> act)
> DT_CG_STORE_MAP_PTR("strtab", DCTX_STRTAB);
> if (dtp->dt_options[DTRACEOPT_SCRATCHSIZE] > 0)
> DT_CG_STORE_MAP_PTR("scratchmem", DCTX_SCRATCHMEM);
> - if (dt_idhash_datasize(dtp->dt_aggs) > 0)
> - DT_CG_STORE_MAP_PTR("aggs", DCTX_AGG);
> if (dt_idhash_datasize(dtp->dt_globals) > 0)
> DT_CG_STORE_MAP_PTR("gvars", DCTX_GVARS);
> if (dtp->dt_maxlvaralloc > 0)
> DT_CG_STORE_MAP_PTR("lvars", DCTX_LVARS);
> #undef DT_CG_STORE_MAP_PTR
> +
> + /*
> + * Aggregation data is stored in a COU-specific BPF map. Populate
> + * dctx->agg with the map for the current CPU.
> + *
> + * key = bpf_get_smp_processor_id()
> + * // call bpf_get_smp_processor_id
> + * // (%r1 ... %r5 clobbered)
> + * // (%r0 = 'mem' BPF map value)
> + * // stw [%r9 + DCTX_AGG], %r0
> + * rc = bpf_map_lookup_elem(&aggs, &key);
> + * // lddw %r1, &aggs
> + * // mov %r2, %r9
> + * // add %r2, DCTX_AGG
> + * // call bpf_map_lookup_elem
> + * // (%r1 ... %r5 clobbered)
> + * // (%r0 = 'mem' BPF map value)
> + * if (rc == 0) // jeq %r0, 0, lbl_exit
> + * goto exit;
> + *
> + * key = 0; // stw [%r9 + DCTX_AGG], 0
> + * rc = bpf_map_lookup_elem(rc, &key);
> + * // mov %r1, %r0
> + * // mov %r2, %r9
> + * // add %r2, DCTX_AGG
> + * // call bpf_map_lookup_elem
> + * // (%r1 ... %r5 clobbered)
> + * // (%r0 = 'mem' BPF map value)
> + * if (rc == 0) // jeq %r0, 0, lbl_exit
> + * goto exit;
> + *
> + * dctx.aggs = rc; // stdw [%r9 + offset], %r0
> + */
> + if (dt_idhash_datasize(dtp->dt_aggs) > 0) {
> + emit(dlp, BPF_CALL_HELPER(BPF_FUNC_get_smp_processor_id));
> + emit(dlp, BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
> + dt_cg_xsetx(dlp, aggs, DT_LBL_NONE, BPF_REG_1, aggs->di_id);
> + emit(dlp, BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
> + emit(dlp, BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
> + emit(dlp, BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
> + emit(dlp, BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
> + emit(dlp, BPF_STORE_IMM(BPF_DW, BPF_REG_9, DCTX_AGG, 0));
> + emit(dlp, BPF_MOV_REG(BPF_REG_1, BPF_REG_0));
> + emit(dlp, BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
> + emit(dlp, BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
> + emit(dlp, BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
> + emit(dlp, BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
> + emit(dlp, BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
> + }
> }
>
> void
> diff --git a/libdtrace/dt_impl.h b/libdtrace/dt_impl.h
> index 85a1e7c9..e9b949ca 100644
> --- a/libdtrace/dt_impl.h
> +++ b/libdtrace/dt_impl.h
> @@ -209,7 +209,6 @@ typedef struct dt_tstring {
> } dt_tstring_t;
>
> typedef struct dt_aggregate {
> - char **dtat_cpu_buf; /* per-CPU agg snapshot buffers */
> char *dtat_buf; /* aggregation snapshot buffer */
> int dtat_flags; /* aggregate flags */
> dt_ahash_t dtat_hash; /* aggregate hash table */
> --
> 2.34.1
>
>
> _______________________________________________
> DTrace-devel mailing list
> DTrace-devel at oss.oracle.com
> https://oss.oracle.com/mailman/listinfo/dtrace-devel