[DTrace-devel] [PATCH v2 5/5] Use array-of-maps as storage for aggregations
Kris Van Hees
kris.van.hees at oracle.com
Thu Aug 25 02:51:56 UTC 2022
On Wed, Aug 24, 2022 at 06:43:04PM -0400, Eugene Loh wrote:
> Reviewed-by: Eugene Loh <eugene.loh at oracle.com>
>
> We have three copies of aggregation data: the BPF map, the snapshot, and the
> user-space copy. So, in dt_aggregate_go(), how about
> - /* Allocate a buffer to hold the aggregation data for a CPU. */
> + /* Allocate a buffer to hold the snapshot data for a CPU. */
I am not convinced this makes a difference. The snapshot is the aggregation
data. Sure, it is the state of the aggregation data at a given moment, but
does it really matter how specific we get in our comments?  In the end, it is
a buffer sized to hold the aggregation data, and the data gets copied into
that buffer using the BPF_MAP_LOOKUP_ELEM bpf() syscall.
> In gmap_create_aggs(), you call create_gmap_of_maps() with an osize of
> dtp->dt_conf.num_online_cpus.  Is that right?  This limits the "outer" keys to
> [0:dtp->dt_conf.num_online_cpus). But then
> for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> int cpu = dtp->dt_conf.cpus[i].cpu_id;
> int fd = ...;
>
> dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
> }
> Will cpu always fit inside the given range?
Good point - this is a mistake in my thought process: I conflated the notion
that CPU ids are not necessarily sequential (even though they almost always
are) with the idea that I could use an array rather than a hash as long as it
is big enough to hold all online CPU ids, and ended up mixing the two.
I think I favour using an array sized to dtp->dt_conf.max_cpuid + 1, and
populating only the entries for active CPUs.  Since CPU ids are almost always
sequential (without gaps), using a hashmap would be a waste.
> In dt_cg_tramp_prologue_act(), I continue to maintain that the details in the
> big comment block are obfuscating: they are no more clear than the code itself
> and therefore are confusing. I keep checking the comments against the code
> rather than the other way around. Whatever. In any case, the comment for the
> result of "call bpf_get_smp_processor_id" is said to be
> (%r0 = 'aggs' BPF map value)
Bleech - good point.
> That is apparently a cut-and-paste error. And the final instruction is
> described as
> dctx.aggs = rc; // stdw [%r9 + offset], %r0
> But there is no more "offset"... it should probably be s/offset/DCTX_AGG/.
Hm, I thought I had substituted DCTX_AGG for offset there. Must have failed
to preserve that change when preparing the commit. Silly mistake.
> ------------------------------------------------------------------------
> From: Kris Van Hees via DTrace-devel <dtrace-devel at oss.oracle.com>
> Sent: Tuesday, August 23, 2022 2:49 PM
> To: dtrace-devel at oss.oracle.com <dtrace-devel at oss.oracle.com>
> Subject: [DTrace-devel] [PATCH v2 5/5] Use array-of-maps as storage for
> aggregations
>
> In preparation for indexed aggregations and the clear() and trunc()
> operations, the storage for aggregations is moving from a per-CPU
> array map to an array of maps, indexed by CPU id.
>
> The existing storage solution for aggregations stored all data in a
> singleton map value, i.e. all CPUs were writing to their own portion
> of a block of memory that the consumer retrieved in its entirety in
> a single system call.
>
> The new storage solution allocates a memory block for each CPU so
> that data retrieval by the consumer can be done per CPU. This sets
> the stage for future development where the consumer may need to
> update the aggregation buffers.
>
> Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
> ---
> libdtrace/dt_aggregate.c | 95 ++++++++++++++--------------------------
> libdtrace/dt_bpf.c | 69 ++++++++++++++++++-----------
> libdtrace/dt_cg.c | 53 +++++++++++++++++++++-
> libdtrace/dt_impl.h | 1 -
> 4 files changed, 129 insertions(+), 89 deletions(-)
>
> diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
> index 44896fd2..14d16da6 100644
> --- a/libdtrace/dt_aggregate.c
> +++ b/libdtrace/dt_aggregate.c
> @@ -412,8 +412,6 @@ typedef void (*agg_cpu_f)(dt_ident_t *aid, int64_t *dst, int64_t *src,
> typedef struct dt_snapstate {
> dtrace_hdl_t *dtp;
> processorid_t cpu;
> - char *buf;
> - dt_aggregate_t *agp;
> } dt_snapstate_t;
>
> static void
> @@ -444,7 +442,9 @@ dt_agg_one_agg(dt_ident_t *aid, int64_t *dst, int64_t *src, uint_t datasz)
> static int
> dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
> {
> - dt_ahash_t *agh = &st->agp->dtat_hash;
> + dtrace_hdl_t *dtp = st->dtp;
> + dt_aggregate_t *agp = &dtp->dt_aggregate;
> + dt_ahash_t *agh = &agp->dtat_hash;
> dt_ahashent_t *h;
> dtrace_aggdesc_t *agg;
> dtrace_aggdata_t *agd;
> @@ -454,12 +454,12 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
> uint_t i, datasz;
> int64_t *src;
>
> - rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
> + rval = dt_aggid_lookup(dtp, aid->di_id, &agg);
> if (rval != 0)
> return rval;
>
> /* point to the data counter */
> - src = (int64_t *)(st->buf + aid->di_offset);
> + src = (int64_t *)(agp->dtat_buf + aid->di_offset);
>
> /* skip it if data counter is 0 */
> if (*src == 0)
> @@ -487,46 +487,45 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
> }
>
> /* add it to the hash table */
> - h = dt_zalloc(st->dtp, sizeof(dt_ahashent_t));
> + h = dt_zalloc(dtp, sizeof(dt_ahashent_t));
> if (h == NULL)
> - return dt_set_errno(st->dtp, EDT_NOMEM);
> + return dt_set_errno(dtp, EDT_NOMEM);
>
> agd = &h->dtahe_data;
> - agd->dtada_data = dt_alloc(st->dtp, datasz);
> + agd->dtada_data = dt_alloc(dtp, datasz);
> if (agd->dtada_data == NULL) {
> - dt_free(st->dtp, h);
> - return dt_set_errno(st->dtp, EDT_NOMEM);
> + dt_free(dtp, h);
> + return dt_set_errno(dtp, EDT_NOMEM);
> }
>
> memcpy(agd->dtada_data, src, datasz);
> agd->dtada_size = datasz;
> agd->dtada_desc = agg;
> - agd->dtada_hdl = st->dtp;
> + agd->dtada_hdl = dtp;
>
> h->dtahe_hval = hval;
> h->dtahe_size = datasz;
>
> - if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
> - char **percpu = dt_calloc(st->dtp,
> - st->dtp->dt_conf.max_cpuid + 1,
> + if (agp->dtat_flags & DTRACE_A_PERCPU) {
> + char **percpu = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
> sizeof(char *));
>
> if (percpu == NULL) {
> - dt_free(st->dtp, agd->dtada_data);
> - dt_free(st->dtp, h);
> + dt_free(dtp, agd->dtada_data);
> + dt_free(dtp, h);
>
> - dt_set_errno(st->dtp, EDT_NOMEM);
> + dt_set_errno(dtp, EDT_NOMEM);
> }
>
> - for (i = 0; i <= st->dtp->dt_conf.max_cpuid; i++) {
> - percpu[i] = dt_zalloc(st->dtp, datasz);
> + for (i = 0; i <= dtp->dt_conf.max_cpuid; i++) {
> + percpu[i] = dt_zalloc(dtp, datasz);
> if (percpu[i] == NULL) {
> while (--i >= 0)
> - dt_free(st->dtp, percpu[i]);
> - dt_free(st->dtp, agd->dtada_data);
> - dt_free(st->dtp, h);
> + dt_free(dtp, percpu[i]);
> + dt_free(dtp, agd->dtada_data);
> + dt_free(dtp, h);
>
> - dt_set_errno(st->dtp, EDT_NOMEM);
> + dt_set_errno(dtp, EDT_NOMEM);
> }
> }
>
> @@ -553,14 +552,15 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
> static int
> dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
> {
> - dt_aggregate_t *agp = &dtp->dt_aggregate;
> - char *buf = agp->dtat_cpu_buf[cpu];
> dt_snapstate_t st;
> + uint32_t key = 0;
>
> st.dtp = dtp;
> st.cpu = cpu;
> - st.buf = buf;
> - st.agp = agp;
> +
> + if (dt_bpf_map_lookup_inner(dtp->dt_aggmap_fd, &cpu, &key,
> + dtp->dt_aggregate.dtat_buf) == -1)
> + return 0;
>
> return dt_idhash_iter(dtp->dt_aggs,
> (dt_idhash_f *)dt_aggregate_snap_one, &st);
> @@ -573,22 +573,17 @@ int
> dtrace_aggregate_snap(dtrace_hdl_t *dtp)
> {
> dt_aggregate_t *agp = &dtp->dt_aggregate;
> - uint32_t key = 0;
> int i, rval;
>
> /*
> * If we do not have a buffer initialized, we will not be processing
> * aggregations, so there is nothing to be done here.
> */
> - if (agp->dtat_cpu_buf == NULL)
> + if (agp->dtat_buf == NULL)
> return 0;
>
> dtrace_aggregate_clear(dtp);
>
> - rval = dt_bpf_map_lookup(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
> - if (rval != 0)
> - return dt_set_errno(dtp, -rval);
> -
> for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus[i].cpu_id);
> if (rval != 0)
> @@ -999,41 +994,22 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
> dt_aggregate_t *agp = &dtp->dt_aggregate;
> dt_ahash_t *agh = &agp->dtat_hash;
> int aggsz, i;
> - uint32_t key = 0;
>
> /* If there are no aggregations there is nothing to do. */
> aggsz = dt_idhash_datasize(dtp->dt_aggs);
> if (aggsz <= 0)
> return 0;
>
> - /*
> - * Allocate a buffer to hold the aggregation data for all possible
> - * CPUs, and initialize the per-CPU data pointers for CPUs that are
> - * currently enabled.
> - */
> - agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
> + /* Allocate a buffer to hold the aggregation data for a CPU. */
> + agp->dtat_buf = dt_zalloc(dtp, aggsz);
> if (agp->dtat_buf == NULL)
> return dt_set_errno(dtp, EDT_NOMEM);
>
> - agp->dtat_cpu_buf = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
> - sizeof(char *));
> - if (agp->dtat_cpu_buf == NULL) {
> - dt_free(dtp, agp->dtat_buf);
> - return dt_set_errno(dtp, EDT_NOMEM);
> - }
> -
> - for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> - int cpu = dtp->dt_conf.cpus[i].cpu_id;
> -
> - agp->dtat_cpu_buf[cpu] = agp->dtat_buf + cpu * aggsz;
> - }
> -
> /* Create the aggregation hash. */
> agh->dtah_size = DTRACE_AHASHSIZE;
> agh->dtah_hash = dt_zalloc(dtp,
> agh->dtah_size * sizeof(dt_ahashent_t *));
> if (agh->dtah_hash == NULL) {
> - dt_free(dtp, agp->dtat_cpu_buf);
> dt_free(dtp, agp->dtat_buf);
> return dt_set_errno(dtp, EDT_NOMEM);
> }
> @@ -1045,15 +1021,13 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
> return 0;
> *(int64_t *)agp->dtat_buf = 0; /* clear the flag */
> for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> - int cpu = dtp->dt_conf.cpus[i].cpu_id;
> + int cpu = dtp->dt_conf.cpus[i].cpu_id;
> + uint32_t key = 0;
>
> - /* Data for CPU 0 was populated, so skip it. */
> - if (cpu == 0)
> + if (dt_bpf_map_update_inner(dtp->dt_aggmap_fd, &cpu, &key,
> + dtp->dt_aggregate.dtat_buf) == -1)
> continue;
> -
> - memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
> }
> - dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
>
> return 0;
> }
> @@ -1820,6 +1794,5 @@ dt_aggregate_destroy(dtrace_hdl_t *dtp)
> hash->dtah_size = 0;
> }
>
> - dt_free(dtp, agp->dtat_cpu_buf);
> dt_free(dtp, agp->dtat_buf);
> }
> diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
> index 7dea7179..a31ddf95 100644
> --- a/libdtrace/dt_bpf.c
> +++ b/libdtrace/dt_bpf.c
> @@ -351,6 +351,22 @@ dt_bpf_init_helpers(dtrace_hdl_t *dtp)
> #undef BPF_HELPER_MAP
> }
>
> +static int
> +map_create_error(dtrace_hdl_t *dtp, const char *name, int err)
> +{
> + char msg[64];
> +
> + snprintf(msg, sizeof(msg),
> + "failed to create BPF map '%s'", name);
> +
> + if (err == E2BIG)
> + return dt_bpf_error(dtp, "%s: Too big\n", msg);
> + if (err == EPERM)
> + return dt_bpf_lockmem_error(dtp, msg);
> +
> + return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
> +}
> +
> static int
> create_gmap(dtrace_hdl_t *dtp, const char *name, enum bpf_map_type type,
> size_t ksz, size_t vsz, size_t size)
> @@ -369,17 +385,8 @@ create_gmap(dtrace_hdl_t *dtp, const char *name, enum bpf_map_type type,
> err = errno;
> }
>
> - if (fd < 0) {
> - char msg[64];
> -
> - snprintf(msg, sizeof(msg),
> - "failed to create BPF map '%s'", name);
> - if (err == E2BIG)
> - return dt_bpf_error(dtp, "%s: Too big\n", msg);
> - if (err == EPERM)
> - return dt_bpf_lockmem_error(dtp, msg);
> - return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
> - }
> + if (fd < 0)
> + return map_create_error(dtp, name, err);
>
> dt_dprintf("BPF map '%s' is FD %d\n", name, fd);
>
> @@ -421,17 +428,8 @@ create_gmap_of_maps(dtrace_hdl_t *dtp, const char *name,
> err = errno;
> }
>
> - if (fd < 0) {
> - char msg[64];
> -
> - snprintf(msg, sizeof(msg),
> - "failed to create BPF map '%s'", name);
> - if (err == E2BIG)
> - return dt_bpf_error(dtp, "%s: Too big\n", msg);
> - if (err == EPERM)
> - return dt_bpf_lockmem_error(dtp, msg);
> - return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
> - }
> + if (fd < 0)
> + return map_create_error(dtp, name, err);
>
> dt_dprintf("BPF map '%s' is FD %d\n", name, fd);
>
> @@ -470,19 +468,40 @@ gmap_create_state(dtrace_hdl_t *dtp)
> * Create the 'aggs' BPF map.
> *
> * Aggregation data buffer map, associated with each CPU. The map is
> - * implemented as a global per-CPU map with a singleton element (key 0).
> + * implemented as a global array-of-maps indexed by CPU id. The associated
> + * value is a map with a singleton element (key 0).
> */
> static int
> gmap_create_aggs(dtrace_hdl_t *dtp)
> {
> size_t sz = dt_idhash_datasize(dtp->dt_aggs);
> + int i;
>
> /* Only create the map if it is used. */
> if (sz == 0)
> return 0;
>
> - dtp->dt_aggmap_fd = create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
> - sizeof(uint32_t), sz, 1);
> + dtp->dt_aggmap_fd = create_gmap_of_maps(dtp, "aggs",
> + BPF_MAP_TYPE_ARRAY_OF_MAPS,
> + sizeof(uint32_t),
> + dtp->dt_conf.num_online_cpus,
> + BPF_MAP_TYPE_ARRAY,
> + sizeof(uint32_t), sz, 1);
> +
> + for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> + int cpu = dtp->dt_conf.cpus[i].cpu_id;
> + char name[16];
> + int fd;
> +
> + snprintf(name, 16, "aggs_%d", cpu);
> + fd = dt_bpf_map_create(BPF_MAP_TYPE_ARRAY, name,
> + sizeof(uint32_t), sz, 1, 0);
> + if (fd < 0)
> + return map_create_error(dtp, name, errno);
> +
> + dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
> + }
> +
>
> return dtp->dt_aggmap_fd;
> }
> diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
> index 0963f202..157f4861 100644
> --- a/libdtrace/dt_cg.c
> +++ b/libdtrace/dt_cg.c
> @@ -52,11 +52,13 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
> {
> dtrace_hdl_t *dtp = pcb->pcb_hdl;
> dt_irlist_t *dlp = &pcb->pcb_ir;
> + dt_ident_t *aggs = dt_dlib_get_map(dtp, "aggs");
> dt_ident_t *mem = dt_dlib_get_map(dtp, "mem");
> dt_ident_t *state = dt_dlib_get_map(dtp, "state");
> dt_ident_t *prid = dt_dlib_get_var(pcb->pcb_hdl, "PRID");
> uint_t lbl_exit = pcb->pcb_exitlbl;
>
> + assert(aggs != NULL);
> assert(mem != NULL);
> assert(state != NULL);
> assert(prid != NULL);
> @@ -206,13 +208,60 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
> DT_CG_STORE_MAP_PTR("strtab", DCTX_STRTAB);
> if (dtp->dt_options[DTRACEOPT_SCRATCHSIZE] > 0)
> DT_CG_STORE_MAP_PTR("scratchmem", DCTX_SCRATCHMEM);
> - if (dt_idhash_datasize(dtp->dt_aggs) > 0)
> - DT_CG_STORE_MAP_PTR("aggs", DCTX_AGG);
> if (dt_idhash_datasize(dtp->dt_globals) > 0)
> DT_CG_STORE_MAP_PTR("gvars", DCTX_GVARS);
> if (dtp->dt_maxlvaralloc > 0)
> DT_CG_STORE_MAP_PTR("lvars", DCTX_LVARS);
> #undef DT_CG_STORE_MAP_PTR
> +
> + /*
> + * Aggregation data is stored in a CPU-specific BPF map. Populate
> + * dctx->agg with the map for the current CPU.
> + *
> + * key = bpf_get_smp_processor_id()
> + * // call bpf_get_smp_processor_id
> + * // (%r1 ... %r5 clobbered)
> + * // (%r0 = 'aggs' BPF map value)
> + * // stw [%r9 + DCTX_AGG], %r0
> + * rc = bpf_map_lookup_elem(&aggs, &key);
> + * // lddw %r1, &aggs
> + * // mov %r2, %r9
> + * // add %r2, DCTX_AGG
> + * // call bpf_map_lookup_elem
> + * // (%r1 ... %r5 clobbered)
> + * // (%r0 = 'aggs' BPF map value)
> + * if (rc == 0) // jeq %r0, 0, lbl_exit
> + * goto exit;
> + *
> + * key = 0; // stw [%r9 + DCTX_AGG], 0
> + * rc = bpf_map_lookup_elem(rc, &key);
> + * // mov %r1, %r0
> + * // mov %r2, %r9
> + * // add %r2, DCTX_AGG
> + * // call bpf_map_lookup_elem
> + * // (%r1 ... %r5 clobbered)
> + * // (%r0 = aggs[cpuid] BPF map value)
> + * if (rc == 0) // jeq %r0, 0, lbl_exit
> + * goto exit;
> + *
> + * dctx.aggs = rc; // stdw [%r9 + offset], %r0
> + */
> + if (dt_idhash_datasize(dtp->dt_aggs) > 0) {
> + emit(dlp, BPF_CALL_HELPER(BPF_FUNC_get_smp_processor_id));
> + emit(dlp, BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
> + dt_cg_xsetx(dlp, aggs, DT_LBL_NONE, BPF_REG_1, aggs->di_id);
> + emit(dlp, BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
> + emit(dlp, BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
> + emit(dlp, BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
> + emit(dlp, BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
> + emit(dlp, BPF_STORE_IMM(BPF_DW, BPF_REG_9, DCTX_AGG, 0));
> + emit(dlp, BPF_MOV_REG(BPF_REG_1, BPF_REG_0));
> + emit(dlp, BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
> + emit(dlp, BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
> + emit(dlp, BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
> + emit(dlp, BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
> + emit(dlp, BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
> + }
> }
>
> void
> diff --git a/libdtrace/dt_impl.h b/libdtrace/dt_impl.h
> index 85a1e7c9..e9b949ca 100644
> --- a/libdtrace/dt_impl.h
> +++ b/libdtrace/dt_impl.h
> @@ -209,7 +209,6 @@ typedef struct dt_tstring {
> } dt_tstring_t;
>
> typedef struct dt_aggregate {
> - char **dtat_cpu_buf; /* per-CPU agg snapshot buffers */
> char *dtat_buf; /* aggregation snapshot buffer */
> int dtat_flags; /* aggregate flags */
> dt_ahash_t dtat_hash; /* aggregate hash table */
> --
> 2.34.1
>
>
> _______________________________________________
> DTrace-devel mailing list
> DTrace-devel at oss.oracle.com
> https://oss.oracle.com/mailman/listinfo/dtrace-devel