[DTrace-devel] [PATCH 6/6] Implement clear()

Kris Van Hees kris.van.hees at oracle.com
Wed Feb 22 15:17:28 UTC 2023


The clear() action is implemented by incrementing the global generation
counter for an aggregation name and clearing the data for matching
aggregtions within the consumer.  If aggregation data in a snapshot is
found to have an older generation, it is discarded.  The producer side
will already clear its data when the generation counter is found to be
higher than the generation of the data in a CPU's agg buffer.

Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
---
 libdtrace/dt_aggregate.c                  | 41 ++++++++++++++++++-----
 libdtrace/dt_cg.c                         |  7 ----
 libdtrace/dt_consume.c                    | 32 ++++++++++++++++++
 test/unittest/aggs/tst.clear.d            |  1 -
 test/unittest/aggs/tst.clearavg.d         |  1 -
 test/unittest/aggs/tst.clearavg2.d        |  2 +-
 test/unittest/aggs/tst.cleardenormalize.d |  1 -
 test/unittest/aggs/tst.clearlquantize.d   |  1 -
 test/unittest/aggs/tst.clearnormalize.d   |  1 -
 9 files changed, 65 insertions(+), 22 deletions(-)

diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
index 2129c867..4adb3382 100644
--- a/libdtrace/dt_aggregate.c
+++ b/libdtrace/dt_aggregate.c
@@ -446,8 +446,17 @@ dt_aggregate_clear_one(const dtrace_aggdata_t *agd, void *arg)
 	dtrace_recdesc_t	*rec = &agg->dtagd_drecs[DT_AGGDATA_RECORD];
 	int64_t			*vals = (int64_t *)
 					&agd->dtada_data[rec->dtrd_offset];
+	uint64_t		agen;
 	int			i, max_cpus = dtp->dt_conf.max_cpuid + 1;
 
