[DTrace-devel] [PATCH 5/5] Use array-of-maps as storage for aggregations

Kris Van Hees kris.van.hees at oracle.com
Fri Aug 19 17:26:13 UTC 2022


In preparation for indexed aggregations and the clear() and trunc()
operations, the storage for aggregations is moving from a per-CPU
array map to an array of maps, indexed by CPU id.  This gives each
CPU its own storage to store aggregation data in.

Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
---
 libdtrace/dt_aggregate.c | 62 ++++++++++++----------------------------
 libdtrace/dt_bpf.c       | 24 ++++++++++++++--
 libdtrace/dt_cg.c        | 53 ++++++++++++++++++++++++++++++++--
 libdtrace/dt_impl.h      |  1 -
 4 files changed, 90 insertions(+), 50 deletions(-)

diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
index 44896fd2..cb47e442 100644
--- a/libdtrace/dt_aggregate.c
+++ b/libdtrace/dt_aggregate.c
@@ -412,8 +412,6 @@ typedef void (*agg_cpu_f)(dt_ident_t *aid, int64_t *dst, int64_t *src,
 typedef struct dt_snapstate {
 	dtrace_hdl_t	*dtp;
 	processorid_t	cpu;
-	char		*buf;
-	dt_aggregate_t	*agp;
 } dt_snapstate_t;
 
 static void
@@ -444,7 +442,9 @@ dt_agg_one_agg(dt_ident_t *aid, int64_t *dst, int64_t *src, uint_t datasz)
 static int
 dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 {
-	dt_ahash_t		*agh = &st->agp->dtat_hash;
+	dtrace_hdl_t		*dtp = st->dtp;
+	dt_aggregate_t		*agp = &dtp->dt_aggregate;
+	dt_ahash_t		*agh = &agp->dtat_hash;
 	dt_ahashent_t		*h;
 	dtrace_aggdesc_t	*agg;
 	dtrace_aggdata_t	*agd;
@@ -454,12 +454,12 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 	uint_t			i, datasz;
 	int64_t			*src;
 
-	rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
+	rval = dt_aggid_lookup(dtp, aid->di_id, &agg);
 	if (rval != 0)
 		return rval;
 
 	/* point to the data counter */
-	src = (int64_t *)(st->buf + aid->di_offset);
+	src = (int64_t *)(agp->dtat_buf + aid->di_offset);
 
 	/* skip it if data counter is 0 */
 	if (*src == 0)
@@ -506,7 +506,7 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 	h->dtahe_hval = hval;
 	h->dtahe_size = datasz;
 
-	if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
+	if (agp->dtat_flags & DTRACE_A_PERCPU) {
 		char	**percpu = dt_calloc(st->dtp,
 					     st->dtp->dt_conf.max_cpuid + 1,
 					     sizeof(char *));
@@ -553,14 +553,15 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 static int
 dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
 {
-	dt_aggregate_t	*agp = &dtp->dt_aggregate;
-	char		*buf = agp->dtat_cpu_buf[cpu];
 	dt_snapstate_t	st;
+	uint32_t	key = 0;
 
 	st.dtp = dtp;
 	st.cpu = cpu;
-	st.buf = buf;
-	st.agp = agp;
+
+	if (dt_bpf_map_lookup_inner(dtp->dt_aggmap_fd, &cpu, &key,
+				    dtp->dt_aggregate.dtat_buf) == -1)
+		return 0;
 
 	return dt_idhash_iter(dtp->dt_aggs,
 			      (dt_idhash_f *)dt_aggregate_snap_one, &st);
@@ -573,22 +574,17 @@ int
 dtrace_aggregate_snap(dtrace_hdl_t *dtp)
 {
 	dt_aggregate_t	*agp = &dtp->dt_aggregate;
-	uint32_t	key = 0;
 	int		i, rval;
 
 	/*
 	 * If we do not have a buffer initialized, we will not be processing
 	 * aggregations, so there is nothing to be done here.
 	 */
-	if (agp->dtat_cpu_buf == NULL)
+	if (agp->dtat_buf == NULL)
 		return 0;
 
 	dtrace_aggregate_clear(dtp);
 
-	rval = dt_bpf_map_lookup(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
-	if (rval != 0)
-		return dt_set_errno(dtp, -rval);
-
 	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
 		rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus[i].cpu_id);
 		if (rval != 0)
@@ -999,41 +995,22 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
 	dt_aggregate_t	*agp = &dtp->dt_aggregate;
 	dt_ahash_t	*agh = &agp->dtat_hash;
 	int		aggsz, i;
-	uint32_t	key = 0;
 
 	/* If there are no aggregations there is nothing to do. */
 	aggsz = dt_idhash_datasize(dtp->dt_aggs);
 	if (aggsz <= 0)
 		return 0;
 
-	/*
-	 * Allocate a buffer to hold the aggregation data for all possible
-	 * CPUs, and initialize the per-CPU data pointers for CPUs that are
-	 * currently enabled.
-	 */
-	agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
+	/* Allocate a buffer to hold the aggregation data for a CPU.  */
+	agp->dtat_buf = dt_zalloc(dtp, aggsz);
 	if (agp->dtat_buf == NULL)
 		return dt_set_errno(dtp, EDT_NOMEM);
 
-	agp->dtat_cpu_buf = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
-				      sizeof(char *));
-	if (agp->dtat_cpu_buf == NULL) {
-		dt_free(dtp, agp->dtat_buf);
-		return dt_set_errno(dtp, EDT_NOMEM);
-	}
-
-	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
-		int	cpu = dtp->dt_conf.cpus[i].cpu_id;
-
-		agp->dtat_cpu_buf[cpu] = agp->dtat_buf + cpu * aggsz;
-	}
-
 	/* Create the aggregation hash. */
 	agh->dtah_size = DTRACE_AHASHSIZE;
 	agh->dtah_hash = dt_zalloc(dtp,
 				   agh->dtah_size * sizeof(dt_ahashent_t *));
 	if (agh->dtah_hash == NULL) {
-		dt_free(dtp, agp->dtat_cpu_buf);
 		dt_free(dtp, agp->dtat_buf);
 		return dt_set_errno(dtp, EDT_NOMEM);
 	}
@@ -1045,15 +1022,13 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
 		return 0;
 	*(int64_t *)agp->dtat_buf = 0;  /* clear the flag */
 	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
-		int	cpu = dtp->dt_conf.cpus[i].cpu_id;
+		int		cpu = dtp->dt_conf.cpus[i].cpu_id;
+		uint32_t	key = 0;
 
-		/* Data for CPU 0 was populated, so skip it. */
-		if (cpu == 0)
+		if (dt_bpf_map_update_inner(dtp->dt_aggmap_fd, &cpu, &key,
+					    dtp->dt_aggregate.dtat_buf) == -1)
 			continue;
-
-		memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
 	}
-	dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
 
 	return 0;
 }
@@ -1820,6 +1795,5 @@ dt_aggregate_destroy(dtrace_hdl_t *dtp)
 		hash->dtah_size = 0;
 	}
 
