[DTrace-devel] [PATCH v2 1/4] Switch from per-CPU latches to per-CPU/per-agg latches

eugene.loh at oracle.com
Mon Jan 11 11:58:42 PST 2021


From: Eugene Loh <eugene.loh at oracle.com>

Change the latch sequence number to be per-aggregation rather than only
per-CPU.  This way, one can tell whether an individual aggregation has
any data.  (Some aggregations, like count(), inherently carry such
information, but we need to handle the general case.)
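
For illustration, each aggregation's storage now begins with its own
latch sequence number, followed by the two data copies.  A minimal
consumer-side sketch in plain C (the struct and function names are
hypothetical; only the layout comes from this patch):

    #include <stdint.h>

    /* Hypothetical view of one aggregation's slot in a per-CPU buffer. */
    struct agg_slot {
            uint64_t latch;     /* 0 means this aggregation never fired */
            int64_t  data[];    /* two copies of the data follow */
    };

    /* A consumer can skip an aggregation that was never updated. */
    static int agg_has_data(const char *buf, long di_offset)
    {
            const struct agg_slot *slot =
                (const struct agg_slot *)(buf + di_offset);

            return slot->latch != 0;
    }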

This change has the added advantage of reducing producer-consumer
conflicts.  E.g., imagine that probes are firing very often, updating
different aggregations, while the consumer is trying to read a single,
large aggregation (e.g., some *quantize aggregation).  A single latch
sequence number shared by all aggregations would keep changing, forcing
the consumer to reread that one aggregation unnecessarily.
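
For reference, here is a plain-C sketch of the write-side protocol that
dt_cg_agg_buf_prepare() below emits as BPF (agg_write_ptr() and its
parameter names are hypothetical; the steps mirror the generated code):

    #include <stdint.h>

    /*
     * Bump this aggregation's latch and return a pointer to the data
     * copy the producer may update, directing the write to the odd or
     * even copy so that the other copy stays stable for readers.
     */
    static int64_t *agg_write_ptr(char *agg_base, long di_offset, long size)
    {
            uint64_t *latch = (uint64_t *)(agg_base + di_offset);
            long off = (*latch & 1) * size + sizeof(uint64_t);

            /* The generated BPF uses an atomic xadd here. */
            __sync_fetch_and_add(latch, 1);

            return (int64_t *)((char *)latch + off);
    }

Only the latch of the updated aggregation changes, so a consumer reading
some other aggregation sees no spurious latch movement.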

Also, when taking snapshots of aggregations, copy only one of the two
copies of the aggregation data.  Neither the latch sequence number nor
the second copy is needed in the snapshot.
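
The size bookkeeping, as used by dt_aggregate_snap_one() and
dt_aggid_add() below (a sketch; di_size is the per-aggregation storage
size set by DT_CG_AGG_SET_STORAGE):

    #include <stddef.h>
    #include <stdint.h>

    /*
     * Storage per aggregation:
     *     di_size = sizeof(uint64_t)    latch sequence number
     *             + 2 * data_size       odd/even data copies
     * so a consumer recovers the per-copy data size as follows.
     */
    static size_t agg_data_size(size_t di_size)
    {
            return (di_size - sizeof(uint64_t)) / 2;
    }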

This also fixes the problem that aggregations appearing in a D script
were printed even if the probes that use them never fired.

Introduce a test to catch the printing of unused aggregations.

https://github.com/oracle/dtrace-utils/issues/3
Signed-off-by: Eugene Loh <eugene.loh at oracle.com>
---
 libdtrace/dt_aggregate.c       | 59 ++++++++++++++++------------------
 libdtrace/dt_bpf.c             |  8 ++---
 libdtrace/dt_cg.c              | 53 +++++++++++++++---------------
 libdtrace/dt_map.c             |  7 +++-
 test/unittest/aggs/tst.empty.d | 29 +++++++++++++++++
 test/unittest/aggs/tst.empty.r |  2 ++
 6 files changed, 92 insertions(+), 66 deletions(-)
 create mode 100644 test/unittest/aggs/tst.empty.d
 create mode 100644 test/unittest/aggs/tst.empty.r

diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
index 972c19a3..389ede8c 100644
--- a/libdtrace/dt_aggregate.c
+++ b/libdtrace/dt_aggregate.c
@@ -422,19 +422,30 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 	size_t			ndx = hval % agh->dtah_size;
 	int			rval;
 	uint_t			i;
+	int64_t			*src;
+	int			realsz;
 
 	rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
 	if (rval != 0)
 		return rval;
 
+	/* check latch sequence number to see if there is any data */
+	src = (int64_t *)(st->buf + aid->di_offset);
+	if (*src == 0)
+		return 0;
+	src++;
+
+	/* real size excludes latch sequence number and second data copy */
+	realsz = (aid->di_size - sizeof(uint64_t)) / 2;
+
 	/* See if we already have an entry for this aggregation. */
 	for (h = agh->dtah_hash[ndx]; h != NULL; h = h->dtahe_next) {
-		int64_t	*src, *dst, *cpu_dst;
+		int64_t	*dst, *cpu_dst;
 		uint_t	cnt;
 
 		if (h->dtahe_hval != hval)
 			continue;
-		if (h->dtahe_size != aid->di_size)
+		if (h->dtahe_size != realsz)
 			continue;
 
 		/* Entry found - process the data. */
@@ -445,7 +456,6 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 				: NULL;
 
 accumulate:
-		src = (int64_t *)(st->buf + aid->di_offset);
 		switch (((dt_ident_t *)aid->di_iarg)->di_id) {
 		case DT_AGG_MAX:
 			if (*src > *dst)
@@ -458,7 +468,7 @@ accumulate:
 
 			break;
 		default:
-			for (i = 0, cnt = aid->di_size / sizeof(int64_t);
+			for (i = 0, cnt = realsz / sizeof(int64_t);
 			     i < cnt; i++, dst++, src++)
 				*dst += *src;
 		}
@@ -480,20 +490,20 @@ accumulate:
 		return dt_set_errno(st->dtp, EDT_NOMEM);
 
 	agd = &h->dtahe_data;
-	agd->dtada_data = dt_alloc(st->dtp, aid->di_size);
+	agd->dtada_data = dt_alloc(st->dtp, realsz);
 	if (agd->dtada_data == NULL) {
 		dt_free(st->dtp, h);
 		return dt_set_errno(st->dtp, EDT_NOMEM);
 	}
 
-	memcpy(agd->dtada_data, st->buf + aid->di_offset, aid->di_size);
-	agd->dtada_size = aid->di_size;
+	memcpy(agd->dtada_data, src, realsz);
+	agd->dtada_size = realsz;
 	agd->dtada_desc = agg;
 	agd->dtada_hdl = st->dtp;
 	agd->dtada_normal = 1;
 
 	h->dtahe_hval = hval;
-	h->dtahe_size = aid->di_size;
+	h->dtahe_size = realsz;
 
 	if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
 		char	**percpu = dt_calloc(st->dtp,
@@ -508,7 +518,7 @@ accumulate:
 		}
 
 		for (i = 0; i <= st->dtp->dt_conf.max_cpuid; i++) {
-			percpu[i] = dt_zalloc(st->dtp, aid->di_size);
+			percpu[i] = dt_zalloc(st->dtp, realsz);
 			if (percpu[i] == NULL) {
 				while (--i >= 0)
 					dt_free(st->dtp, percpu[i]);
@@ -519,7 +529,7 @@ accumulate:
 			}
 		}
 
-		memcpy(percpu[st->cpu], st->buf + aid->di_offset, aid->di_size);
+		memcpy(percpu[st->cpu], src, realsz);
 		agd->dtada_percpu = percpu;
 	}
 
@@ -544,21 +554,11 @@ dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
 {
 	dt_aggregate_t	*agp = &dtp->dt_aggregate;
 	char		*buf = agp->dtat_cpu_buf[cpu];
-	uint64_t	*seq = (uint64_t *)buf;
 	dt_snapstate_t	st;
 
-	/* Nothing to be done? */
-	if (*seq == 0)
-		return 0;
-
-	/*
-	 * Process all static aggregation identifiers in the CPU buffer.  We
-	 * skip over the latch sequence number because from this point on we
-	 * are simply processing data.
-	 */
 	st.dtp = dtp;
 	st.cpu = cpu;
-	st.buf = buf + sizeof(*seq);
+	st.buf = buf;
 	st.agp = agp;
 
 	return dt_idhash_iter(dtp->dt_aggs,
@@ -960,8 +960,8 @@ dt_aggregate_bundlecmp(const void *lhs, const void *rhs)
  * set for min(), so that any other value fed to the functions will register
  * properly.
  *
- * The first value in the buffer is used as a flag to indicate whether an
- * initial value was stored in the buffer.
+ * The latch sequence number of the first aggregation is used as a flag to
+ * indicate whether an initial value was stored for any aggregation.
  */
 static int
 init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
@@ -983,9 +983,10 @@ init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
 	/* Indicate that we are setting initial values. */
 	*(int64_t *)buf = 1;
 
-	ptr = (int64_t *)(buf + sizeof(int64_t) + aid->di_offset);
-	ptr[0] = value;
+	/* skip ptr[0], it is the latch sequence number */
+	ptr = (int64_t *)(buf + aid->di_offset);
 	ptr[1] = value;
+	ptr[2] = value;
 
 	return 0;
 }
@@ -1007,12 +1008,7 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
 	 * Allocate a buffer to hold the aggregation data for all possible
 	 * CPUs, and initialize the per-CPU data pointers for CPUs that are
 	 * currently enabled.
-	 *
-	 * The size of the aggregation data is to be increased by the size of
-	 * the latch sequence number.  That yields the actual size of the data
-	 * allocated per CPU.
 	 */
-	aggsz += sizeof(uint64_t);
 	agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
 	if (agp->dtat_buf == NULL)
 		return dt_set_errno(dtp, EDT_NOMEM);
@@ -1044,7 +1040,7 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
 	dt_idhash_iter(dtp->dt_aggs, (dt_idhash_f *) init_minmax,
 		       agp->dtat_buf);
 	if (*(int64_t *)agp->dtat_buf != 0) {
-		*(int64_t *)agp->dtat_buf = 0;	/* clear the flag */
+		*(int64_t *)agp->dtat_buf = 0;  /* clear the flag */
 
 		for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
 			int	cpu = dtp->dt_conf.cpus[i].cpu_id;
@@ -1055,7 +1051,6 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
 
 			memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
 		}
-
 		dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
 	}
 
diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
index c6102f15..1705c990 100644
--- a/libdtrace/dt_bpf.c
+++ b/libdtrace/dt_bpf.c
@@ -223,16 +223,12 @@ dt_bpf_gmap_create(dtrace_hdl_t *dtp)
 		return -1;		/* dt_errno is set for us */
 
 	/*
-	 * If there is aggregation data to be collected, we need to create the
-	 * 'aggs' BPF map, and account for a uint64_t in the map value size to
-	 * hold a latch sequence number (seq) for concurrent access to the
-	 * data.
+	 * Check if there is aggregation data to be collected.
 	 */
 	if (aggsz > 0) {
 		dtp->dt_aggmap_fd = create_gmap(dtp, "aggs",
 						BPF_MAP_TYPE_PERCPU_ARRAY,
-						sizeof(uint32_t),
-						sizeof(uint64_t) + aggsz, 1);
+						sizeof(uint32_t), aggsz, 1);
 		if (dtp->dt_aggmap_fd == -1)
 			return -1;	/* dt_errno is set for us */
 	}
diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
index 8e440c8e..972b8483 100644
--- a/libdtrace/dt_cg.c
+++ b/libdtrace/dt_cg.c
@@ -3033,13 +3033,14 @@ dt_cg_node(dt_node_t *dnp, dt_irlist_t *dlp, dt_regset_t *drp)
  * identifier (if not set yet).
  *
  * We consume twice the required data size because of the odd/even data pair
- * mechanism to provide lockless, write-wait-free operation.
+ * mechanism to provide lockless, write-wait-free operation.  Additionally,
+ * we make room for a latch sequence number.
  */
 #define DT_CG_AGG_SET_STORAGE(aid, sz) \
 	do { \
 		if ((aid)->di_offset == -1) \
 			dt_ident_set_storage((aid), sizeof(uint64_t), \
-					     2 * (sz)); \
+					     sizeof(uint64_t) + 2 * (sz)); \
 	} while (0)
 
 /*
@@ -3047,7 +3048,7 @@ dt_cg_node(dt_node_t *dnp, dt_irlist_t *dlp, dt_regset_t *drp)
  * return a register that holds a pointer to the aggregation data to be
  * updated.
  *
- * We update the latch seqcount (first value in the aggregation buffer) to
+ * We update the latch seqcount (first value in the aggregation) to
  * signal that reads should be directed to the alternate copy of the data.  We
  * then determine the location of data for the given aggregation that can be
  * updated.  This value is stored in the register returned from this function.
@@ -3056,49 +3057,47 @@ static int
 dt_cg_agg_buf_prepare(dt_ident_t *aid, int size, dt_irlist_t *dlp,
 		      dt_regset_t *drp)
 {
-	int		rx, ry;
+	int		ragd, roff;
 
 	TRACE_REGSET("            Prep: Begin");
 
 	dt_regset_xalloc(drp, BPF_REG_0);
-	rx = dt_regset_alloc(drp);
-	ry = dt_regset_alloc(drp);
-	assert(rx != -1 && ry != -1);
+	ragd = dt_regset_alloc(drp);
+	roff = dt_regset_alloc(drp);
+	assert(ragd != -1 && roff != -1);
 
 	/*
-	 *				//     (%rX = register for agg ptr)
-	 *				//     (%rY = register for agg data)
 	 *	agd = dctx->agg;	// lddw %r0, [%fp + DT_STK_DCTX]
-	 *				// lddw %rX, [%r0 + DCTX_AGG]
+	 *				// lddw %ragd, [%r0 + DCTX_AGG]
+	 *	agd += aid->di_offset	// %ragd += aid->di_offset
 	 */
 	emit(dlp, BPF_LOAD(BPF_DW, BPF_REG_0, BPF_REG_FP, DT_STK_DCTX));
-	emit(dlp, BPF_LOAD(BPF_DW, rx, BPF_REG_0, DCTX_AGG));
+	emit(dlp, BPF_LOAD(BPF_DW, ragd, BPF_REG_0, DCTX_AGG));
+	emit(dlp, BPF_ALU64_IMM(BPF_ADD, ragd, aid->di_offset));
 
 	/*
-	 *	off = (*agd % 2) * size	// lddw %rY, [%rX + 0]
-	 *	      + aid->di_offset	// and %rY, 1
-	 *	      + sizeof(uint64_t);
-	 *				// mul %rY, size
-	 *				// add %rY, aid->di_offset +
-	 *				//		sizeof(uint64_t)
+	 *	off = (*agd & 1) * size	// lddw %roff, [%ragd + 0]
+	 *	      + sizeof(uint64_t);	// and %roff, 1
+	 *				// mul %roff, size
+	 *				// add %roff, sizeof(uint64_t)
 	 *	(*agd)++;		// mov %r0, 1
-	 *				// xadd [%rX + 0], %r0
-	 *	agd += off;		// add %rX, %rY
+	 *				// xadd [%ragd + 0], %r0
+	 *	agd += off;		// add %ragd, %roff
 	 */
-	emit(dlp, BPF_LOAD(BPF_DW, ry, rx, 0));
-	emit(dlp, BPF_ALU64_IMM(BPF_AND, ry, 1));
-	emit(dlp, BPF_ALU64_IMM(BPF_MUL, ry, size));
-	emit(dlp, BPF_ALU64_IMM(BPF_ADD, ry, aid->di_offset + sizeof(uint64_t)));
+	emit(dlp, BPF_LOAD(BPF_DW, roff, ragd, 0));
+	emit(dlp, BPF_ALU64_IMM(BPF_AND, roff, 1));
+	emit(dlp, BPF_ALU64_IMM(BPF_MUL, roff, size));
+	emit(dlp, BPF_ALU64_IMM(BPF_ADD, roff, sizeof(uint64_t)));
 	emit(dlp, BPF_MOV_IMM(BPF_REG_0, 1));
-	emit(dlp, BPF_XADD_REG(BPF_DW, rx, 0, BPF_REG_0));
-	emit(dlp, BPF_ALU64_REG(BPF_ADD, rx, ry));
+	emit(dlp, BPF_XADD_REG(BPF_DW, ragd, 0, BPF_REG_0));
+	emit(dlp, BPF_ALU64_REG(BPF_ADD, ragd, roff));
 
-	dt_regset_free(drp, ry);
+	dt_regset_free(drp, roff);
 	dt_regset_free(drp, BPF_REG_0);
 
 	TRACE_REGSET("            Prep: End  ");
 
-	return rx;
+	return ragd;
 }
 
 #define DT_CG_AGG_IMPL(aid, sz, dlp, drp, f, ...) \
diff --git a/libdtrace/dt_map.c b/libdtrace/dt_map.c
index 0c9495d7..cac295fa 100644
--- a/libdtrace/dt_map.c
+++ b/libdtrace/dt_map.c
@@ -274,11 +274,16 @@ dt_aggid_add(dtrace_hdl_t *dtp, const dt_ident_t *aid)
 	if (agg == NULL)
 		return dt_set_errno(dtp, EDT_NOMEM);
 
+	/*
+	 * Note the relationship between the aggregation storage
+	 * size (di_size) and the aggregation data size (dtagd_size):
+	 *     di_size = dtagd_size * (# agg copies) + (size of latch seq #)
+	 */
 	agg->dtagd_id = id;
 	agg->dtagd_name = aid->di_name;
 	agg->dtagd_sig = ((dt_idsig_t *)aid->di_data)->dis_auxinfo;
 	agg->dtagd_varid = aid->di_id;
-	agg->dtagd_size = aid->di_size / 2;
+	agg->dtagd_size = (aid->di_size - sizeof(uint64_t)) / 2;
 	agg->dtagd_nrecs = agg->dtagd_size / sizeof(uint64_t);
 
 	recs = dt_calloc(dtp, agg->dtagd_nrecs, sizeof(dtrace_recdesc_t));
diff --git a/test/unittest/aggs/tst.empty.d b/test/unittest/aggs/tst.empty.d
new file mode 100644
index 00000000..784e1dd9
--- /dev/null
+++ b/test/unittest/aggs/tst.empty.d
@@ -0,0 +1,29 @@
+/*
+ * Oracle Linux DTrace.
+ * Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
+ * Licensed under the Universal Permissive License v 1.0 as shown at
+ * http://oss.oracle.com/licenses/upl.
+ */
+
+/*
+ * ASSERTION: Empty aggregations are not printed
+ *
+ * SECTION: Aggregations/Aggregations
+ */
+
+#pragma D option quiet
+
+BEGIN
+{
+	i = 12345678;
+	@a = sum(i);
+	exit(0);
+}
+
+/* this probe should not fire */
+tick-1hour
+{
+	/* these aggregations should not be printed */
+	@b = min(i);
+	@c = max(i);
+}
diff --git a/test/unittest/aggs/tst.empty.r b/test/unittest/aggs/tst.empty.r
new file mode 100644
index 00000000..aab82b76
--- /dev/null
+++ b/test/unittest/aggs/tst.empty.r
@@ -0,0 +1,2 @@
+
+         12345678
-- 
2.18.4