[DTrace-devel] [PATCH 3/4] Introduce mechanism to turn off dual-copy aggregation code

eugene.loh at oracle.com eugene.loh at oracle.com
Wed Jan 6 17:39:12 PST 2021


From: Eugene Loh <eugene.loh at oracle.com>

To provide the producer a lock-free write mechanism for aggregations,
we have two copies of aggregations.  The producer updates both, in
turn, and the consumer reads the one not being written, as indicated
by a latch sequence number.

Currently, however, we are using older kernels that do not allow us
to mmap BPF maps.  The consumer is forced to lookup the entire BPF
aggregation map at once, making the dual-copy mechanism useless.
Meanwhile, the dual copies are consuming valuable BPF map space.

Introduce a preprocessor macro DT_AGG_NUM_COPIES that can be set to
turn off the dual-copy mechanism for now while retaining the code for
when we want to restore the mechanism.

For example, the D script
    BEGIN {x = 100; @ = lquantize(x, 0, 2045, 1); exit(0) }
runs, but increasing to 2046 fails.  With DT_AGG_NUM_COPIES=1,
one can go up to 4093.

The latch sequence number is preserved since it is used to see
which aggregations have data and should be reported.

Signed-off-by: Eugene Loh <eugene.loh at oracle.com>
---
 libdtrace/dt_aggregate.c |  6 ++-
 libdtrace/dt_cg.c        | 93 ++++++++++++++++++++++++++--------------
 libdtrace/dt_impl.h      | 14 ++++++
 libdtrace/dt_map.c       |  4 +-
 4 files changed, 82 insertions(+), 35 deletions(-)

diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
index ba32d9d5..7e325b2e 100644
--- a/libdtrace/dt_aggregate.c
+++ b/libdtrace/dt_aggregate.c
@@ -436,8 +436,8 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
 	if (*src == 0 && !firstcpu)
 		return 0;
 
-	/* real size excludes latch sequence number and second data copy */
-	realsz = (aid->di_size - sizeof(uint64_t)) / 2;
+	/* real size excludes latch sequence number and second data copy, if any */
+	realsz = (aid->di_size - sizeof(uint64_t)) / DT_AGG_NUM_COPIES;
 
 	/* See if we already have an entry for this aggregation. */
 	for (h = agh->dtah_hash[ndx]; h != NULL; h = h->dtahe_next) {
@@ -996,7 +996,9 @@ init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
 	/* skip ptr[0], it is the latch sequence number */
 	ptr = (int64_t *)(buf + aid->di_offset);
 	ptr[1] = value;
+#if DT_AGG_NUM_COPIES == 2
 	ptr[2] = value;
+#endif
 
 	return 0;
 }
diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
index 3173f128..6e45d1a8 100644
--- a/libdtrace/dt_cg.c
+++ b/libdtrace/dt_cg.c
@@ -3032,17 +3032,65 @@ dt_cg_node(dt_node_t *dnp, dt_irlist_t *dlp, dt_regset_t *drp)
  * Macro to set the storage data (offset and size) for the aggregation
  * identifier (if not set yet).
  *
- * We consume twice the required data size because of the odd/even data pair
- * mechanism to provide lockless, write-wait-free operation.  Additionally,
- * we make room for a latch sequence number.
+ * We make room for a latch sequence number of sizeof(uint64_t).
+ *
+ * If DT_AGG_NUM_COPIES==2, we consume twice the required data size for
+ * a dual-copy mechanism to provide lockless, write-wait-free operation.
  */
 #define DT_CG_AGG_SET_STORAGE(aid, sz) \
 	do { \
 		if ((aid)->di_offset == -1) \
 			dt_ident_set_storage((aid), sizeof(uint64_t), \
-					     sizeof(uint64_t) + 2 * (sz)); \
+					     sizeof(uint64_t) + \
+					     DT_AGG_NUM_COPIES * (sz)); \
 	} while (0)
 
