[DTrace-devel] [PATCH v2 5/5] Use array-of-maps as storage for aggregations

Eugene Loh eugene.loh at oracle.com
Wed Aug 24 22:43:04 UTC 2022


Reviewed-by: Eugene Loh <eugene.loh at oracle.com>

We have three copies of aggregation data:  the BPF map, the snapshot buffer (dtat_buf), and the per-aggregation user-space copies in the hash table.  So, in dt_aggregate_go(), how about
-       /* Allocate a buffer to hold the aggregation data for a CPU.  */
+       /* Allocate a buffer to hold the snapshot data for a CPU.  */

In gmap_create_aggs(), you call create_gmap_of_maps() with an osize of dtp->dt_conf.num_online_cpus.  Is that right?  This limits the "outer" keys to [0:dtp->dt_conf.num_online_cpus).  But then
        for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
                int        cpu = dtp->dt_conf.cpus[i].cpu_id;
                int        fd = ...;

                dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
        }
Will cpu always fit inside the given range?  If any CPUs are offline, cpu_id values can be sparse, so a cpu_id may be >= num_online_cpus.
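For example, a hypothetical alternative (not in the patch) would size the outer map by the highest possible CPU id so that every cpu_id fits:

        dtp->dt_aggmap_fd = create_gmap_of_maps(dtp, "aggs",
                                                BPF_MAP_TYPE_ARRAY_OF_MAPS,
                                                sizeof(uint32_t),
                                                dtp->dt_conf.max_cpuid + 1,
                                                BPF_MAP_TYPE_ARRAY,
                                                sizeof(uint32_t), sz, 1);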

In dt_cg_tramp_prologue_act(), I continue to maintain that the details in the big comment block are obfuscating:  they are no clearer than the code itself and therefore just add confusion.  I keep checking the comments against the code rather than the other way around.  Whatever.  In any case, the comment for the result of "call bpf_get_smp_processor_id" reads
        (%r0 = 'aggs' BPF map value)
That is apparently a cut-and-paste error; after that call, %r0 holds the current CPU id, not a map value.  And the final instruction is described as
        dctx.aggs = rc;       // stdw [%r9 + offset], %r0
But there is no "offset" any more... it should probably be s/offset/DCTX_AGG/.
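That is, something like

-        *                               //     (%r0 = 'aggs' BPF map value)
+        *                               //     (%r0 = current CPU id)

for the comment under "call bpf_get_smp_processor_id", and

-        *       dctx.aggs = rc;         // stdw [%r9 + offset], %r0
+        *       dctx.aggs = rc;         // stdw [%r9 + DCTX_AGG], %r0

for the final store.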
________________________________
From: Kris Van Hees via DTrace-devel <dtrace-devel at oss.oracle.com>
Sent: Tuesday, August 23, 2022 2:49 PM
To: dtrace-devel at oss.oracle.com <dtrace-devel at oss.oracle.com>
Subject: [DTrace-devel] [PATCH v2 5/5] Use array-of-maps as storage for aggregations

In preparation for indexed aggregations and the clear() and trunc()
operations, the storage for aggregations is moving from a per-CPU
array map to an array of maps, indexed by CPU id.

The existing storage solution for aggregations stored all data in a
singleton map value, i.e. all CPUs were writing to their own portion
of a block of memory that the consumer retrieved in its entirety in
a single system call.

The new storage solution allocates a memory block for each CPU so
that data retrieval by the consumer can be done per CPU.  This sets
the stage for future development where the consumer may need to
update the aggregation buffers.
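For readers new to BPF map-in-map, a minimal sketch of the construct using plain libbpf (not the patch's internal helpers; aggsz, ncpus, cpu, and inner_fd are hypothetical placeholders): an array-of-maps needs a template inner map at creation time, and its slots are then populated with inner-map fds from user space.

        #include <bpf/bpf.h>

        LIBBPF_OPTS(bpf_map_create_opts, opts);
        int tmpl, outer;

        /* Template inner map: a single aggsz-byte value at key 0. */
        tmpl = bpf_map_create(BPF_MAP_TYPE_ARRAY, "aggs_tmpl",
                              sizeof(__u32), aggsz, 1, NULL);

        /* Outer map: one slot per CPU; value_size is 4 (an fd slot). */
        opts.inner_map_fd = tmpl;
        outer = bpf_map_create(BPF_MAP_TYPE_ARRAY_OF_MAPS, "aggs",
                               sizeof(__u32), sizeof(__u32), ncpus, &opts);

        /* Each slot holds the fd of that CPU's own inner array. */
        bpf_map_update_elem(outer, &cpu, &inner_fd, BPF_ANY);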

Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
---
 libdtrace/dt_aggregate.c | 95 ++++++++++++++--------------------------
 libdtrace/dt_bpf.c       | 69 ++++++++++++++++++-----------
 libdtrace/dt_cg.c        | 53 +++++++++++++++++++++-
 libdtrace/dt_impl.h      |  1 -
 4 files changed, 129 insertions(+), 89 deletions(-)

diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
index 44896fd2..14d16da6 100644
--- a/libdtrace/dt_aggregate.c
+++ b/libdtrace/dt_aggregate.c
@@ -412,8 +412,6 @@ typedef void (*agg_cpu_f)(dt_ident_t *aid, int64_t *dst, int64_t *src,
 typedef struct dt_snapstate {
         dtrace_hdl_t    *dtp;
         processorid_t   cpu;
-       char            *buf;
-       dt_aggregate_t  *agp;
 } dt_snapstate_t;

 static void
@@ -444,7 +442,9 @@ dt_agg_one_agg(dt_ident_t *aid, int64_t *dst, int64_t *src, uint_t datasz)
 static int
 dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 {
-       dt_ahash_t              *agh = &st->agp->dtat_hash;
+       dtrace_hdl_t            *dtp = st->dtp;
+       dt_aggregate_t          *agp = &dtp->dt_aggregate;
+       dt_ahash_t              *agh = &agp->dtat_hash;
         dt_ahashent_t           *h;
         dtrace_aggdesc_t        *agg;
         dtrace_aggdata_t        *agd;
@@ -454,12 +454,12 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
         uint_t                  i, datasz;
         int64_t                 *src;

-       rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
+       rval = dt_aggid_lookup(dtp, aid->di_id, &agg);
         if (rval != 0)
                 return rval;

         /* point to the data counter */
-       src = (int64_t *)(st->buf + aid->di_offset);
+       src = (int64_t *)(agp->dtat_buf + aid->di_offset);

         /* skip it if data counter is 0 */
         if (*src == 0)
@@ -487,46 +487,45 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
         }

         /* add it to the hash table */
-       h = dt_zalloc(st->dtp, sizeof(dt_ahashent_t));
+       h = dt_zalloc(dtp, sizeof(dt_ahashent_t));
         if (h == NULL)
-               return dt_set_errno(st->dtp, EDT_NOMEM);
+               return dt_set_errno(dtp, EDT_NOMEM);

         agd = &h->dtahe_data;
-       agd->dtada_data = dt_alloc(st->dtp, datasz);
+       agd->dtada_data = dt_alloc(dtp, datasz);
         if (agd->dtada_data == NULL) {
-               dt_free(st->dtp, h);
-               return dt_set_errno(st->dtp, EDT_NOMEM);
+               dt_free(dtp, h);
+               return dt_set_errno(dtp, EDT_NOMEM);
         }

         memcpy(agd->dtada_data, src, datasz);
         agd->dtada_size = datasz;
         agd->dtada_desc = agg;
-       agd->dtada_hdl = st->dtp;
+       agd->dtada_hdl = dtp;

         h->dtahe_hval = hval;
         h->dtahe_size = datasz;

-       if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
-               char    **percpu = dt_calloc(st->dtp,
-                                            st->dtp->dt_conf.max_cpuid + 1,
+       if (agp->dtat_flags & DTRACE_A_PERCPU) {
+               char    **percpu = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
                                              sizeof(char *));

                 if (percpu == NULL) {
-                       dt_free(st->dtp, agd->dtada_data);
-                       dt_free(st->dtp, h);
+                       dt_free(dtp, agd->dtada_data);
+                       dt_free(dtp, h);

-                       dt_set_errno(st->dtp, EDT_NOMEM);
+                       dt_set_errno(dtp, EDT_NOMEM);
                 }

-               for (i = 0; i <= st->dtp->dt_conf.max_cpuid; i++) {
-                       percpu[i] = dt_zalloc(st->dtp, datasz);
+               for (i = 0; i <= dtp->dt_conf.max_cpuid; i++) {
+                       percpu[i] = dt_zalloc(dtp, datasz);
                         if (percpu[i] == NULL) {
                                 while (--i >= 0)
-                                       dt_free(st->dtp, percpu[i]);
-                               dt_free(st->dtp, agd->dtada_data);
-                               dt_free(st->dtp, h);
+                                       dt_free(dtp, percpu[i]);
+                               dt_free(dtp, agd->dtada_data);
+                               dt_free(dtp, h);

-                               dt_set_errno(st->dtp, EDT_NOMEM);
+                               dt_set_errno(dtp, EDT_NOMEM);
                         }
                 }

@@ -553,14 +552,15 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 static int
 dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
 {
-       dt_aggregate_t  *agp = &dtp->dt_aggregate;
-       char            *buf = agp->dtat_cpu_buf[cpu];
         dt_snapstate_t  st;
+       uint32_t        key = 0;

         st.dtp = dtp;
         st.cpu = cpu;
-       st.buf = buf;
-       st.agp = agp;
+
+       if (dt_bpf_map_lookup_inner(dtp->dt_aggmap_fd, &cpu, &key,
+                                   dtp->dt_aggregate.dtat_buf) == -1)
+               return 0;

         return dt_idhash_iter(dtp->dt_aggs,
                               (dt_idhash_f *)dt_aggregate_snap_one, &st);
@@ -573,22 +573,17 @@ int
 dtrace_aggregate_snap(dtrace_hdl_t *dtp)
 {
         dt_aggregate_t  *agp = &dtp->dt_aggregate;
-       uint32_t        key = 0;
         int             i, rval;

         /*
          * If we do not have a buffer initialized, we will not be processing
          * aggregations, so there is nothing to be done here.
          */
-       if (agp->dtat_cpu_buf == NULL)
+       if (agp->dtat_buf == NULL)
                 return 0;

         dtrace_aggregate_clear(dtp);

-       rval = dt_bpf_map_lookup(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
-       if (rval != 0)
-               return dt_set_errno(dtp, -rval);
-
         for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
                 rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus[i].cpu_id);
                 if (rval != 0)
@@ -999,41 +994,22 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
         dt_aggregate_t  *agp = &dtp->dt_aggregate;
         dt_ahash_t      *agh = &agp->dtat_hash;
         int             aggsz, i;
-       uint32_t        key = 0;

         /* If there are no aggregations there is nothing to do. */
         aggsz = dt_idhash_datasize(dtp->dt_aggs);
         if (aggsz <= 0)
                 return 0;

-       /*
-        * Allocate a buffer to hold the aggregation data for all possible
-        * CPUs, and initialize the per-CPU data pointers for CPUs that are
-        * currently enabled.
-        */
-       agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
+       /* Allocate a buffer to hold the aggregation data for a CPU.  */
+       agp->dtat_buf = dt_zalloc(dtp, aggsz);
         if (agp->dtat_buf == NULL)
                 return dt_set_errno(dtp, EDT_NOMEM);

-       agp->dtat_cpu_buf = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
-                                     sizeof(char *));
-       if (agp->dtat_cpu_buf == NULL) {
-               dt_free(dtp, agp->dtat_buf);
-               return dt_set_errno(dtp, EDT_NOMEM);
-       }
-
-       for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
-               int     cpu = dtp->dt_conf.cpus[i].cpu_id;
-
-               agp->dtat_cpu_buf[cpu] = agp->dtat_buf + cpu * aggsz;
-       }
-
         /* Create the aggregation hash. */
         agh->dtah_size = DTRACE_AHASHSIZE;
         agh->dtah_hash = dt_zalloc(dtp,
                                    agh->dtah_size * sizeof(dt_ahashent_t *));
         if (agh->dtah_hash == NULL) {
-               dt_free(dtp, agp->dtat_cpu_buf);
                 dt_free(dtp, agp->dtat_buf);
                 return dt_set_errno(dtp, EDT_NOMEM);
         }
@@ -1045,15 +1021,13 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
                 return 0;
         *(int64_t *)agp->dtat_buf = 0;  /* clear the flag */
         for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
-               int     cpu = dtp->dt_conf.cpus[i].cpu_id;
+               int             cpu = dtp->dt_conf.cpus[i].cpu_id;
+               uint32_t        key = 0;

-               /* Data for CPU 0 was populated, so skip it. */
-               if (cpu == 0)
+               if (dt_bpf_map_update_inner(dtp->dt_aggmap_fd, &cpu, &key,
+                                           dtp->dt_aggregate.dtat_buf) == -1)
                         continue;
-
-               memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
         }
-       dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);

         return 0;
 }
@@ -1820,6 +1794,5 @@ dt_aggregate_destroy(dtrace_hdl_t *dtp)
                 hash->dtah_size = 0;
         }

-       dt_free(dtp, agp->dtat_cpu_buf);
         dt_free(dtp, agp->dtat_buf);
 }
diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
index 7dea7179..a31ddf95 100644
--- a/libdtrace/dt_bpf.c
+++ b/libdtrace/dt_bpf.c
@@ -351,6 +351,22 @@ dt_bpf_init_helpers(dtrace_hdl_t *dtp)
 #undef BPF_HELPER_MAP
 }

+static int
+map_create_error(dtrace_hdl_t *dtp, const char *name, int err)
+{
+       char    msg[64];
+
+       snprintf(msg, sizeof(msg),
+                "failed to create BPF map '%s'", name);
+
+       if (err == E2BIG)
+               return dt_bpf_error(dtp, "%s: Too big\n", msg);
+       if (err == EPERM)
+               return dt_bpf_lockmem_error(dtp, msg);
+
+       return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
+}
+
 static int
 create_gmap(dtrace_hdl_t *dtp, const char *name, enum bpf_map_type type,
             size_t ksz, size_t vsz, size_t size)
@@ -369,17 +385,8 @@ create_gmap(dtrace_hdl_t *dtp, const char *name, enum bpf_map_type type,
                 err = errno;
         }

-       if (fd < 0) {
-               char msg[64];
-
-               snprintf(msg, sizeof(msg),
-                        "failed to create BPF map '%s'", name);
-               if (err == E2BIG)
-                       return dt_bpf_error(dtp, "%s: Too big\n", msg);
-               if (err == EPERM)
-                       return dt_bpf_lockmem_error(dtp, msg);
-               return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
-       }
+       if (fd < 0)
+               return map_create_error(dtp, name, err);

         dt_dprintf("BPF map '%s' is FD %d\n", name, fd);

@@ -421,17 +428,8 @@ create_gmap_of_maps(dtrace_hdl_t *dtp, const char *name,
                 err = errno;
         }

-       if (fd < 0) {
-               char msg[64];
-
-               snprintf(msg, sizeof(msg),
-                        "failed to create BPF map '%s'", name);
-               if (err == E2BIG)
-                       return dt_bpf_error(dtp, "%s: Too big\n", msg);
-               if (err == EPERM)
-                       return dt_bpf_lockmem_error(dtp, msg);
-               return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
-       }
+       if (fd < 0)
+               return map_create_error(dtp, name, err);

         dt_dprintf("BPF map '%s' is FD %d\n", name, fd);

@@ -470,19 +468,40 @@ gmap_create_state(dtrace_hdl_t *dtp)
  * Create the 'aggs' BPF map.
  *
  * Aggregation data buffer map, associated with each CPU.  The map is
- * implemented as a global per-CPU map with a singleton element (key 0).
+ * implemented as a global array-of-maps indexed by CPU id.  The associated
+ * value is a map with a singleton element (key 0).
  */
 static int
 gmap_create_aggs(dtrace_hdl_t *dtp)
 {
         size_t  sz = dt_idhash_datasize(dtp->dt_aggs);
+       int     i;

         /* Only create the map if it is used. */
         if (sz == 0)
                 return 0;

-       dtp->dt_aggmap_fd = create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
-                                       sizeof(uint32_t), sz, 1);
+       dtp->dt_aggmap_fd = create_gmap_of_maps(dtp, "aggs",
+                                               BPF_MAP_TYPE_ARRAY_OF_MAPS,
+                                               sizeof(uint32_t),
+                                               dtp->dt_conf.num_online_cpus,
+                                               BPF_MAP_TYPE_ARRAY,
+                                               sizeof(uint32_t), sz, 1);
+
+       for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
+               int     cpu = dtp->dt_conf.cpus[i].cpu_id;
+               char    name[16];
+               int     fd;
+
+               snprintf(name, 16, "aggs_%d", cpu);
+               fd = dt_bpf_map_create(BPF_MAP_TYPE_ARRAY, name,
+                                      sizeof(uint32_t), sz, 1, 0);
+               if (fd < 0)
+                       return map_create_error(dtp, name, errno);
+
+               dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
+       }
+

         return dtp->dt_aggmap_fd;
 }
diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
index 0963f202..157f4861 100644
--- a/libdtrace/dt_cg.c
+++ b/libdtrace/dt_cg.c
@@ -52,11 +52,13 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
 {
         dtrace_hdl_t    *dtp = pcb->pcb_hdl;
         dt_irlist_t     *dlp = &pcb->pcb_ir;
+       dt_ident_t      *aggs = dt_dlib_get_map(dtp, "aggs");
         dt_ident_t      *mem = dt_dlib_get_map(dtp, "mem");
         dt_ident_t      *state = dt_dlib_get_map(dtp, "state");
         dt_ident_t      *prid = dt_dlib_get_var(pcb->pcb_hdl, "PRID");
         uint_t          lbl_exit = pcb->pcb_exitlbl;

+       assert(aggs != NULL);
         assert(mem != NULL);
         assert(state != NULL);
         assert(prid != NULL);
@@ -206,13 +208,60 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
         DT_CG_STORE_MAP_PTR("strtab", DCTX_STRTAB);
         if (dtp->dt_options[DTRACEOPT_SCRATCHSIZE] > 0)
                 DT_CG_STORE_MAP_PTR("scratchmem", DCTX_SCRATCHMEM);
-       if (dt_idhash_datasize(dtp->dt_aggs) > 0)
-               DT_CG_STORE_MAP_PTR("aggs", DCTX_AGG);
         if (dt_idhash_datasize(dtp->dt_globals) > 0)
                 DT_CG_STORE_MAP_PTR("gvars", DCTX_GVARS);
         if (dtp->dt_maxlvaralloc > 0)
                 DT_CG_STORE_MAP_PTR("lvars", DCTX_LVARS);
 #undef DT_CG_STORE_MAP_PTR
+
+       /*
+        * Aggregation data is stored in a CPU-specific BPF map.  Populate
+        * dctx->agg with the map for the current CPU.
+        *
+        *       key = bpf_get_smp_processor_id()
+        *                               // call bpf_get_smp_processor_id
+        *                               //     (%r1 ... %r5 clobbered)
+        *                               //     (%r0 = 'aggs' BPF map value)
+        *                               // stw [%r9 + DCTX_AGG], %r0
+        *       rc = bpf_map_lookup_elem(&aggs, &key);
+        *                               // lddw %r1, &aggs
+        *                               // mov %r2, %r9
+        *                               // add %r2, DCTX_AGG
+        *                               // call bpf_map_lookup_elem
+        *                               //     (%r1 ... %r5 clobbered)
+        *                               //     (%r0 = 'aggs' BPF map value)
+        *       if (rc == 0)            // jeq %r0, 0, lbl_exit
+        *               goto exit;
+        *
+        *       key = 0;                // stw [%r9 + DCTX_AGG], 0
+        *       rc = bpf_map_lookup_elem(rc, &key);
+        *                               // mov %r1, %r0
+        *                               // mov %r2, %r9
+        *                               // add %r2, DCTX_AGG
+        *                               // call bpf_map_lookup_elem
+        *                               //     (%r1 ... %r5 clobbered)
+        *                               //     (%r0 = aggs[cpuid] BPF map value)
+        *       if (rc == 0)            // jeq %r0, 0, lbl_exit
+        *               goto exit;
+        *
+        *       dctx.aggs = rc;         // stdw [%r9 + offset], %r0
+        */
+       if (dt_idhash_datasize(dtp->dt_aggs) > 0) {
+               emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_get_smp_processor_id));
+               emit(dlp,  BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+               dt_cg_xsetx(dlp, aggs, DT_LBL_NONE, BPF_REG_1, aggs->di_id);
+               emit(dlp,  BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+               emit(dlp,  BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+               emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+               emit(dlp,  BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+               emit(dlp,  BPF_STORE_IMM(BPF_DW, BPF_REG_9, DCTX_AGG, 0));
+               emit(dlp,  BPF_MOV_REG(BPF_REG_1, BPF_REG_0));
+               emit(dlp,  BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+               emit(dlp,  BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+               emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+               emit(dlp,  BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+               emit(dlp,  BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+       }
 }

 void
diff --git a/libdtrace/dt_impl.h b/libdtrace/dt_impl.h
index 85a1e7c9..e9b949ca 100644
--- a/libdtrace/dt_impl.h
+++ b/libdtrace/dt_impl.h
@@ -209,7 +209,6 @@ typedef struct dt_tstring {
 } dt_tstring_t;

 typedef struct dt_aggregate {
-       char **dtat_cpu_buf;            /* per-CPU agg snapshot buffers */
         char *dtat_buf;                 /* aggregation snapshot buffer */
         int dtat_flags;                 /* aggregate flags */
         dt_ahash_t dtat_hash;           /* aggregate hash table */
--
2.34.1




More information about the DTrace-devel mailing list