+	/*
+	 * We can pass the entire key because we know that the first uint32_t
+	 * in the key is the aggreggation ID we need.
+	 */
+	if (dt_bpf_map_lookup(dtp->dt_genmap_fd, agd->dtada_key, &agen) < 0)
+		agen = 0;
+	*(uint64_t *)agd->dtada_data = agen;
+
 	switch (rec->dtrd_action) {
 	case DT_AGG_MIN:
 		*vals = INT64_MAX;
@@ -483,12 +492,14 @@ dt_aggregate_snap_one(dtrace_hdl_t *dtp, int aggid, int cpu, const char *key,
 	dtrace_recdesc_t	*rec;
 	char			*ptr;
 	uint64_t		hval = aggid;
+	uint64_t		dgen;
 	size_t			ndx = hval % agh->dtah_size;
 	size_t			off, size;
 	int			i, rval;
 
-	/* Skip this aggregation if the data counter is 0. */
-	if (*(int64_t *)data == 0)
+	/* Data generation: skip if 0 */
+	dgen = *(uint64_t *)data;
+	if (dgen == 0)
 		return 0;
 
 	/* Retrieve the aggregation description. */
@@ -499,6 +510,7 @@ dt_aggregate_snap_one(dtrace_hdl_t *dtp, int aggid, int cpu, const char *key,
 	/* See if we already have an entry for this aggregation. */
 	for (h = agh->dtah_hash[ndx]; h != NULL; h = h->dtahe_next) {
 		dt_ident_t	*aid;
+		uint64_t	hgen;
 
 		/* Hash value needs to match. */
 		if (h->dtahe_hval != hval)
@@ -519,6 +531,16 @@ dt_aggregate_snap_one(dtrace_hdl_t *dtp, int aggid, int cpu, const char *key,
 				goto hashnext;
 		}
 
+		/*
+		 * Skip if data gen is older than hash gen.
+		 * Reset hash data if data gen is newer than hash gen.
+		 */
+		hgen = *(int64_t *)agd->dtada_data;
+		if (dgen < hgen)
+			return 0;
+		if (dgen > hgen)
+			dt_aggregate_clear_one(agd, dtp);
+
 		aid = dt_idhash_lookup(dtp->dt_aggs, agg->dtagd_name);
 		assert(aid != NULL);
 		dt_agg_one_agg(aid, &agg->dtagd_drecs[DT_AGGDATA_RECORD],
@@ -642,7 +664,7 @@ dtrace_aggregate_snap(dtrace_hdl_t *dtp)
 		int	fd = dt_bpf_map_lookup_fd(dtp->dt_aggmap_fd, &cpu);
 
 		if (fd < 0)
-			return -1;
+			return DTRACE_WORKSTATUS_ERROR;
 
 		rval = dt_aggregate_snap_cpu(dtp, cpu, fd);
 		close(fd);
@@ -650,7 +672,7 @@ dtrace_aggregate_snap(dtrace_hdl_t *dtp)
 			return rval;
 	}
 
-	return 0;
+	return DTRACE_WORKSTATUS_OKAY;
 }
 
 static int
@@ -1219,7 +1241,9 @@ dt_aggregate_walk_sorted(dtrace_hdl_t *dtp, dtrace_aggregate_f *func,
 	dt_ahash_t *hash = &agp->dtat_hash;
 	size_t i, nentries = 0;
 
+#if 0
 	dtrace_aggregate_snap(dtp);
+#endif
 
 	/*
 	 * Count how many aggregations have data in the buffers.  If there are
@@ -1353,6 +1377,10 @@ dtrace_aggregate_walk_joined(dtrace_hdl_t *dtp, dtrace_aggid_t *aggvars,
 	int i, j;
 	dtrace_optval_t sortpos = dtp->dt_options[DTRACEOPT_AGGSORTPOS];
 
+#if 0
+	dtrace_aggregate_snap(dtp);
+#endif
+
 	/*
 	 * If the sorting position is greater than the number of aggregation
 	 * variable IDs, we silently set it to 0.
@@ -1423,11 +1451,6 @@ dtrace_aggregate_walk_joined(dtrace_hdl_t *dtp, dtrace_aggid_t *aggvars,
 		map[aggvar] = i + 1;
 	}
 
-	/*
-	 * Retrieve the aggregation data.
-	 */
-	dtrace_aggregate_snap(dtp);
-
 	/*
 	 * We need to take two passes over the data to size our allocation, so
 	 * we'll use the first pass to also fill in the zero-filled data to be
diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
index 38c43df3..b72da90a 100644
--- a/libdtrace/dt_cg.c
+++ b/libdtrace/dt_cg.c
@@ -1350,14 +1350,7 @@ dt_cg_act_clear(dt_pcb_t *pcb, dt_node_t *dnp, dtrace_actkind_t kind)
 		dnerror(dnp, D_CLEAR_AGGBAD,
 			"undefined aggregation: @%s\n", aid->di_name);
 
-	/*
-	 * FIXME: Needs implementation
-	 * TODO: Emit code to clear the given aggregation.
-	 * DEPENDS ON: How aggregations are implemented using eBPF (hashmap?).
-	 * AGGID = aid->di_id
-	 */
 	dt_cg_store_val(pcb, anp, DTRACEACT_LIBACT, NULL, DT_ACT_CLEAR);
-	dnerror(dnp, D_UNKNOWN, "clear() is not implemented (yet)\n");
 }
 
 /*
diff --git a/libdtrace/dt_consume.c b/libdtrace/dt_consume.c
index 34009e6b..e90edae9 100644
--- a/libdtrace/dt_consume.c
+++ b/libdtrace/dt_consume.c
@@ -1512,6 +1512,33 @@ dt_normalize(dtrace_hdl_t *dtp, caddr_t base, dtrace_recdesc_t *rec)
 	return 0;
 }
 
+static int
+dt_clear(dtrace_hdl_t *dtp, caddr_t base, dtrace_recdesc_t *rec)
+{
+	dtrace_aggid_t	aid;
+	uint64_t	gen;
+	caddr_t		addr;
+
+	/* We have just one record: the aggregation ID. */
+	addr = base + rec->dtrd_offset;
+
+	if (rec->dtrd_size != sizeof(dtrace_aggid_t))
+		return dt_set_errno(dtp, EDT_BADNORMAL);
+
+	aid = *((dtrace_aggid_t *)addr);
+
+	if (dt_bpf_map_lookup(dtp->dt_genmap_fd, &aid, &gen) < 0)
+		return -1;
+	gen++;
+	if (dt_bpf_map_update(dtp->dt_genmap_fd, &aid, &gen) < 0)
+		return -1;
+
+	/* Also clear our own copy of the data, in case it gets printed. */
+	dtrace_aggregate_walk(dtp, dt_aggregate_clear_one, dtp);
+
+	return 0;
+}
+
 #ifdef FIXME
 static int
 dt_clear_agg(const dtrace_aggdata_t *aggdata, void *arg)
@@ -2325,6 +2352,11 @@ dt_consume_one_probe(dtrace_hdl_t *dtp, FILE *fp, char *data, uint32_t size,
 					return DTRACE_WORKSTATUS_ERROR;
 
 				i++;
+				continue;
+			case DT_ACT_CLEAR:
+				if (dt_clear(dtp, data, rec) != 0)
+					return DTRACE_WORKSTATUS_ERROR;
+
 				continue;
 			case DT_ACT_FTRUNCATE:
 				if (fp == NULL)
diff --git a/test/unittest/aggs/tst.clear.d b/test/unittest/aggs/tst.clear.d
index 408dd373..f574b5d2 100644
--- a/test/unittest/aggs/tst.clear.d
+++ b/test/unittest/aggs/tst.clear.d
@@ -4,7 +4,6 @@
  * Licensed under the Universal Permissive License v 1.0 as shown at
  * http://oss.oracle.com/licenses/upl.
  */
-/* @@xfail: dtv2 - requires clear() */
 
 /*
  * ASSERTION:
diff --git a/test/unittest/aggs/tst.clearavg.d b/test/unittest/aggs/tst.clearavg.d
index 58d54742..16d18f34 100644
--- a/test/unittest/aggs/tst.clearavg.d
+++ b/test/unittest/aggs/tst.clearavg.d
@@ -4,7 +4,6 @@
  * Licensed under the Universal Permissive License v 1.0 as shown at
  * http://oss.oracle.com/licenses/upl.
  */
-/* @@xfail: dtv2 - requires clear() */
 
 /*
  * ASSERTION:
diff --git a/test/unittest/aggs/tst.clearavg2.d b/test/unittest/aggs/tst.clearavg2.d
index 3d918502..984ce82c 100644
--- a/test/unittest/aggs/tst.clearavg2.d
+++ b/test/unittest/aggs/tst.clearavg2.d
@@ -4,7 +4,7 @@
  * Licensed under the Universal Permissive License v 1.0 as shown at
  * http://oss.oracle.com/licenses/upl.
  */
-/* @@xfail: dtv2 - requires clear() */
+
 /*
  * ASSERTION:
  * 	Positive avg() test
diff --git a/test/unittest/aggs/tst.cleardenormalize.d b/test/unittest/aggs/tst.cleardenormalize.d
index 277b7c7f..191ecad7 100644
--- a/test/unittest/aggs/tst.cleardenormalize.d
+++ b/test/unittest/aggs/tst.cleardenormalize.d
@@ -4,7 +4,6 @@
  * Licensed under the Universal Permissive License v 1.0 as shown at
  * http://oss.oracle.com/licenses/upl.
  */
-/* @@xfail: dtv2 - requires clear() */
 
 /*
  * ASSERTION:
diff --git a/test/unittest/aggs/tst.clearlquantize.d b/test/unittest/aggs/tst.clearlquantize.d
index dcd4fef7..7e47e3aa 100644
--- a/test/unittest/aggs/tst.clearlquantize.d
+++ b/test/unittest/aggs/tst.clearlquantize.d
@@ -4,7 +4,6 @@
  * Licensed under the Universal Permissive License v 1.0 as shown at
  * http://oss.oracle.com/licenses/upl.
  */
-/* @@xfail: dtv2 - requires clear() */
 
 /*
  * ASSERTION:
diff --git a/test/unittest/aggs/tst.clearnormalize.d b/test/unittest/aggs/tst.clearnormalize.d
index 211371d1..34d225f0 100644
--- a/test/unittest/aggs/tst.clearnormalize.d
+++ b/test/unittest/aggs/tst.clearnormalize.d
@@ -4,7 +4,6 @@
  * Licensed under the Universal Permissive License v 1.0 as shown at
  * http://oss.oracle.com/licenses/upl.
  */
-/* @@xfail: dtv2 - requires clear() */
 
 /*
  * ASSERTION:
-- 
2.39.1




More information about the DTrace-devel mailing list