+#if DT_AGG_NUM_COPIES == 1
+/*
+ * Return a register that holds a pointer to the aggregation data to be
+ * updated.
+ *
+ * We update the latch seqcount (first value in the aggregation) to
+ * signal that the aggregation has data.  The location of data for the
+ * given aggregation is stored in the register returned from this function.
+ */
+static int
+dt_cg_agg_buf_prepare(dt_ident_t *aid, int size, dt_irlist_t *dlp,
+		      dt_regset_t *drp)
+{
+	int		rptr;
+
+	TRACE_REGSET("            Prep: Begin");
+
+	dt_regset_xalloc(drp, BPF_REG_0);
+	rptr = dt_regset_alloc(drp);
+	assert(rptr != -1);
+
+	/*
+	 *	ptr = dctx->agg;	// lddw %rptr, [%fp + DT_STK_DCTX]
+	 *				// lddw %rptr, [%rptr + DCTX_AGG]
+	 *	ptr += aid->di_offset;	// add %rptr, aid->di_offset
+	 */
+	emit(dlp, BPF_LOAD(BPF_DW, rptr, BPF_REG_FP, DT_STK_DCTX));
+	emit(dlp, BPF_LOAD(BPF_DW, rptr, rptr, DCTX_AGG));
+	emit(dlp, BPF_ALU64_IMM(BPF_ADD, rptr, aid->di_offset));
+
+	/*
+	 *	(*ptr)++;		// mov %r0, 1
+	 *				// xadd [%rptr + 0], %r0
+	 *	ptr += off;		// add %rptr, %roff
+	 */
+	emit(dlp, BPF_MOV_IMM(BPF_REG_0, 1));
+	emit(dlp, BPF_XADD_REG(BPF_DW, rptr, 0, BPF_REG_0));
+	emit(dlp, BPF_ALU64_IMM(BPF_ADD, rptr, sizeof(uint64_t)));
+
+	dt_regset_free(drp, BPF_REG_0);
+
+	TRACE_REGSET("            Prep: End  ");
+
+	return rptr;
+}
+#else
 /*
  * Prepare the aggregation buffer for updating for a specific aggregation, and
  * return a register that holds a pointer to the aggregation data to be
@@ -3099,41 +3147,24 @@ dt_cg_agg_buf_prepare(dt_ident_t *aid, int size, dt_irlist_t *dlp,
 
 	return rptr;
 }
+#endif
 
 #define DT_CG_AGG_IMPL(aid, sz, dlp, drp, f, ...) \
 	do {								\
-		int	dreg;						\
+		int	i, dreg;					\
 									\
 		TRACE_REGSET("        Upd: Begin ");			\
 									\
-		/*							\
-		 * Redirect reading to secondary copy.  The register	\
-		 * returned holds the base pointer to the primary copy.	\
-		 */							\
-		dreg = dt_cg_agg_buf_prepare((aid), (sz), (dlp), (drp));\
-									\
-		/*							\
-		 * Call the update implementation.  Aggregation data is \
-		 * accessed through register 'dreg'.			\
-		 */							\
-		(f)((dlp), (drp), dreg, ## __VA_ARGS__);		\
-		dt_regset_free((drp), dreg);				\
-									\
-		TRACE_REGSET("        Upd: Switch");			\
+		for (i = 0; i < DT_AGG_NUM_COPIES; i++) {		\
+			if (i == 1) {					\
+				TRACE_REGSET("        Upd: Switch");	\
+			}						\
 									\
-		/*							\
-		 * Redirect reading to primary copy.  The register	\
-		 * returned holds the base pointer to the secondary	\
-		 * copy.						\
-		 */							\
-		dreg = dt_cg_agg_buf_prepare((aid), (sz), (dlp), (drp));\
+			dreg = dt_cg_agg_buf_prepare((aid), (sz), (dlp), (drp));\
 									\
-		/*							\
-		 * Call the update implementation.  Aggregation data is \
-		 * accessed through register 'dreg'.			\
-		 */							\
-		(f)((dlp), (drp), dreg, ## __VA_ARGS__);		\
-		dt_regset_free((drp), dreg);				\
+			(f)((dlp), (drp), dreg, ## __VA_ARGS__);	\
+			dt_regset_free((drp), dreg);			\
+		}							\
 									\
 		TRACE_REGSET("        Upd: End   ");			\
 	} while (0)
diff --git a/libdtrace/dt_impl.h b/libdtrace/dt_impl.h
index 933a67f2..4548a11d 100644
--- a/libdtrace/dt_impl.h
+++ b/libdtrace/dt_impl.h
@@ -189,6 +189,20 @@ typedef struct dt_ahash {
 	size_t		dtah_size;		/* size of hash table */
 } dt_ahash_t;
 
+/*
+ * To provide a lock-free aggregation write mechanism for the producer,
+ * two copies of each aggregation can be used.  A latch sequence number
+ * on each CPU can be incremented to indicate to the consumer which copy
+ * should be read and whether a copy has changed during reading.
+ *
+ * Until BPF maps can be mmapped, however, we cannot take advantage of
+ * this technique.  Using only a single-copy, of course, saves precious
+ * BPF map space.
+ *
+ * Pick DT_AGG_NUM_COPIES to be either 1 or 2.
+ */
+#define DT_AGG_NUM_COPIES 1
+
 typedef struct dt_aggregate {
 	char **dtat_cpu_buf;		/* per-CPU agg snapshot buffers */
 	char *dtat_buf;			/* aggregation snapshot buffer */
diff --git a/libdtrace/dt_map.c b/libdtrace/dt_map.c
index 0ea93a18..9a336768 100644
--- a/libdtrace/dt_map.c
+++ b/libdtrace/dt_map.c
@@ -274,12 +274,12 @@ dt_aggid_add(dtrace_hdl_t *dtp, const dt_ident_t *aid)
 	if (agg == NULL)
 		return dt_set_errno(dtp, EDT_NOMEM);
 
-	/* size subtracts latch sequence number and divides for 2 copies */
+	/* size subtracts latch sequence number and divides for multiple copies */
 	agg->dtagd_id = id;
 	agg->dtagd_name = aid->di_name;
 	agg->dtagd_sig = ((dt_idsig_t *)aid->di_data)->dis_auxinfo;
 	agg->dtagd_varid = aid->di_id;
-	agg->dtagd_size = (aid->di_size - sizeof(uint64_t))/ 2;
+	agg->dtagd_size = (aid->di_size - sizeof(uint64_t))/ DT_AGG_NUM_COPIES;
 	agg->dtagd_nrecs = agg->dtagd_size / sizeof(uint64_t);
 
 	recs = dt_calloc(dtp, agg->dtagd_nrecs, sizeof(dtrace_recdesc_t));
-- 
2.18.4




More information about the DTrace-devel mailing list