-	dt_free(dtp, agp->dtat_cpu_buf);
 	dt_free(dtp, agp->dtat_buf);
 }
diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
index 66c76022..3207a416 100644
--- a/libdtrace/dt_bpf.c
+++ b/libdtrace/dt_bpf.c
@@ -469,19 +469,37 @@ gmap_create_state(dtrace_hdl_t *dtp)
  * Create the 'aggs' BPF map.
  *
  * Aggregation data buffer map, associated with each CPU.  The map is
- * implemented as a global per-CPU map with a singleton element (key 0).
+ * implemented as a global array-of-maps indexed by CPU id.  The associated
+ * value is a map with a singleton element (key 0).
  */
 static int
 gmap_create_aggs(dtrace_hdl_t *dtp)
 {
 	size_t	sz = dt_idhash_datasize(dtp->dt_aggs);
+	int	i;
 
 	/* Only create the map if it is used. */
 	if (sz == 0)
 		return 0;
 
-	dtp->dt_aggmap_fd = create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
-					sizeof(uint32_t), sz, 1);
+	dtp->dt_aggmap_fd = create_gmap_of_maps(dtp, "aggs",
+						BPF_MAP_TYPE_ARRAY_OF_MAPS,
+						sizeof(uint32_t),
+						dtp->dt_conf.num_online_cpus,
+						BPF_MAP_TYPE_ARRAY,
+						sizeof(uint32_t), sz, 1);
+
+	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
+		int	cpu = dtp->dt_conf.cpus[i].cpu_id;
+		int	fd = dt_bpf_map_create(BPF_MAP_TYPE_ARRAY, NULL,
+					       sizeof(uint32_t), sz, 1, 0);
+
+		if (fd < 0)
+			continue;
+
+		dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
+	}
+
 
 	return dtp->dt_aggmap_fd;
 
diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
index 0963f202..b6ade5a9 100644
--- a/libdtrace/dt_cg.c
+++ b/libdtrace/dt_cg.c
@@ -52,11 +52,13 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
 {
 	dtrace_hdl_t	*dtp = pcb->pcb_hdl;
 	dt_irlist_t	*dlp = &pcb->pcb_ir;
+	dt_ident_t	*aggs = dt_dlib_get_map(dtp, "aggs");
 	dt_ident_t	*mem = dt_dlib_get_map(dtp, "mem");
 	dt_ident_t	*state = dt_dlib_get_map(dtp, "state");
 	dt_ident_t	*prid = dt_dlib_get_var(pcb->pcb_hdl, "PRID");
 	uint_t		lbl_exit = pcb->pcb_exitlbl;
 
+	assert(aggs != NULL);
 	assert(mem != NULL);
 	assert(state != NULL);
 	assert(prid != NULL);
@@ -206,13 +208,60 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
 	DT_CG_STORE_MAP_PTR("strtab", DCTX_STRTAB);
 	if (dtp->dt_options[DTRACEOPT_SCRATCHSIZE] > 0)
 		DT_CG_STORE_MAP_PTR("scratchmem", DCTX_SCRATCHMEM);
-	if (dt_idhash_datasize(dtp->dt_aggs) > 0)
-		DT_CG_STORE_MAP_PTR("aggs", DCTX_AGG);
 	if (dt_idhash_datasize(dtp->dt_globals) > 0)
 		DT_CG_STORE_MAP_PTR("gvars", DCTX_GVARS);
 	if (dtp->dt_maxlvaralloc > 0)
 		DT_CG_STORE_MAP_PTR("lvars", DCTX_LVARS);
 #undef DT_CG_STORE_MAP_PTR
+
+	/*
+	 * Aggregation data is stored in a CPU-specific BPF map.  Populate
+	 * dctx->agg with the map for the current CPU.
+	 *
+	 *	key = bpf_get_smp_processor_id()
+	 *				// call bpf_get_smp_processor_id
+	 *				//     (%r1 ... %r5 clobbered)
+	 *				//     (%r0 = current CPU id)
+	 *				// stdw [%r9 + DCTX_AGG], %r0
+	 *	rc = bpf_map_lookup_elem(&aggs, &key);
+	 *				// lddw %r1, &aggs
+	 *				// mov %r2, %r9
+	 *				// add %r2, DCTX_AGG
+	 *				// call bpf_map_lookup_elem
+	 *				//     (%r1 ... %r5 clobbered)
+	 *				//     (%r0 = 'aggs' inner map for this CPU)
+	 *	if (rc == 0)		// jeq %r0, 0, lbl_exit
+	 *		goto exit;
+	 *
+	 *	key = 0;		// stdw [%r9 + DCTX_AGG], 0
+	 *	rc = bpf_map_lookup_elem(rc, &key);
+	 *				// mov %r1, %r0
+	 *				// mov %r2, %r9
+	 *				// add %r2, DCTX_AGG
+	 *				// call bpf_map_lookup_elem
+	 *				//     (%r1 ... %r5 clobbered)
+	 *				//     (%r0 = aggregation data for this CPU)
+	 *	if (rc == 0)		// jeq %r0, 0, lbl_exit
+	 *		goto exit;
+	 *
+	 *	dctx->agg = rc;		// stdw [%r9 + DCTX_AGG], %r0
+	 */
+	if (dt_idhash_datasize(dtp->dt_aggs) > 0) {
+		emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_get_smp_processor_id));
+		emit(dlp,  BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+		dt_cg_xsetx(dlp, aggs, DT_LBL_NONE, BPF_REG_1, aggs->di_id);
+		emit(dlp,  BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+		emit(dlp,  BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+		emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+		emit(dlp,  BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+		emit(dlp,  BPF_STORE_IMM(BPF_DW, BPF_REG_9, DCTX_AGG, 0));
+		emit(dlp,  BPF_MOV_REG(BPF_REG_1, BPF_REG_0));
+		emit(dlp,  BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+		emit(dlp,  BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+		emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+		emit(dlp,  BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+		emit(dlp,  BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+	}
 }
 
 void
diff --git a/libdtrace/dt_impl.h b/libdtrace/dt_impl.h
index 85a1e7c9..e9b949ca 100644
--- a/libdtrace/dt_impl.h
+++ b/libdtrace/dt_impl.h
@@ -209,7 +209,6 @@ typedef struct dt_tstring {
 } dt_tstring_t;
 
 typedef struct dt_aggregate {
-	char **dtat_cpu_buf;		/* per-CPU agg snapshot buffers */
 	char *dtat_buf;			/* aggregation snapshot buffer */
 	int dtat_flags;			/* aggregate flags */
 	dt_ahash_t dtat_hash;		/* aggregate hash table */
-- 
2.34.1




More information about the DTrace-devel mailing list