[DTrace-devel] [PATCH 5/5] Use array-of-maps as storage for aggregations

Kris Van Hees kris.van.hees at oracle.com
Tue Aug 23 00:21:22 UTC 2022


On Mon, Aug 22, 2022 at 03:28:08PM -0400, Eugene Loh wrote:
> With this patch, the ante goes up a lot, and it would really be nice to see
> where this whole patch series is going before reviewing.  Again, what problems
> does one intend to address?  How?  (The commit message hints at the first
> question but really does not address the second.)

I am not sure what you mean by 'the ante goes up a lot'.  And I think my email
concerning the aggregation support and the differences with the legacy version
does outline where this is going: using a hash per CPU (rather than a per-CPU
hashmap) so that when aggregation data is produced in a non-uniform fashion
across CPUs (which is actually quite common), we keep storage space available
for other aggregations.

> Regarding the commit message:
> 
> *)  As before:  s/indexed aggregations/aggregation keys/

See my comments in earlier emails.

> *)  What do aggregation keys have to do with this patch?  Heck, actually the
> same question goes for clear() and trunc().  I suppose this is the same "what's
> the roadmap" question.

If you follow the series, we go from a single map (a per-CPU array map) to a
hash of array maps.  And the next step will be to switch from a hash of array
maps to a hash of hashmaps.  This is an intermediate step towards that, and one
that makes the switch-over incremental.
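
For reference, here is a rough sketch of the layout this patch sets up, using
plain libbpf calls rather than the dt_bpf_* wrappers (ncpus and cpu_ids stand
in for the dtp->dt_conf data): an outer array-of-maps keyed by CPU id, with
each slot holding a single-element array map for that CPU's aggregation
buffer.  Swapping the inner array maps for hashmaps later is then a localized
change.

#include <bpf/bpf.h>
#include <stdint.h>

/* Sketch of the 'aggs' layout in this patch; error handling elided. */
static int create_aggs_sketch(uint32_t aggsz, int ncpus, const int *cpu_ids)
{
        int     i;

        /* Template inner map: a single element (key 0) holding one CPU's
         * aggregation buffer of aggsz bytes. */
        int     tmpl = bpf_map_create(BPF_MAP_TYPE_ARRAY, NULL,
                                      sizeof(uint32_t), aggsz, 1, NULL);
        LIBBPF_OPTS(bpf_map_create_opts, opts, .inner_map_fd = tmpl);

        /* Outer map: array-of-maps indexed by CPU id. */
        int     aggs = bpf_map_create(BPF_MAP_TYPE_ARRAY_OF_MAPS, "aggs",
                                      sizeof(uint32_t), sizeof(uint32_t),
                                      ncpus, &opts);

        /* Give each online CPU its own inner map. */
        for (i = 0; i < ncpus; i++) {
                int     cpu = cpu_ids[i];
                int     fd = bpf_map_create(BPF_MAP_TYPE_ARRAY, NULL,
                                            sizeof(uint32_t), aggsz, 1, NULL);

                if (fd < 0)
                        continue;
                bpf_map_update_elem(aggs, &cpu, &fd, BPF_ANY);
        }

        return aggs;
}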

> *)  s/it's/its/
> 
> *)  The last sentence of the commit message is, let's say, "puzzling."  Just
> drop it?

What is puzzling about it?

> In dt_aggregate.c, in dt_aggregate_snap_one(), if you're going to introduce
> "dtp = st->dtp",  then occurrences in this function of st->dtp should be
> replaced simply with dtp.

Thanks - I must have missed that.

> In dt_bpf.c, in gmap_create_aggs(), if a per-CPU map cannot be created, we
> don't care?
> 
> In dt_cg.c, s/COU/CPU/.

Thanks.

> Also, the big comment block in dt_cg_tramp_prologue_act() keeps saying
>           %r0 = 'mem' BPF map value
> but that looks wrong.  In general, a ton of this comment detail should be
> pulled out.  The comments are harder to understand (and less reliable) than the
> code itself.  The comments are a bunch of detail that are wrong or hard to
> understand or both.

I am not sure I would characterize 3 instances of the same obvious cut'n'paste
error as 'keeps saying'.  But I will fix the comments.

> ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
> From: Kris Van Hees via DTrace-devel <dtrace-devel at oss.oracle.com>
> Sent: Friday, August 19, 2022 10:26 AM
> To: dtrace-devel at oss.oracle.com <dtrace-devel at oss.oracle.com>
> Subject: [DTrace-devel] [PATCH 5/5] Use array-of-maps as storage for aggregations
>  
> In preparation for indexed aggregations and the clear() and trunc()
> operations, the storage for aggregations is moving from a per-CPU
> array map to an array of maps, indexed by CPU id.  This gives each
> CPU it's own storage to store aggregation data in.
> 
> Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
> ---
>  libdtrace/dt_aggregate.c | 62 ++++++++++++----------------------------
>  libdtrace/dt_bpf.c       | 24 ++++++++++++++--
>  libdtrace/dt_cg.c        | 53 ++++++++++++++++++++++++++++++++--
>  libdtrace/dt_impl.h      |  1 -
>  4 files changed, 90 insertions(+), 50 deletions(-)
> 
> diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
> index 44896fd2..cb47e442 100644
> --- a/libdtrace/dt_aggregate.c
> +++ b/libdtrace/dt_aggregate.c
> @@ -412,8 +412,6 @@ typedef void (*agg_cpu_f)(dt_ident_t *aid, int64_t *dst, int64_t *src,
>  typedef struct dt_snapstate {
>          dtrace_hdl_t    *dtp;
>          processorid_t   cpu;
> -       char            *buf;
> -       dt_aggregate_t  *agp;
>  } dt_snapstate_t;
>  
>  static void
> @@ -444,7 +442,9 @@ dt_agg_one_agg(dt_ident_t *aid, int64_t *dst, int64_t *src, uint_t datasz)
>  static int
>  dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
>  {
> -       dt_ahash_t              *agh = &st->agp->dtat_hash;
> +       dtrace_hdl_t            *dtp = st->dtp;
> +       dt_aggregate_t          *agp = &dtp->dt_aggregate;
> +       dt_ahash_t              *agh = &agp->dtat_hash;
>          dt_ahashent_t           *h;
>          dtrace_aggdesc_t        *agg;
>          dtrace_aggdata_t        *agd;
> @@ -454,12 +454,12 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
>          uint_t                  i, datasz;
>          int64_t                 *src;
>  
> -       rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
> +       rval = dt_aggid_lookup(dtp, aid->di_id, &agg);
>          if (rval != 0)
>                  return rval;
>  
>          /* point to the data counter */
> -       src = (int64_t *)(st->buf + aid->di_offset);
> +       src = (int64_t *)(agp->dtat_buf + aid->di_offset);
>  
>          /* skip it if data counter is 0 */
>          if (*src == 0)
> @@ -506,7 +506,7 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
>          h->dtahe_hval = hval;
>          h->dtahe_size = datasz;
>  
> -       if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
> +       if (agp->dtat_flags & DTRACE_A_PERCPU) {
>                  char    **percpu = dt_calloc(st->dtp,
>                                               st->dtp->dt_conf.max_cpuid + 1,
>                                               sizeof(char *));
> @@ -553,14 +553,15 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
>  static int
>  dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
>  {
> -       dt_aggregate_t  *agp = &dtp->dt_aggregate;
> -       char            *buf = agp->dtat_cpu_buf[cpu];
>          dt_snapstate_t  st;
> +       uint32_t        key = 0;
>  
>          st.dtp = dtp;
>          st.cpu = cpu;
> -       st.buf = buf;
> -       st.agp = agp;
> +
> +       if (dt_bpf_map_lookup_inner(dtp->dt_aggmap_fd, &cpu, &key,
> +                                   dtp->dt_aggregate.dtat_buf) == -1)
> +               return 0;
>  
>          return dt_idhash_iter(dtp->dt_aggs,
>                                (dt_idhash_f *)dt_aggregate_snap_one, &st);
> @@ -573,22 +574,17 @@ int
>  dtrace_aggregate_snap(dtrace_hdl_t *dtp)
>  {
>          dt_aggregate_t  *agp = &dtp->dt_aggregate;
> -       uint32_t        key = 0;
>          int             i, rval;
>  
>          /*
>           * If we do not have a buffer initialized, we will not be processing
>           * aggregations, so there is nothing to be done here.
>           */
> -       if (agp->dtat_cpu_buf == NULL)
> +       if (agp->dtat_buf == NULL)
>                  return 0;
>  
>          dtrace_aggregate_clear(dtp);
>  
> -       rval = dt_bpf_map_lookup(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
> -       if (rval != 0)
> -               return dt_set_errno(dtp, -rval);
> -
>          for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
>                  rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus[i].cpu_id);
>                  if (rval != 0)
> @@ -999,41 +995,22 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
>          dt_aggregate_t  *agp = &dtp->dt_aggregate;
>          dt_ahash_t      *agh = &agp->dtat_hash;
>          int             aggsz, i;
> -       uint32_t        key = 0;
>  
>          /* If there are no aggregations there is nothing to do. */
>          aggsz = dt_idhash_datasize(dtp->dt_aggs);
>          if (aggsz <= 0)
>                  return 0;
>  
> -       /*
> -        * Allocate a buffer to hold the aggregation data for all possible
> -        * CPUs, and initialize the per-CPU data pointers for CPUs that are
> -        * currently enabled.
> -        */
> -       agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
> +       /* Allocate a buffer to hold the aggregation data for a CPU.  */
> +       agp->dtat_buf = dt_zalloc(dtp, aggsz);
>          if (agp->dtat_buf == NULL)
>                  return dt_set_errno(dtp, EDT_NOMEM);
>  
> -       agp->dtat_cpu_buf = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
> -                                     sizeof(char *));
> -       if (agp->dtat_cpu_buf == NULL) {
> -               dt_free(dtp, agp->dtat_buf);
> -               return dt_set_errno(dtp, EDT_NOMEM);
> -       }
> -
> -       for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> -               int     cpu = dtp->dt_conf.cpus[i].cpu_id;
> -
> -               agp->dtat_cpu_buf[cpu] = agp->dtat_buf + cpu * aggsz;
> -       }
> -
>          /* Create the aggregation hash. */
>          agh->dtah_size = DTRACE_AHASHSIZE;
>          agh->dtah_hash = dt_zalloc(dtp,
>                                     agh->dtah_size * sizeof(dt_ahashent_t *));
>          if (agh->dtah_hash == NULL) {
> -               dt_free(dtp, agp->dtat_cpu_buf);
>                  dt_free(dtp, agp->dtat_buf);
>                  return dt_set_errno(dtp, EDT_NOMEM);
>          }
> @@ -1045,15 +1022,13 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
>                  return 0;
>          *(int64_t *)agp->dtat_buf = 0;  /* clear the flag */
>          for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> -               int     cpu = dtp->dt_conf.cpus[i].cpu_id;
> +               int             cpu = dtp->dt_conf.cpus[i].cpu_id;
> +               uint32_t        key = 0;
>  
> -               /* Data for CPU 0 was populated, so skip it. */
> -               if (cpu == 0)
> +               if (dt_bpf_map_update_inner(dtp->dt_aggmap_fd, &cpu, &key,
> +                                           dtp->dt_aggregate.dtat_buf) == -1)
>                          continue;
> -
> -               memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
>          }
> -       dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
>  
>          return 0;
>  }
> @@ -1820,6 +1795,5 @@ dt_aggregate_destroy(dtrace_hdl_t *dtp)
>                  hash->dtah_size = 0;
>          }
>  
> -       dt_free(dtp, agp->dtat_cpu_buf);
>          dt_free(dtp, agp->dtat_buf);
>  }
> diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
> index 66c76022..3207a416 100644
> --- a/libdtrace/dt_bpf.c
> +++ b/libdtrace/dt_bpf.c
> @@ -469,19 +469,37 @@ gmap_create_state(dtrace_hdl_t *dtp)
>   * Create the 'aggs' BPF map.
>   *
>   * Aggregation data buffer map, associated with each CPU.  The map is
> - * implemented as a global per-CPU map with a singleton element (key 0).
> + * implemented as a global array-of-maps indexed by CPU id.  The associated
> + * value is a map with a singleton element (key 0).
>   */
>  static int
>  gmap_create_aggs(dtrace_hdl_t *dtp)
>  {
>          size_t  sz = dt_idhash_datasize(dtp->dt_aggs);
> +       int     i;
>  
>          /* Only create the map if it is used. */
>          if (sz == 0)
>                  return 0;
>  
> -       dtp->dt_aggmap_fd = create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
> -                                       sizeof(uint32_t), sz, 1);
> +       dtp->dt_aggmap_fd = create_gmap_of_maps(dtp, "aggs",
> +                                               BPF_MAP_TYPE_ARRAY_OF_MAPS,
> +                                               sizeof(uint32_t),
> +                                               dtp->dt_conf.num_online_cpus,
> +                                               BPF_MAP_TYPE_ARRAY,
> +                                               sizeof(uint32_t), sz, 1);
> +
> +       for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> +               int     cpu = dtp->dt_conf.cpus[i].cpu_id;
> +               int     fd = dt_bpf_map_create(BPF_MAP_TYPE_ARRAY, NULL,
> +                                              sizeof(uint32_t), sz, 1, 0);
> +
> +               if (fd < 0)
> +                       continue;
> +
> +               dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
> +       }
> +
>  
>          return dtp->dt_aggmap_fd;
>  
> diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
> index 0963f202..b6ade5a9 100644
> --- a/libdtrace/dt_cg.c
> +++ b/libdtrace/dt_cg.c
> @@ -52,11 +52,13 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
>  {
>          dtrace_hdl_t    *dtp = pcb->pcb_hdl;
>          dt_irlist_t     *dlp = &pcb->pcb_ir;
> +       dt_ident_t      *aggs = dt_dlib_get_map(dtp, "aggs");
>          dt_ident_t      *mem = dt_dlib_get_map(dtp, "mem");
>          dt_ident_t      *state = dt_dlib_get_map(dtp, "state");
>          dt_ident_t      *prid = dt_dlib_get_var(pcb->pcb_hdl, "PRID");
>          uint_t          lbl_exit = pcb->pcb_exitlbl;
>  
> +       assert(aggs != NULL);
>          assert(mem != NULL);
>          assert(state != NULL);
>          assert(prid != NULL);
> @@ -206,13 +208,60 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
>          DT_CG_STORE_MAP_PTR("strtab", DCTX_STRTAB);
>          if (dtp->dt_options[DTRACEOPT_SCRATCHSIZE] > 0)
>                  DT_CG_STORE_MAP_PTR("scratchmem", DCTX_SCRATCHMEM);
> -       if (dt_idhash_datasize(dtp->dt_aggs) > 0)
> -               DT_CG_STORE_MAP_PTR("aggs", DCTX_AGG);
>          if (dt_idhash_datasize(dtp->dt_globals) > 0)
>                  DT_CG_STORE_MAP_PTR("gvars", DCTX_GVARS);
>          if (dtp->dt_maxlvaralloc > 0)
>                  DT_CG_STORE_MAP_PTR("lvars", DCTX_LVARS);
>  #undef DT_CG_STORE_MAP_PTR
> +
> +       /*
> +        * Aggregation data is stored in a COU-specific BPF map.  Populate
> +        * dctx->agg with the map for the current CPU.
> +        *
> +        *       key = bpf_get_smp_processor_id()
> +        *                               // call bpf_get_smp_processor_id
> +        *                               //     (%r1 ... %r5 clobbered)
> +        *                               //     (%r0 = 'mem' BPF map value)
> +        *                               // stw [%r9 + DCTX_AGG], %r0
> +        *       rc = bpf_map_lookup_elem(&aggs, &key);
> +        *                               // lddw %r1, &aggs
> +        *                               // mov %r2, %r9
> +        *                               // add %r2, DCTX_AGG
> +        *                               // call bpf_map_lookup_elem
> +        *                               //     (%r1 ... %r5 clobbered)
> +        *                               //     (%r0 = 'mem' BPF map value)
> +        *       if (rc == 0)            // jeq %r0, 0, lbl_exit
> +        *               goto exit;
> +        *
> +        *       key = 0;                // stw [%r9 + DCTX_AGG], 0
> +        *       rc = bpf_map_lookup_elem(rc, &key);
> +        *                               // mov %r1, %r0
> +        *                               // mov %r2, %r9
> +        *                               // add %r2, DCTX_AGG
> +        *                               // call bpf_map_lookup_elem
> +        *                               //     (%r1 ... %r5 clobbered)
> +        *                               //     (%r0 = 'mem' BPF map value)
> +        *       if (rc == 0)            // jeq %r0, 0, lbl_exit
> +        *               goto exit;
> +        *
> +        *       dctx.aggs = rc;         // stdw [%r9 + offset], %r0
> +        */
> +       if (dt_idhash_datasize(dtp->dt_aggs) > 0) {
> +               emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_get_smp_processor_id));
> +               emit(dlp,  BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
> +               dt_cg_xsetx(dlp, aggs, DT_LBL_NONE, BPF_REG_1, aggs->di_id);
> +               emit(dlp,  BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
> +               emit(dlp,  BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
> +               emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
> +               emit(dlp,  BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
> +               emit(dlp,  BPF_STORE_IMM(BPF_DW, BPF_REG_9, DCTX_AGG, 0));
> +               emit(dlp,  BPF_MOV_REG(BPF_REG_1, BPF_REG_0));
> +               emit(dlp,  BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
> +               emit(dlp,  BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
> +               emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
> +               emit(dlp,  BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
> +               emit(dlp,  BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
> +       }
>  }
>  
>  void
> diff --git a/libdtrace/dt_impl.h b/libdtrace/dt_impl.h
> index 85a1e7c9..e9b949ca 100644
> --- a/libdtrace/dt_impl.h
> +++ b/libdtrace/dt_impl.h
> @@ -209,7 +209,6 @@ typedef struct dt_tstring {
>  } dt_tstring_t;
>  
>  typedef struct dt_aggregate {
> -       char **dtat_cpu_buf;            /* per-CPU agg snapshot buffers */
>          char *dtat_buf;                 /* aggregation snapshot buffer */
>          int dtat_flags;                 /* aggregate flags */
>          dt_ahash_t dtat_hash;           /* aggregate hash table */
> --
> 2.34.1
> 
> 
> _______________________________________________
> DTrace-devel mailing list
> DTrace-devel at oss.oracle.com
> https://oss.oracle.com/mailman/listinfo/dtrace-devel


