[DTrace-devel] [PATCH 1/4] Do not report unused aggregations

Kris Van Hees kris.van.hees at oracle.com
Wed Jan 6 19:20:01 PST 2021


First off, I think that the short desc line for this patch should be
something like:

	Switch from per-CPU latches to per-CPU/per-agg latches

Yes, the patch came out of the issue you found with unused aggregations
getting printed, but the fix has a larger impact than just that; in fact, it
reflects work you suggested earlier, before this problem was even known.

I would mention the fix for the printing of unused aggregations as something
that makes use of the latch change.

On Wed, Jan 06, 2021 at 08:39:10PM -0500, eugene.loh at oracle.com wrote:
> From: Eugene Loh <eugene.loh at oracle.com>
> 
> Aggregations that appear in a D script are printed even if the probes
> that use them never fire.  This conflicts with DTv1 behavior.

I would omit the paragraph above.

> Change the latch sequence number to be per-aggregation, not only per-CPU.
> This way, one can see whether an aggregation has any data or not.  (Some
> aggregations, like count(), already inherently have such information, but
> we need to handle the general case.)
> 
> This change has the added advantage of reducing producer-consumer conflicts.
> E.g., imagine that the producer is firing very often, updating different
> aggregations, while the consuming is trying to read a single, large
> aggregation (e.g., some *quantize aggregation).  A single latch sequence
> number for all aggregations would keep updating, causing the consumer to
> keep rereading an aggregation unnecessarily.
> 
> Also, when taking snapshots of aggregations, copy only a single copy of
> the aggregation data.  Neither the latch sequence number nor the second
> copy are needed.

Maybe add here:

	This also solves the problem that unused aggregations that appear in a
	D script were being printed even though they should not be.
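
For illustration, here is a minimal standalone sketch of how a per-aggregation
latch lets the consumer tell an unused aggregation from a used one.  It mirrors
the check the patch adds to dt_aggregate_snap_one(), but the function name and
the flat-buffer interface are hypothetical and not part of libdtrace.  The
assumed storage layout is one latch sequence number followed by two copies of
the data.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/*
 * Snapshot one aggregation from a per-CPU buffer (hypothetical helper).
 *
 * Assumed storage layout, per the patch:
 *   [ latch seq: uint64_t ][ data copy 0: datasz ][ data copy 1: datasz ]
 *
 * Returns 0 when the latch is still 0 (no probe ever updated the
 * aggregation, so there is nothing to report), 1 when data was copied.
 */
static int
agg_snap_sketch(const char *storage, size_t datasz, char *dst)
{
	uint64_t	seq;

	assert(storage != NULL && dst != NULL);

	/* memcpy avoids alignment assumptions on the raw buffer. */
	memcpy(&seq, storage, sizeof(seq));
	if (seq == 0)
		return 0;

	/* Skip the latch and copy only a single copy of the data. */
	memcpy(dst, storage + sizeof(seq), datasz);
	return 1;
}
```

A caller would skip creating a hash entry whenever this returns 0, which is
what keeps the unused aggregation out of the output.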

> Introduce a test to catch the printing of unused aggregations.
> 
> https://github.com/oracle/dtrace-utils/issues/3
> Signed-off-by: Eugene Loh <eugene.loh at oracle.com>
> ---
>  libdtrace/dt_aggregate.c       | 77 ++++++++++++++--------------------
>  libdtrace/dt_bpf.c             |  8 +---
>  libdtrace/dt_cg.c              | 55 ++++++++++++------------
>  libdtrace/dt_map.c             |  3 +-
>  test/unittest/aggs/tst.empty.d | 29 +++++++++++++
>  test/unittest/aggs/tst.empty.r |  2 +
>  6 files changed, 93 insertions(+), 81 deletions(-)
>  create mode 100644 test/unittest/aggs/tst.empty.d
>  create mode 100644 test/unittest/aggs/tst.empty.r
> 
> diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
> index 972c19a3..d59f107e 100644
> --- a/libdtrace/dt_aggregate.c
> +++ b/libdtrace/dt_aggregate.c
> @@ -422,19 +422,30 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
>  	size_t			ndx = hval % agh->dtah_size;
>  	int			rval;
>  	uint_t			i;
> +	int64_t			*src;
> +	int			realsz;
>  
>  	rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
>  	if (rval != 0)
>  		return rval;
>  
> +	/* check latch sequence number to see if there is any data */
> +	src = (int64_t *)(st->buf + aid->di_offset);
> +	if (*src == 0)
> +		return 0;
> +	src++;
> +
> +	/* real size excludes latch sequence number and second data copy */
> +	realsz = (aid->di_size - sizeof(uint64_t)) / 2;
> +
>  	/* See if we already have an entry for this aggregation. */
>  	for (h = agh->dtah_hash[ndx]; h != NULL; h = h->dtahe_next) {
> -		int64_t	*src, *dst, *cpu_dst;
> +		int64_t	*dst, *cpu_dst;
>  		uint_t	cnt;
>  
>  		if (h->dtahe_hval != hval)
>  			continue;
> -		if (h->dtahe_size != aid->di_size)
> +		if (h->dtahe_size != realsz)
>  			continue;
>  
>  		/* Entry found - process the data. */
> @@ -445,7 +456,6 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
>  				: NULL;
>  
>  accumulate:
> -		src = (int64_t *)(st->buf + aid->di_offset);
>  		switch (((dt_ident_t *)aid->di_iarg)->di_id) {
>  		case DT_AGG_MAX:
>  			if (*src > *dst)
> @@ -458,7 +468,7 @@ accumulate:
>  
>  			break;
>  		default:
> -			for (i = 0, cnt = aid->di_size / sizeof(int64_t);
> +			for (i = 0, cnt = realsz / sizeof(int64_t);
>  			     i < cnt; i++, dst++, src++)
>  				*dst += *src;
>  		}
> @@ -480,20 +490,20 @@ accumulate:
>  		return dt_set_errno(st->dtp, EDT_NOMEM);
>  
>  	agd = &h->dtahe_data;
> -	agd->dtada_data = dt_alloc(st->dtp, aid->di_size);
> +	agd->dtada_data = dt_alloc(st->dtp, realsz);
>  	if (agd->dtada_data == NULL) {
>  		dt_free(st->dtp, h);
>  		return dt_set_errno(st->dtp, EDT_NOMEM);
>  	}
>  
> -	memcpy(agd->dtada_data, st->buf + aid->di_offset, aid->di_size);
> -	agd->dtada_size = aid->di_size;
> +	memcpy(agd->dtada_data, src, realsz);
> +	agd->dtada_size = realsz;
>  	agd->dtada_desc = agg;
>  	agd->dtada_hdl = st->dtp;
>  	agd->dtada_normal = 1;
>  
>  	h->dtahe_hval = hval;
> -	h->dtahe_size = aid->di_size;
> +	h->dtahe_size = realsz;
>  
>  	if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
>  		char	**percpu = dt_calloc(st->dtp,
> @@ -508,7 +518,7 @@ accumulate:
>  		}
>  
>  		for (i = 0; i <= st->dtp->dt_conf.max_cpuid; i++) {
> -			percpu[i] = dt_zalloc(st->dtp, aid->di_size);
> +			percpu[i] = dt_zalloc(st->dtp, realsz);
>  			if (percpu[i] == NULL) {
>  				while (--i >= 0)
>  					dt_free(st->dtp, percpu[i]);
> @@ -519,7 +529,7 @@ accumulate:
>  			}
>  		}
>  
> -		memcpy(percpu[st->cpu], st->buf + aid->di_offset, aid->di_size);
> +		memcpy(percpu[st->cpu], src, realsz);
>  		agd->dtada_percpu = percpu;
>  	}
>  
> @@ -544,21 +554,11 @@ dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
>  {
>  	dt_aggregate_t	*agp = &dtp->dt_aggregate;
>  	char		*buf = agp->dtat_cpu_buf[cpu];
> -	uint64_t	*seq = (uint64_t *)buf;
>  	dt_snapstate_t	st;
>  
> -	/* Nothing to be done? */
> -	if (*seq == 0)
> -		return 0;
> -
> -	/*
> -	 * Process all static aggregation identifiers in the CPU buffer.  We
> -	 * skip over the latch sequence number because from this point on we
> -	 * are simply processing data.
> -	 */
>  	st.dtp = dtp;
>  	st.cpu = cpu;
> -	st.buf = buf + sizeof(*seq);
> +	st.buf = buf;
>  	st.agp = agp;
>  
>  	return dt_idhash_iter(dtp->dt_aggs,
> @@ -959,9 +959,6 @@ dt_aggregate_bundlecmp(const void *lhs, const void *rhs)
>   * The minimum 64 bit value is set for max() and the maximum 64 bit value is
>   * set for min(), so that any other value fed to the functions will register
>   * properly.
> - *
> - * The first value in the buffer is used as a flag to indicate whether an
> - * initial value was stored in the buffer.
>   */
>  static int
>  init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
> @@ -980,12 +977,10 @@ init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
>  	else
>  		return 0;
>  
> -	/* Indicate that we are setting initial values. */
> -	*(int64_t *)buf = 1;
> -
> -	ptr = (int64_t *)(buf + sizeof(int64_t) + aid->di_offset);
> -	ptr[0] = value;
> +	/* skip ptr[0], it is the latch sequence number */
> +	ptr = (int64_t *)(buf + aid->di_offset);
>  	ptr[1] = value;
> +	ptr[2] = value;
>  
>  	return 0;
>  }
> @@ -1007,12 +1002,7 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
>  	 * Allocate a buffer to hold the aggregation data for all possible
>  	 * CPUs, and initialize the per-CPU data pointers for CPUs that are
>  	 * currently enabled.
> -	 *
> -	 * The size of the aggregation data is to be increased by the size of
> -	 * the latch sequence number.  That yields the actual size of the data
> -	 * allocated per CPU.
>  	 */
> -	aggsz += sizeof(uint64_t);
>  	agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
>  	if (agp->dtat_buf == NULL)
>  		return dt_set_errno(dtp, EDT_NOMEM);
> @@ -1043,21 +1033,16 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
>  	/* Initialize the starting values for min() and max() aggregations.  */
>  	dt_idhash_iter(dtp->dt_aggs, (dt_idhash_f *) init_minmax,
>  		       agp->dtat_buf);
> -	if (*(int64_t *)agp->dtat_buf != 0) {
> -		*(int64_t *)agp->dtat_buf = 0;	/* clear the flag */
> -
> -		for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> -			int	cpu = dtp->dt_conf.cpus[i].cpu_id;
> -
> -			/* Data for CPU 0 was populated, so skip it. */
> -			if (cpu == 0)
> -				continue;
> +	for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> +		int	cpu = dtp->dt_conf.cpus[i].cpu_id;
>  
> -			memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
> -		}
> +		/* Data for CPU 0 was populated, so skip it. */
> +		if (cpu == 0)
> +			continue;
>  
> -		dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
> +		memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
>  	}
> +	dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);

Hm, so you changed this (and init_minmax) in a way that changes its operation:
instead of populating the aggregation data only when there is actual init data
to store, it now always populates data.  I'd really prefer to see a solution
that maintains the concept of only storing data if there is init data.

You could still do that by using the latch for the first aggregation (because
you know it is to be 0).  Simply set that one to 1 if any init data gets
stored in the buffer, and reset it back to 0 afterwards.  Same mechanism as
before, just instead of using a per-CPU latch as a flag you use the latch of
the first aggregation in the per-CPU buffer.
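
Roughly, and only as a sketch under assumptions: the struct, the helper names
and the fixed 8-byte data size below are hypothetical stand-ins for the
dt_ident_t based code, chosen so the example is self-contained.  The idea is
that the initializer borrows the first aggregation's latch (offset 0 in the
buffer) as the "init data was stored" flag, and the caller clears it before
deciding whether to replicate the buffer.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the per-aggregation identifier. */
struct agg_sketch {
	size_t	offset;		/* start of storage; the latch comes first */
	int	needs_init;	/* nonzero for e.g. min() or max() */
	int64_t	init_value;	/* value to store in both data copies */
};

static void
put64(char *p, int64_t v)
{
	memcpy(p, &v, sizeof(v));
}

static int64_t
get64(const char *p)
{
	int64_t	v;

	memcpy(&v, p, sizeof(v));
	return v;
}

/*
 * Store initial values for all aggregations that need them.  The latch of
 * the first aggregation (buf + 0) is borrowed as a flag and is reset to 0
 * before returning.  Returns 1 if any init data was stored, 0 otherwise.
 */
static int
init_all_sketch(const struct agg_sketch *aggs, size_t n, char *buf)
{
	size_t	i;
	int	stored;

	assert(aggs != NULL && buf != NULL);

	for (i = 0; i < n; i++) {
		if (!aggs[i].needs_init)
			continue;

		put64(buf, 1);		/* flag: init data present */

		/* Skip the aggregation's own latch, fill both copies. */
		put64(buf + aggs[i].offset + 8, aggs[i].init_value);
		put64(buf + aggs[i].offset + 16, aggs[i].init_value);
	}

	stored = get64(buf) != 0;
	put64(buf, 0);			/* restore the borrowed latch */
	return stored;
}
```

The caller would then copy the buffer to the other CPUs and update the BPF map
only when this returns 1.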

>  	return 0;
>  }
> diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
> index c6102f15..1705c990 100644
> --- a/libdtrace/dt_bpf.c
> +++ b/libdtrace/dt_bpf.c
> @@ -223,16 +223,12 @@ dt_bpf_gmap_create(dtrace_hdl_t *dtp)
>  		return -1;		/* dt_errno is set for us */
>  
>  	/*
> -	 * If there is aggregation data to be collected, we need to create the
> -	 * 'aggs' BPF map, and account for a uint64_t in the map value size to
> -	 * hold a latch sequence number (seq) for concurrent access to the
> -	 * data.
> +	 * Check if there is aggregation data to be collected.
>  	 */
>  	if (aggsz > 0) {
>  		dtp->dt_aggmap_fd = create_gmap(dtp, "aggs",
>  						BPF_MAP_TYPE_PERCPU_ARRAY,
> -						sizeof(uint32_t),
> -						sizeof(uint64_t) + aggsz, 1);
> +						sizeof(uint32_t), aggsz, 1);
>  		if (dtp->dt_aggmap_fd == -1)
>  			return -1;	/* dt_errno is set for us */
>  	}
> diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
> index 8e440c8e..3173f128 100644
> --- a/libdtrace/dt_cg.c
> +++ b/libdtrace/dt_cg.c
> @@ -3033,13 +3033,14 @@ dt_cg_node(dt_node_t *dnp, dt_irlist_t *dlp, dt_regset_t *drp)
>   * identifier (if not set yet).
>   *
>   * We consume twice the required data size because of the odd/even data pair
> - * mechanism to provide lockless, write-wait-free operation.
> + * mechanism to provide lockless, write-wait-free operation.  Additionally,
> + * we make room for a latch sequence number.
>   */
>  #define DT_CG_AGG_SET_STORAGE(aid, sz) \
>  	do { \
>  		if ((aid)->di_offset == -1) \
>  			dt_ident_set_storage((aid), sizeof(uint64_t), \
> -					     2 * (sz)); \
> +					     sizeof(uint64_t) + 2 * (sz)); \
>  	} while (0)
>  
>  /*
> @@ -3047,7 +3048,7 @@ dt_cg_node(dt_node_t *dnp, dt_irlist_t *dlp, dt_regset_t *drp)
>   * return a register that holds a pointer to the aggregation data to be
>   * updated.
>   *
> - * We update the latch seqcount (first value in the aggregation buffer) to
> + * We update the latch seqcount (first value in the aggregation) to
>   * signal that reads should be directed to the alternate copy of the data.  We
>   * then determine the location of data for the given aggregation that can be
>   * updated.  This value is stored in the register returned from this function.
> @@ -3056,49 +3057,47 @@ static int
>  dt_cg_agg_buf_prepare(dt_ident_t *aid, int size, dt_irlist_t *dlp,
>  		      dt_regset_t *drp)
>  {
> -	int		rx, ry;
> +	int		rptr, roff;

I think this renaming of registers is confusing.  Sure, rx and ry weren't the
most informative, but using rptr sounds like it is a pointer when it actually
is not: it is a register number.  Also, in the code below you change agd into
ptr, for which I do not see any reason.  The 'agd' name refers to aggregation
data whereas (to me) 'ptr' is a generic pointer.  Why not stick with what it
is (aggregation data)?  In that case, perhaps use ragd and roff?
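
To make the naming concrete, this is what the emitted instruction sequence
computes when written as plain C with the 'agd' and 'off' names.  It is a
sketch only: the function name is hypothetical, 'agd' is assumed to already
point at the start of this aggregation's storage (dctx->agg plus
aid->di_offset), and the plain increment stands in for the atomic xadd that
the generated BPF code uses.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/*
 * Given a pointer to one aggregation's storage
 *   [ latch seq ][ data copy 0 ][ data copy 1 ]
 * pick the data copy selected by the low bit of the latch, bump the latch,
 * and return a pointer to the copy that may now be updated.
 */
static char *
agg_buf_prepare_sketch(char *agd, size_t size)
{
	uint64_t	seq;
	size_t		off;

	assert(agd != NULL);

	memcpy(&seq, agd, sizeof(seq));

	/* off = (*agd & 1) * size + sizeof(uint64_t) */
	off = (size_t)(seq & 1) * size + sizeof(uint64_t);

	/* (*agd)++ (not atomic here, unlike the BPF xadd) */
	seq++;
	memcpy(agd, &seq, sizeof(seq));

	/* agd += off */
	return agd + off;
}
```

Calling it twice in a row on fresh storage returns the first copy and then the
second copy, leaving the latch at 2.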

>  	TRACE_REGSET("            Prep: Begin");
>  
>  	dt_regset_xalloc(drp, BPF_REG_0);
> -	rx = dt_regset_alloc(drp);
> -	ry = dt_regset_alloc(drp);
> -	assert(rx != -1 && ry != -1);
> +	rptr = dt_regset_alloc(drp);
> +	roff = dt_regset_alloc(drp);
> +	assert(rptr != -1 && roff != -1);
>  
>  	/*
> -	 *				//     (%rX = register for agg ptr)
> -	 *				//     (%rY = register for agg data)
> -	 *	agd = dctx->agg;	// lddw %r0, [%fp + DT_STK_DCTX]
> -	 *				// lddw %rX, [%r0 + DCTX_AGG]
> +	 *	ptr = dctx->agg;	// lddw %r0, [%fp + DT_STK_DCTX]
> +	 *				// lddw %rptr, [%r0 + DCTX_AGG]
>  	 */
>  	emit(dlp, BPF_LOAD(BPF_DW, BPF_REG_0, BPF_REG_FP, DT_STK_DCTX));
> -	emit(dlp, BPF_LOAD(BPF_DW, rx, BPF_REG_0, DCTX_AGG));
> +	emit(dlp, BPF_LOAD(BPF_DW, rptr, BPF_REG_0, DCTX_AGG));
> +	emit(dlp, BPF_ALU64_IMM(BPF_ADD, rptr, aid->di_offset));
>  
>  	/*
> -	 *	off = (*agd % 2) * size	// lddw %rY, [%rX + 0]
> -	 *	      + aid->di_offset	// and %rY, 1
> -	 *	      + sizeof(uint64_t);
> -	 *				// mul %rY, size
> -	 *				// add %rY, aid->di_offset +
> +	 *	off = (*ptr & 1) * size	// lddw %roff, [%rptr + 0]
> +	 *	      + sizeof(uint64_t);	// and %roff, 1
> +	 *				// mul %roff, size
> +	 *				// add %roff, aid->di_offset +
>  	 *				//		sizeof(uint64_t)
> -	 *	(*agd)++;		// mov %r0, 1
> -	 *				// xadd [%rX + 0], %r0
> -	 *	agd += off;		// add %rX, %rY
> +	 *	(*ptr)++;		// mov %r0, 1
> +	 *				// xadd [%rptr + 0], %r0
> +	 *	ptr += off;		// add %rptr, %roff
>  	 */
> -	emit(dlp, BPF_LOAD(BPF_DW, ry, rx, 0));
> -	emit(dlp, BPF_ALU64_IMM(BPF_AND, ry, 1));
> -	emit(dlp, BPF_ALU64_IMM(BPF_MUL, ry, size));
> -	emit(dlp, BPF_ALU64_IMM(BPF_ADD, ry, aid->di_offset + sizeof(uint64_t)));
> +	emit(dlp, BPF_LOAD(BPF_DW, roff, rptr, 0));
> +	emit(dlp, BPF_ALU64_IMM(BPF_AND, roff, 1));
> +	emit(dlp, BPF_ALU64_IMM(BPF_MUL, roff, size));
> +	emit(dlp, BPF_ALU64_IMM(BPF_ADD, roff, sizeof(uint64_t)));
>  	emit(dlp, BPF_MOV_IMM(BPF_REG_0, 1));
> -	emit(dlp, BPF_XADD_REG(BPF_DW, rx, 0, BPF_REG_0));
> -	emit(dlp, BPF_ALU64_REG(BPF_ADD, rx, ry));
> +	emit(dlp, BPF_XADD_REG(BPF_DW, rptr, 0, BPF_REG_0));
> +	emit(dlp, BPF_ALU64_REG(BPF_ADD, rptr, roff));
>  
> -	dt_regset_free(drp, ry);
> +	dt_regset_free(drp, roff);
>  	dt_regset_free(drp, BPF_REG_0);
>  
>  	TRACE_REGSET("            Prep: End  ");
>  
> -	return rx;
> +	return rptr;
>  }
>  
>  #define DT_CG_AGG_IMPL(aid, sz, dlp, drp, f, ...) \
> diff --git a/libdtrace/dt_map.c b/libdtrace/dt_map.c
> index 0c9495d7..0ea93a18 100644
> --- a/libdtrace/dt_map.c
> +++ b/libdtrace/dt_map.c
> @@ -274,11 +274,12 @@ dt_aggid_add(dtrace_hdl_t *dtp, const dt_ident_t *aid)
>  	if (agg == NULL)
>  		return dt_set_errno(dtp, EDT_NOMEM);
>  
> +	/* size subtracts latch sequence number and divides for 2 copies */

This is a confusing comment: do you mean aid->di_size or agg->dtagd_size?  From
the code below, I can figure it out, but then the comment is not really needed
anymore.  How about commenting how the aggregation data size (dtagd_size)
relates to the aggregation storage size (di_size)?
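
Something along these lines is the relationship I have in mind, written out as
a small standalone sketch.  The helper names are hypothetical; in the patch
the two computations live in DT_CG_AGG_SET_STORAGE and dt_aggid_add().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Storage size (di_size): one latch sequence number followed by two copies
 * of the aggregation data.
 */
static size_t
agg_storage_size(size_t datasz)
{
	return sizeof(uint64_t) + 2 * datasz;
}

/*
 * Data size (dtagd_size): the storage size without the latch sequence
 * number, halved because only one of the two copies is reported.
 */
static size_t
agg_data_size(size_t storagesz)
{
	assert(storagesz >= sizeof(uint64_t));
	assert((storagesz - sizeof(uint64_t)) % 2 == 0);

	return (storagesz - sizeof(uint64_t)) / 2;
}
```

For a single 8-byte value such as sum(), the storage size is 24 bytes and the
data size is 8 bytes.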

>  	agg->dtagd_id = id;
>  	agg->dtagd_name = aid->di_name;
>  	agg->dtagd_sig = ((dt_idsig_t *)aid->di_data)->dis_auxinfo;
>  	agg->dtagd_varid = aid->di_id;
> -	agg->dtagd_size = aid->di_size / 2;
> +	agg->dtagd_size = (aid->di_size - sizeof(uint64_t))/ 2;
>  	agg->dtagd_nrecs = agg->dtagd_size / sizeof(uint64_t);
>  
>  	recs = dt_calloc(dtp, agg->dtagd_nrecs, sizeof(dtrace_recdesc_t));
> diff --git a/test/unittest/aggs/tst.empty.d b/test/unittest/aggs/tst.empty.d
> new file mode 100644
> index 00000000..95e21dee
> --- /dev/null
> +++ b/test/unittest/aggs/tst.empty.d
> @@ -0,0 +1,29 @@
> +/*
> + * Oracle Linux DTrace.
> + * Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
> + * Licensed under the Universal Permissive License v 1.0 as shown at
> + * http://oss.oracle.com/licenses/upl.
> + */
> +
> +/*
> + * ASSERTION: Empty aggregations are not printed
> + *
> + * SECTION: Aggregations/Aggregations
> + */
> +
> +#pragma D option quiet
> +
> +BEGIN
> +{
> +	i = 12345678;
> +	@a = sum(i);
> +	exit(0)

Trailing ; missing.

> +}
> +
> +/* this probe should not fire */
> +tick-1hour
> +{
> +	/* these aggregations should not be printed */
> +	@b = min(i);
> +	@c = max(i)

Trailing ; missing.

> +}
> diff --git a/test/unittest/aggs/tst.empty.r b/test/unittest/aggs/tst.empty.r
> new file mode 100644
> index 00000000..aab82b76
> --- /dev/null
> +++ b/test/unittest/aggs/tst.empty.r
> @@ -0,0 +1,2 @@
> +
> +         12345678
> -- 
> 2.18.4