[DTrace-devel] [PATCH 5/5] Use array-of-maps as storage for aggregations

Eugene Loh eugene.loh at oracle.com
Mon Aug 22 19:28:08 UTC 2022


With this patch, the ante goes up a lot, and it would really be nice to see where this whole patch series is going before reviewing.  Again, what problems does one intend to address?  How?  (The commit message hints at the first question but really does not address the second.)

Regarding the commit message:

*)  As before:  s/indexed aggregations/aggregation keys/

*)  What do aggregation keys have to do with this patch?  Heck, actually the same question goes for clear() and trunc().  I suppose this is the same "what's the roadmap" question.

*)  s/it's/its/

*)  The last sentence of the commit message is, let's say, "puzzling."  Just drop it?

In dt_aggregate.c, in dt_aggregate_snap_one(), if you're going to introduce "dtp = st->dtp",  then occurrences in this function of st->dtp should be replaced simply with dtp.

In dt_bpf.c, in gmap_create_aggs(), if a per-CPU map cannot be created, we don't care?

In dt_cg.c, s/COU/CPU/.

Also, the big comment block in dt_cg_tramp_prologue_act() keeps saying
          %r0 = 'mem' BPF map value
but that looks wrong.  In general, much of this comment detail should be pulled out.  The comments are harder to understand (and less reliable) than the code itself — they are a mass of details that are wrong, hard to follow, or both.


________________________________
From: Kris Van Hees via DTrace-devel <dtrace-devel at oss.oracle.com>
Sent: Friday, August 19, 2022 10:26 AM
To: dtrace-devel at oss.oracle.com <dtrace-devel at oss.oracle.com>
Subject: [DTrace-devel] [PATCH 5/5] Use array-of-maps as storage for aggregations

In preparation for indexed aggregations and the clear() and trunc()
operations, the storage for aggregations is moving from a per-CPU
array map to an array of maps, indexed by CPU id.  This gives each
CPU it's own storage to store aggregation data in.

Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
---
 libdtrace/dt_aggregate.c | 62 ++++++++++++----------------------------
 libdtrace/dt_bpf.c       | 24 ++++++++++++++--
 libdtrace/dt_cg.c        | 53 ++++++++++++++++++++++++++++++++--
 libdtrace/dt_impl.h      |  1 -
 4 files changed, 90 insertions(+), 50 deletions(-)

diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
index 44896fd2..cb47e442 100644
--- a/libdtrace/dt_aggregate.c
+++ b/libdtrace/dt_aggregate.c
@@ -412,8 +412,6 @@ typedef void (*agg_cpu_f)(dt_ident_t *aid, int64_t *dst, int64_t *src,
 typedef struct dt_snapstate {
         dtrace_hdl_t    *dtp;
         processorid_t   cpu;
-       char            *buf;
-       dt_aggregate_t  *agp;
 } dt_snapstate_t;

 static void
@@ -444,7 +442,9 @@ dt_agg_one_agg(dt_ident_t *aid, int64_t *dst, int64_t *src, uint_t datasz)
 static int
 dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 {
-       dt_ahash_t              *agh = &st->agp->dtat_hash;
+       dtrace_hdl_t            *dtp = st->dtp;
+       dt_aggregate_t          *agp = &dtp->dt_aggregate;
+       dt_ahash_t              *agh = &agp->dtat_hash;
         dt_ahashent_t           *h;
         dtrace_aggdesc_t        *agg;
         dtrace_aggdata_t        *agd;
@@ -454,12 +454,12 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
         uint_t                  i, datasz;
         int64_t                 *src;

-       rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
+       rval = dt_aggid_lookup(dtp, aid->di_id, &agg);
         if (rval != 0)
                 return rval;

         /* point to the data counter */
-       src = (int64_t *)(st->buf + aid->di_offset);
+       src = (int64_t *)(agp->dtat_buf + aid->di_offset);

         /* skip it if data counter is 0 */
         if (*src == 0)
@@ -506,7 +506,7 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
         h->dtahe_hval = hval;
         h->dtahe_size = datasz;

-       if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
+       if (agp->dtat_flags & DTRACE_A_PERCPU) {
                 char    **percpu = dt_calloc(st->dtp,
                                              st->dtp->dt_conf.max_cpuid + 1,
                                              sizeof(char *));
@@ -553,14 +553,15 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 static int
 dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
 {
-       dt_aggregate_t  *agp = &dtp->dt_aggregate;
-       char            *buf = agp->dtat_cpu_buf[cpu];
         dt_snapstate_t  st;
+       uint32_t        key = 0;

         st.dtp = dtp;
         st.cpu = cpu;
-       st.buf = buf;
-       st.agp = agp;
+
+       if (dt_bpf_map_lookup_inner(dtp->dt_aggmap_fd, &cpu, &key,
+                                   dtp->dt_aggregate.dtat_buf) == -1)
+               return 0;

         return dt_idhash_iter(dtp->dt_aggs,
                               (dt_idhash_f *)dt_aggregate_snap_one, &st);
@@ -573,22 +574,17 @@ int
 dtrace_aggregate_snap(dtrace_hdl_t *dtp)
 {
         dt_aggregate_t  *agp = &dtp->dt_aggregate;
-       uint32_t        key = 0;
         int             i, rval;

         /*
          * If we do not have a buffer initialized, we will not be processing
          * aggregations, so there is nothing to be done here.
          */
-       if (agp->dtat_cpu_buf == NULL)
+       if (agp->dtat_buf == NULL)
                 return 0;

         dtrace_aggregate_clear(dtp);

-       rval = dt_bpf_map_lookup(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
-       if (rval != 0)
-               return dt_set_errno(dtp, -rval);
-
         for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
                 rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus[i].cpu_id);
                 if (rval != 0)
@@ -999,41 +995,22 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
         dt_aggregate_t  *agp = &dtp->dt_aggregate;
         dt_ahash_t      *agh = &agp->dtat_hash;
         int             aggsz, i;
-       uint32_t        key = 0;

         /* If there are no aggregations there is nothing to do. */
         aggsz = dt_idhash_datasize(dtp->dt_aggs);
         if (aggsz <= 0)
                 return 0;

-       /*
-        * Allocate a buffer to hold the aggregation data for all possible
-        * CPUs, and initialize the per-CPU data pointers for CPUs that are
-        * currently enabled.
-        */
-       agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
+       /* Allocate a buffer to hold the aggregation data for a CPU.  */
+       agp->dtat_buf = dt_zalloc(dtp, aggsz);
         if (agp->dtat_buf == NULL)
                 return dt_set_errno(dtp, EDT_NOMEM);

-       agp->dtat_cpu_buf = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
-                                     sizeof(char *));
-       if (agp->dtat_cpu_buf == NULL) {
-               dt_free(dtp, agp->dtat_buf);
-               return dt_set_errno(dtp, EDT_NOMEM);
-       }
-
-       for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
-               int     cpu = dtp->dt_conf.cpus[i].cpu_id;
-
-               agp->dtat_cpu_buf[cpu] = agp->dtat_buf + cpu * aggsz;
-       }
-
         /* Create the aggregation hash. */
         agh->dtah_size = DTRACE_AHASHSIZE;
         agh->dtah_hash = dt_zalloc(dtp,
                                    agh->dtah_size * sizeof(dt_ahashent_t *));
         if (agh->dtah_hash == NULL) {
-               dt_free(dtp, agp->dtat_cpu_buf);
                 dt_free(dtp, agp->dtat_buf);
                 return dt_set_errno(dtp, EDT_NOMEM);
         }
@@ -1045,15 +1022,13 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
                 return 0;
         *(int64_t *)agp->dtat_buf = 0;  /* clear the flag */
         for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
-               int     cpu = dtp->dt_conf.cpus[i].cpu_id;
+               int             cpu = dtp->dt_conf.cpus[i].cpu_id;
+               uint32_t        key = 0;

-               /* Data for CPU 0 was populated, so skip it. */
-               if (cpu == 0)
+               if (dt_bpf_map_update_inner(dtp->dt_aggmap_fd, &cpu, &key,
+                                           dtp->dt_aggregate.dtat_buf) == -1)
                         continue;
-
-               memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
         }
-       dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);

         return 0;
 }
@@ -1820,6 +1795,5 @@ dt_aggregate_destroy(dtrace_hdl_t *dtp)
                 hash->dtah_size = 0;
         }

-       dt_free(dtp, agp->dtat_cpu_buf);
         dt_free(dtp, agp->dtat_buf);
 }
diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
index 66c76022..3207a416 100644
--- a/libdtrace/dt_bpf.c
+++ b/libdtrace/dt_bpf.c
@@ -469,19 +469,37 @@ gmap_create_state(dtrace_hdl_t *dtp)
  * Create the 'aggs' BPF map.
  *
  * Aggregation data buffer map, associated with each CPU.  The map is
- * implemented as a global per-CPU map with a singleton element (key 0).
+ * implemented as a global array-of-maps indexed by CPU id.  The associated
+ * value is a map with a singleton element (key 0).
  */
 static int
 gmap_create_aggs(dtrace_hdl_t *dtp)
 {
         size_t  sz = dt_idhash_datasize(dtp->dt_aggs);
+       int     i;

         /* Only create the map if it is used. */
         if (sz == 0)
                 return 0;

-       dtp->dt_aggmap_fd = create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
-                                       sizeof(uint32_t), sz, 1);
+       dtp->dt_aggmap_fd = create_gmap_of_maps(dtp, "aggs",
+                                               BPF_MAP_TYPE_ARRAY_OF_MAPS,
+                                               sizeof(uint32_t),
+                                               dtp->dt_conf.num_online_cpus,
+                                               BPF_MAP_TYPE_ARRAY,
+                                               sizeof(uint32_t), sz, 1);
+
+       for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
+               int     cpu = dtp->dt_conf.cpus[i].cpu_id;
+               int     fd = dt_bpf_map_create(BPF_MAP_TYPE_ARRAY, NULL,
+                                              sizeof(uint32_t), sz, 1, 0);
+
+               if (fd < 0)
+                       continue;
+
+               dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
+       }
+

         return dtp->dt_aggmap_fd;

diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
index 0963f202..b6ade5a9 100644
--- a/libdtrace/dt_cg.c
+++ b/libdtrace/dt_cg.c
@@ -52,11 +52,13 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
 {
         dtrace_hdl_t    *dtp = pcb->pcb_hdl;
         dt_irlist_t     *dlp = &pcb->pcb_ir;
+       dt_ident_t      *aggs = dt_dlib_get_map(dtp, "aggs");
         dt_ident_t      *mem = dt_dlib_get_map(dtp, "mem");
         dt_ident_t      *state = dt_dlib_get_map(dtp, "state");
         dt_ident_t      *prid = dt_dlib_get_var(pcb->pcb_hdl, "PRID");
         uint_t          lbl_exit = pcb->pcb_exitlbl;

+       assert(aggs != NULL);
         assert(mem != NULL);
         assert(state != NULL);
         assert(prid != NULL);
@@ -206,13 +208,60 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
         DT_CG_STORE_MAP_PTR("strtab", DCTX_STRTAB);
         if (dtp->dt_options[DTRACEOPT_SCRATCHSIZE] > 0)
                 DT_CG_STORE_MAP_PTR("scratchmem", DCTX_SCRATCHMEM);
-       if (dt_idhash_datasize(dtp->dt_aggs) > 0)
-               DT_CG_STORE_MAP_PTR("aggs", DCTX_AGG);
         if (dt_idhash_datasize(dtp->dt_globals) > 0)
                 DT_CG_STORE_MAP_PTR("gvars", DCTX_GVARS);
         if (dtp->dt_maxlvaralloc > 0)
                 DT_CG_STORE_MAP_PTR("lvars", DCTX_LVARS);
 #undef DT_CG_STORE_MAP_PTR
+
+       /*
+        * Aggregation data is stored in a COU-specific BPF map.  Populate
+        * dctx->agg with the map for the current CPU.
+        *
+        *       key = bpf_get_smp_processor_id()
+        *                               // call bpf_get_smp_processor_id
+        *                               //     (%r1 ... %r5 clobbered)
+        *                               //     (%r0 = 'mem' BPF map value)
+        *                               // stw [%r9 + DCTX_AGG], %r0
+        *       rc = bpf_map_lookup_elem(&aggs, &key);
+        *                               // lddw %r1, &aggs
+        *                               // mov %r2, %r9
+        *                               // add %r2, DCTX_AGG
+        *                               // call bpf_map_lookup_elem
+        *                               //     (%r1 ... %r5 clobbered)
+        *                               //     (%r0 = 'mem' BPF map value)
+        *       if (rc == 0)            // jeq %r0, 0, lbl_exit
+        *               goto exit;
+        *
+        *       key = 0;                // stw [%r9 + DCTX_AGG], 0
+        *       rc = bpf_map_lookup_elem(rc, &key);
+        *                               // mov %r1, %r0
+        *                               // mov %r2, %r9
+        *                               // add %r2, DCTX_AGG
+        *                               // call bpf_map_lookup_elem
+        *                               //     (%r1 ... %r5 clobbered)
+        *                               //     (%r0 = 'mem' BPF map value)
+        *       if (rc == 0)            // jeq %r0, 0, lbl_exit
+        *               goto exit;
+        *
+        *       dctx.aggs = rc;         // stdw [%r9 + offset], %r0
+        */
+       if (dt_idhash_datasize(dtp->dt_aggs) > 0) {
+               emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_get_smp_processor_id));
+               emit(dlp,  BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+               dt_cg_xsetx(dlp, aggs, DT_LBL_NONE, BPF_REG_1, aggs->di_id);
+               emit(dlp,  BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+               emit(dlp,  BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+               emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+               emit(dlp,  BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+               emit(dlp,  BPF_STORE_IMM(BPF_DW, BPF_REG_9, DCTX_AGG, 0));
+               emit(dlp,  BPF_MOV_REG(BPF_REG_1, BPF_REG_0));
+               emit(dlp,  BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+               emit(dlp,  BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+               emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+               emit(dlp,  BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+               emit(dlp,  BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+       }
 }

 void
diff --git a/libdtrace/dt_impl.h b/libdtrace/dt_impl.h
index 85a1e7c9..e9b949ca 100644
--- a/libdtrace/dt_impl.h
+++ b/libdtrace/dt_impl.h
@@ -209,7 +209,6 @@ typedef struct dt_tstring {
 } dt_tstring_t;

 typedef struct dt_aggregate {
-       char **dtat_cpu_buf;            /* per-CPU agg snapshot buffers */
         char *dtat_buf;                 /* aggregation snapshot buffer */
         int dtat_flags;                 /* aggregate flags */
         dt_ahash_t dtat_hash;           /* aggregate hash table */
--
2.34.1


_______________________________________________
DTrace-devel mailing list
DTrace-devel at oss.oracle.com
https://oss.oracle.com/mailman/listinfo/dtrace-devel
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://oss.oracle.com/pipermail/dtrace-devel/attachments/20220822/a24c1a8a/attachment-0001.html>


More information about the DTrace-devel mailing list