[DTrace-devel] [PATCH v2 5/5] Use array-of-maps as storage for aggregations

Kris Van Hees kris.van.hees at oracle.com
Tue Aug 23 21:49:08 UTC 2022


In preparation for indexed aggregations and the clear() and trunc()
operations, move the storage for aggregations from a per-CPU array
map to an array of maps indexed by CPU id.

The existing storage solution keeps all aggregation data in a
singleton map value: each CPU writes to its own portion of a single
block of memory, which the consumer retrieves in its entirety with
one system call.

The new storage solution allocates a separate memory block for each
CPU so that the consumer can retrieve data on a per-CPU basis (a
rough sketch of the retrieval path is shown below).  This sets the
stage for future work in which the consumer may need to update the
aggregation buffers.
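
For illustration only, a minimal consumer-side sketch of the new
retrieval path using plain libbpf calls.  The function name and error
handling below are made up for this example; the patch itself goes
through the dt_bpf_map_lookup_inner() wrapper.

	#include <stdint.h>
	#include <unistd.h>	/* close() */
	#include <bpf/bpf.h>	/* bpf_map_lookup_elem(), bpf_map_get_fd_by_id() */

	/*
	 * Hypothetical example: fetch one CPU's aggregation data from an
	 * array-of-maps keyed by CPU id.  The outer lookup yields the inner
	 * map id, which is converted to an fd before looking up the
	 * singleton element (key 0) that holds the aggregation buffer.
	 */
	static int
	read_cpu_aggs(int outer_fd, uint32_t cpu, void *buf)
	{
		uint32_t	inner_id, zero = 0;
		int		inner_fd, rc;

		if (bpf_map_lookup_elem(outer_fd, &cpu, &inner_id) < 0)
			return -1;

		inner_fd = bpf_map_get_fd_by_id(inner_id);
		if (inner_fd < 0)
			return -1;

		rc = bpf_map_lookup_elem(inner_fd, &zero, buf);
		close(inner_fd);
		return rc;
	}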

Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
---
 libdtrace/dt_aggregate.c | 95 ++++++++++++++--------------------------
 libdtrace/dt_bpf.c       | 69 ++++++++++++++++++-----------
 libdtrace/dt_cg.c        | 53 +++++++++++++++++++++-
 libdtrace/dt_impl.h      |  1 -
 4 files changed, 129 insertions(+), 89 deletions(-)
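
As a reading aid for the dt_cg.c change below: the trampoline now emits
BPF instructions that correspond roughly to the following BPF C.  The
map definition, helper function, and value type here are illustrative
only (DTrace creates the maps from userspace and generates the
instructions directly); they are not part of the patch.

	#include <linux/bpf.h>
	#include <bpf/bpf_helpers.h>

	struct {
		__uint(type, BPF_MAP_TYPE_ARRAY_OF_MAPS);
		__uint(max_entries, 1);		/* really: one slot per online CPU */
		__type(key, __u32);
		__array(values, struct {
			__uint(type, BPF_MAP_TYPE_ARRAY);
			__uint(max_entries, 1);
			__type(key, __u32);
			__type(value, __u64);	/* really: the aggregation buffer */
		});
	} aggs SEC(".maps");

	/* Return the current CPU's aggregation buffer, or NULL. */
	static void *
	get_agg_buf(void)
	{
		__u32	cpu = bpf_get_smp_processor_id();
		__u32	zero = 0;
		void	*inner;

		inner = bpf_map_lookup_elem(&aggs, &cpu);	/* aggs[cpu] */
		if (inner == NULL)
			return NULL;

		return bpf_map_lookup_elem(inner, &zero);	/* element 0 */
	}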

diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
index 44896fd2..14d16da6 100644
--- a/libdtrace/dt_aggregate.c
+++ b/libdtrace/dt_aggregate.c
@@ -412,8 +412,6 @@ typedef void (*agg_cpu_f)(dt_ident_t *aid, int64_t *dst, int64_t *src,
 typedef struct dt_snapstate {
 	dtrace_hdl_t	*dtp;
 	processorid_t	cpu;
-	char		*buf;
-	dt_aggregate_t	*agp;
 } dt_snapstate_t;
 
 static void
@@ -444,7 +442,9 @@ dt_agg_one_agg(dt_ident_t *aid, int64_t *dst, int64_t *src, uint_t datasz)
 static int
 dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 {
-	dt_ahash_t		*agh = &st->agp->dtat_hash;
+	dtrace_hdl_t		*dtp = st->dtp;
+	dt_aggregate_t		*agp = &dtp->dt_aggregate;
+	dt_ahash_t		*agh = &agp->dtat_hash;
 	dt_ahashent_t		*h;
 	dtrace_aggdesc_t	*agg;
 	dtrace_aggdata_t	*agd;
@@ -454,12 +454,12 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 	uint_t			i, datasz;
 	int64_t			*src;
 
-	rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
+	rval = dt_aggid_lookup(dtp, aid->di_id, &agg);
 	if (rval != 0)
 		return rval;
 
 	/* point to the data counter */
-	src = (int64_t *)(st->buf + aid->di_offset);
+	src = (int64_t *)(agp->dtat_buf + aid->di_offset);
 
 	/* skip it if data counter is 0 */
 	if (*src == 0)
@@ -487,46 +487,45 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 	}
 
 	/* add it to the hash table */
-	h = dt_zalloc(st->dtp, sizeof(dt_ahashent_t));
+	h = dt_zalloc(dtp, sizeof(dt_ahashent_t));
 	if (h == NULL)
-		return dt_set_errno(st->dtp, EDT_NOMEM);
+		return dt_set_errno(dtp, EDT_NOMEM);
 
 	agd = &h->dtahe_data;
-	agd->dtada_data = dt_alloc(st->dtp, datasz);
+	agd->dtada_data = dt_alloc(dtp, datasz);
 	if (agd->dtada_data == NULL) {
-		dt_free(st->dtp, h);
-		return dt_set_errno(st->dtp, EDT_NOMEM);
+		dt_free(dtp, h);
+		return dt_set_errno(dtp, EDT_NOMEM);
 	}
 
 	memcpy(agd->dtada_data, src, datasz);
 	agd->dtada_size = datasz;
 	agd->dtada_desc = agg;
-	agd->dtada_hdl = st->dtp;
+	agd->dtada_hdl = dtp;
 
 	h->dtahe_hval = hval;
 	h->dtahe_size = datasz;
 
-	if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
-		char	**percpu = dt_calloc(st->dtp,
-					     st->dtp->dt_conf.max_cpuid + 1,
+	if (agp->dtat_flags & DTRACE_A_PERCPU) {
+		char	**percpu = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
 					     sizeof(char *));
 
 		if (percpu == NULL) {
-			dt_free(st->dtp, agd->dtada_data);
-			dt_free(st->dtp, h);
+			dt_free(dtp, agd->dtada_data);
+			dt_free(dtp, h);
 
-			dt_set_errno(st->dtp, EDT_NOMEM);
+			dt_set_errno(dtp, EDT_NOMEM);
 		}
 
-		for (i = 0; i <= st->dtp->dt_conf.max_cpuid; i++) {
-			percpu[i] = dt_zalloc(st->dtp, datasz);
+		for (i = 0; i <= dtp->dt_conf.max_cpuid; i++) {
+			percpu[i] = dt_zalloc(dtp, datasz);
 			if (percpu[i] == NULL) {
 				while (--i >= 0)
-					dt_free(st->dtp, percpu[i]);
-				dt_free(st->dtp, agd->dtada_data);
-				dt_free(st->dtp, h);
+					dt_free(dtp, percpu[i]);
+				dt_free(dtp, agd->dtada_data);
+				dt_free(dtp, h);
 
-				dt_set_errno(st->dtp, EDT_NOMEM);
+				dt_set_errno(dtp, EDT_NOMEM);
 			}
 		}
 
@@ -553,14 +552,15 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 static int
 dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
 {
-	dt_aggregate_t	*agp = &dtp->dt_aggregate;
-	char		*buf = agp->dtat_cpu_buf[cpu];
 	dt_snapstate_t	st;
+	uint32_t	key = 0;
 
 	st.dtp = dtp;
 	st.cpu = cpu;
-	st.buf = buf;
-	st.agp = agp;
+
+	if (dt_bpf_map_lookup_inner(dtp->dt_aggmap_fd, &cpu, &key,
+				    dtp->dt_aggregate.dtat_buf) == -1)
+		return 0;
 
 	return dt_idhash_iter(dtp->dt_aggs,
 			      (dt_idhash_f *)dt_aggregate_snap_one, &st);
@@ -573,22 +573,17 @@ int
 dtrace_aggregate_snap(dtrace_hdl_t *dtp)
 {
 	dt_aggregate_t	*agp = &dtp->dt_aggregate;
-	uint32_t	key = 0;
 	int		i, rval;
 
 	/*
 	 * If we do not have a buffer initialized, we will not be processing
 	 * aggregations, so there is nothing to be done here.
 	 */
-	if (agp->dtat_cpu_buf == NULL)
+	if (agp->dtat_buf == NULL)
 		return 0;
 
 	dtrace_aggregate_clear(dtp);
 
-	rval = dt_bpf_map_lookup(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
-	if (rval != 0)
-		return dt_set_errno(dtp, -rval);
-
 	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
 		rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus[i].cpu_id);
 		if (rval != 0)
@@ -999,41 +994,22 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
 	dt_aggregate_t	*agp = &dtp->dt_aggregate;
 	dt_ahash_t	*agh = &agp->dtat_hash;
 	int		aggsz, i;
-	uint32_t	key = 0;
 
 	/* If there are no aggregations there is nothing to do. */
 	aggsz = dt_idhash_datasize(dtp->dt_aggs);
 	if (aggsz <= 0)
 		return 0;
 
-	/*
-	 * Allocate a buffer to hold the aggregation data for all possible
-	 * CPUs, and initialize the per-CPU data pointers for CPUs that are
-	 * currently enabled.
-	 */
-	agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
+	/* Allocate a buffer to hold the aggregation data for a CPU.  */
+	agp->dtat_buf = dt_zalloc(dtp, aggsz);
 	if (agp->dtat_buf == NULL)
 		return dt_set_errno(dtp, EDT_NOMEM);
 
-	agp->dtat_cpu_buf = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
-				      sizeof(char *));
-	if (agp->dtat_cpu_buf == NULL) {
-		dt_free(dtp, agp->dtat_buf);
-		return dt_set_errno(dtp, EDT_NOMEM);
-	}
-
-	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
-		int	cpu = dtp->dt_conf.cpus[i].cpu_id;
-
-		agp->dtat_cpu_buf[cpu] = agp->dtat_buf + cpu * aggsz;
-	}
-
 	/* Create the aggregation hash. */
 	agh->dtah_size = DTRACE_AHASHSIZE;
 	agh->dtah_hash = dt_zalloc(dtp,
 				   agh->dtah_size * sizeof(dt_ahashent_t *));
 	if (agh->dtah_hash == NULL) {
-		dt_free(dtp, agp->dtat_cpu_buf);
 		dt_free(dtp, agp->dtat_buf);
 		return dt_set_errno(dtp, EDT_NOMEM);
 	}
@@ -1045,15 +1021,13 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
 		return 0;
 	*(int64_t *)agp->dtat_buf = 0;  /* clear the flag */
 	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
-		int	cpu = dtp->dt_conf.cpus[i].cpu_id;
+		int		cpu = dtp->dt_conf.cpus[i].cpu_id;
+		uint32_t	key = 0;
 
-		/* Data for CPU 0 was populated, so skip it. */
-		if (cpu == 0)
+		if (dt_bpf_map_update_inner(dtp->dt_aggmap_fd, &cpu, &key,
+					    dtp->dt_aggregate.dtat_buf) == -1)
 			continue;
-
-		memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
 	}
-	dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
 
 	return 0;
 }
@@ -1820,6 +1794,5 @@ dt_aggregate_destroy(dtrace_hdl_t *dtp)
 		hash->dtah_size = 0;
 	}
 
-	dt_free(dtp, agp->dtat_cpu_buf);
 	dt_free(dtp, agp->dtat_buf);
 }
diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
index 7dea7179..a31ddf95 100644
--- a/libdtrace/dt_bpf.c
+++ b/libdtrace/dt_bpf.c
@@ -351,6 +351,22 @@ dt_bpf_init_helpers(dtrace_hdl_t *dtp)
 #undef BPF_HELPER_MAP
 }
 
+static int
+map_create_error(dtrace_hdl_t *dtp, const char *name, int err)
+{
+	char	msg[64];
+
+	snprintf(msg, sizeof(msg),
+		 "failed to create BPF map '%s'", name);
+
+	if (err == E2BIG)
+		return dt_bpf_error(dtp, "%s: Too big\n", msg);
+	if (err == EPERM)
+		return dt_bpf_lockmem_error(dtp, msg);
+
+	return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
+}
+
 static int
 create_gmap(dtrace_hdl_t *dtp, const char *name, enum bpf_map_type type,
 	    size_t ksz, size_t vsz, size_t size)
@@ -369,17 +385,8 @@ create_gmap(dtrace_hdl_t *dtp, const char *name, enum bpf_map_type type,
 		err = errno;
 	}
 
-	if (fd < 0) {
-		char msg[64];
-
-		snprintf(msg, sizeof(msg),
-			 "failed to create BPF map '%s'", name);
-		if (err == E2BIG)
-			return dt_bpf_error(dtp, "%s: Too big\n", msg);
-		if (err == EPERM)
-			return dt_bpf_lockmem_error(dtp, msg);
-		return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
-	}
+	if (fd < 0)
+		return map_create_error(dtp, name, err);
 
 	dt_dprintf("BPF map '%s' is FD %d\n", name, fd);
 
@@ -421,17 +428,8 @@ create_gmap_of_maps(dtrace_hdl_t *dtp, const char *name,
 		err = errno;
 	}
 
-	if (fd < 0) {
-		char msg[64];
-
-		snprintf(msg, sizeof(msg),
-			 "failed to create BPF map '%s'", name);
-		if (err == E2BIG)
-			return dt_bpf_error(dtp, "%s: Too big\n", msg);
-		if (err == EPERM)
-			return dt_bpf_lockmem_error(dtp, msg);
-		return dt_bpf_error(dtp, "%s: %s\n", msg, strerror(err));
-	}
+	if (fd < 0)
+		return map_create_error(dtp, name, err);
 
 	dt_dprintf("BPF map '%s' is FD %d\n", name, fd);
 
@@ -470,19 +468,40 @@ gmap_create_state(dtrace_hdl_t *dtp)
  * Create the 'aggs' BPF map.
  *
  * Aggregation data buffer map, associated with each CPU.  The map is
- * implemented as a global per-CPU map with a singleton element (key 0).
+ * implemented as a global array-of-maps indexed by CPU id.  The associated
+ * value is a map with a singleton element (key 0).
  */
 static int
 gmap_create_aggs(dtrace_hdl_t *dtp)
 {
 	size_t	sz = dt_idhash_datasize(dtp->dt_aggs);
+	int	i;
 
 	/* Only create the map if it is used. */
 	if (sz == 0)
 		return 0;
 
-	dtp->dt_aggmap_fd = create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
-					sizeof(uint32_t), sz, 1);
+	dtp->dt_aggmap_fd = create_gmap_of_maps(dtp, "aggs",
+						BPF_MAP_TYPE_ARRAY_OF_MAPS,
+						sizeof(uint32_t),
+						dtp->dt_conf.num_online_cpus,
+						BPF_MAP_TYPE_ARRAY,
+						sizeof(uint32_t), sz, 1);
+
+	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
+		int	cpu = dtp->dt_conf.cpus[i].cpu_id;
+		char	name[16];
+		int	fd;
+
+		snprintf(name, 16, "aggs_%d", cpu);
+		fd = dt_bpf_map_create(BPF_MAP_TYPE_ARRAY, name,
+				       sizeof(uint32_t), sz, 1, 0);
+		if (fd < 0)
+			return map_create_error(dtp, name, errno);
+
+		dt_bpf_map_update(dtp->dt_aggmap_fd, &cpu, &fd);
+	}
+
 
 	return dtp->dt_aggmap_fd;
 }
diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
index 0963f202..157f4861 100644
--- a/libdtrace/dt_cg.c
+++ b/libdtrace/dt_cg.c
@@ -52,11 +52,13 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
 {
 	dtrace_hdl_t	*dtp = pcb->pcb_hdl;
 	dt_irlist_t	*dlp = &pcb->pcb_ir;
+	dt_ident_t	*aggs = dt_dlib_get_map(dtp, "aggs");
 	dt_ident_t	*mem = dt_dlib_get_map(dtp, "mem");
 	dt_ident_t	*state = dt_dlib_get_map(dtp, "state");
 	dt_ident_t	*prid = dt_dlib_get_var(pcb->pcb_hdl, "PRID");
 	uint_t		lbl_exit = pcb->pcb_exitlbl;
 
+	assert(aggs != NULL);
 	assert(mem != NULL);
 	assert(state != NULL);
 	assert(prid != NULL);
@@ -206,13 +208,60 @@ dt_cg_tramp_prologue_act(dt_pcb_t *pcb, dt_activity_t act)
 	DT_CG_STORE_MAP_PTR("strtab", DCTX_STRTAB);
 	if (dtp->dt_options[DTRACEOPT_SCRATCHSIZE] > 0)
 		DT_CG_STORE_MAP_PTR("scratchmem", DCTX_SCRATCHMEM);
-	if (dt_idhash_datasize(dtp->dt_aggs) > 0)
-		DT_CG_STORE_MAP_PTR("aggs", DCTX_AGG);
 	if (dt_idhash_datasize(dtp->dt_globals) > 0)
 		DT_CG_STORE_MAP_PTR("gvars", DCTX_GVARS);
 	if (dtp->dt_maxlvaralloc > 0)
 		DT_CG_STORE_MAP_PTR("lvars", DCTX_LVARS);
 #undef DT_CG_STORE_MAP_PTR
+
+	/*
+	 * Aggregation data is stored in a per-CPU BPF map.  Populate
+	 * dctx->agg with a pointer to the current CPU's aggregation buffer.
+	 *
+	 *	key = bpf_get_smp_processor_id()
+	 *				// call bpf_get_smp_processor_id
+	 *				//     (%r1 ... %r5 clobbered)
+	 *				//     (%r0 = current CPU id)
+	 *				// stdw [%r9 + DCTX_AGG], %r0
+	 *	rc = bpf_map_lookup_elem(&aggs, &key);
+	 *				// lddw %r1, &aggs
+	 *				// mov %r2, %r9
+	 *				// add %r2, DCTX_AGG
+	 *				// call bpf_map_lookup_elem
+	 *				//     (%r1 ... %r5 clobbered)
+	 *				//     (%r0 = aggs[cpu], the inner map)
+	 *	if (rc == 0)		// jeq %r0, 0, lbl_exit
+	 *		goto exit;
+	 *
+	 *	key = 0;		// stdw [%r9 + DCTX_AGG], 0
+	 *	rc = bpf_map_lookup_elem(rc, &key);
+	 *				// mov %r1, %r0
+	 *				// mov %r2, %r9
+	 *				// add %r2, DCTX_AGG
+	 *				// call bpf_map_lookup_elem
+	 *				//     (%r1 ... %r5 clobbered)
+	 *				//     (%r0 = this CPU's aggregation buffer)
+	 *	if (rc == 0)		// jeq %r0, 0, lbl_exit
+	 *		goto exit;
+	 *
+	 *	dctx->agg = rc;		// stdw [%r9 + DCTX_AGG], %r0
+	 */
+	if (dt_idhash_datasize(dtp->dt_aggs) > 0) {
+		emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_get_smp_processor_id));
+		emit(dlp,  BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+		dt_cg_xsetx(dlp, aggs, DT_LBL_NONE, BPF_REG_1, aggs->di_id);
+		emit(dlp,  BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+		emit(dlp,  BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+		emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+		emit(dlp,  BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+		emit(dlp,  BPF_STORE_IMM(BPF_DW, BPF_REG_9, DCTX_AGG, 0));
+		emit(dlp,  BPF_MOV_REG(BPF_REG_1, BPF_REG_0));
+		emit(dlp,  BPF_MOV_REG(BPF_REG_2, BPF_REG_9));
+		emit(dlp,  BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, DCTX_AGG));
+		emit(dlp,  BPF_CALL_HELPER(BPF_FUNC_map_lookup_elem));
+		emit(dlp,  BPF_BRANCH_IMM(BPF_JEQ, BPF_REG_0, 0, lbl_exit));
+		emit(dlp,  BPF_STORE(BPF_DW, BPF_REG_9, DCTX_AGG, BPF_REG_0));
+	}
 }
 
 void
diff --git a/libdtrace/dt_impl.h b/libdtrace/dt_impl.h
index 85a1e7c9..e9b949ca 100644
--- a/libdtrace/dt_impl.h
+++ b/libdtrace/dt_impl.h
@@ -209,7 +209,6 @@ typedef struct dt_tstring {
 } dt_tstring_t;
 
 typedef struct dt_aggregate {
-	char **dtat_cpu_buf;		/* per-CPU agg snapshot buffers */
 	char *dtat_buf;			/* aggregation snapshot buffer */
 	int dtat_flags;			/* aggregate flags */
 	dt_ahash_t dtat_hash;		/* aggregate hash table */
-- 
2.34.1



