[DTrace-devel] [PATCH v2 1/4] Switch from per-CPU latches to per-CPU/per-agg latches

Kris Van Hees kris.van.hees at oracle.com
Wed Jan 13 11:56:04 PST 2021


Reviewed-by: Kris Van Hees <kris.van.hees at oracle.com>

... with a minor code change below

On Mon, Jan 11, 2021 at 02:58:42PM -0500, eugene.loh at oracle.com wrote:
> From: Eugene Loh <eugene.loh at oracle.com>
> 
> Change the latch sequence number to be per-aggregation, not only per-CPU.
> This way, one can see whether an aggregation has any data or not.  (Some
> aggregations, like count(), already inherently have such information, but
> we need to handle the general case.)
> 
> This change has the added advantage of reducing producer-consumer conflicts.
> E.g., imagine that the producer is firing very often, updating different
> aggregations, while the consumer is trying to read a single, large
> aggregation (e.g., some *quantize aggregation).  A single latch sequence
> number for all aggregations would keep updating, causing the consumer to
> keep rereading an aggregation unnecessarily.
> 
> Also, when taking snapshots of aggregations, copy only a single copy of
> the aggregation data.  Neither the latch sequence number nor the second
> copy is needed.
> 
> This also solves the problem that aggregations that appear in a D script
> were being printed even if the probes that use them never fire.
> 
> Introduce a test to catch the printing of unused aggregations.
> 
> https://github.com/oracle/dtrace-utils/issues/3
> Signed-off-by: Eugene Loh <eugene.loh at oracle.com>
> ---
>  libdtrace/dt_aggregate.c       | 59 ++++++++++++++++------------------
>  libdtrace/dt_bpf.c             |  8 ++---
>  libdtrace/dt_cg.c              | 53 +++++++++++++++---------------
>  libdtrace/dt_map.c             |  7 +++-
>  test/unittest/aggs/tst.empty.d | 29 +++++++++++++++++
>  test/unittest/aggs/tst.empty.r |  2 ++
>  6 files changed, 92 insertions(+), 66 deletions(-)
>  create mode 100644 test/unittest/aggs/tst.empty.d
>  create mode 100644 test/unittest/aggs/tst.empty.r
> 
> diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
> index 972c19a3..389ede8c 100644
> --- a/libdtrace/dt_aggregate.c
> +++ b/libdtrace/dt_aggregate.c
> @@ -422,19 +422,30 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
>  	size_t			ndx = hval % agh->dtah_size;
>  	int			rval;
>  	uint_t			i;
> +	int64_t			*src;
> +	int			realsz;
>  
>  	rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
>  	if (rval != 0)
>  		return rval;
>  
> +	/* check latch sequence number to see if there is any data */
> +	src = (int64_t *)(st->buf + aid->di_offset);
> +	if (*src == 0)
> +		return 0;
> +	src++;
> +
> +	/* real size excludes latch sequence number and second data copy */
> +	realsz = (aid->di_size - sizeof(uint64_t)) / 2;
> +
>  	/* See if we already have an entry for this aggregation. */
>  	for (h = agh->dtah_hash[ndx]; h != NULL; h = h->dtahe_next) {
> -		int64_t	*src, *dst, *cpu_dst;
> +		int64_t	*dst, *cpu_dst;
>  		uint_t	cnt;
>  
>  		if (h->dtahe_hval != hval)
>  			continue;
> -		if (h->dtahe_size != aid->di_size)
> +		if (h->dtahe_size != realsz)
>  			continue;
>  
>  		/* Entry found - process the data. */
> @@ -445,7 +456,6 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
>  				: NULL;
>  
>  accumulate:
> -		src = (int64_t *)(st->buf + aid->di_offset);
>  		switch (((dt_ident_t *)aid->di_iarg)->di_id) {
>  		case DT_AGG_MAX:
>  			if (*src > *dst)
> @@ -458,7 +468,7 @@ accumulate:
>  
>  			break;
>  		default:
> -			for (i = 0, cnt = aid->di_size / sizeof(int64_t);
> +			for (i = 0, cnt = realsz / sizeof(int64_t);
>  			     i < cnt; i++, dst++, src++)
>  				*dst += *src;
>  		}
> @@ -480,20 +490,20 @@ accumulate:
>  		return dt_set_errno(st->dtp, EDT_NOMEM);
>  
>  	agd = &h->dtahe_data;
> -	agd->dtada_data = dt_alloc(st->dtp, aid->di_size);
> +	agd->dtada_data = dt_alloc(st->dtp, realsz);
>  	if (agd->dtada_data == NULL) {
>  		dt_free(st->dtp, h);
>  		return dt_set_errno(st->dtp, EDT_NOMEM);
>  	}
>  
> -	memcpy(agd->dtada_data, st->buf + aid->di_offset, aid->di_size);
> -	agd->dtada_size = aid->di_size;
> +	memcpy(agd->dtada_data, src, realsz);
> +	agd->dtada_size = realsz;
>  	agd->dtada_desc = agg;
>  	agd->dtada_hdl = st->dtp;
>  	agd->dtada_normal = 1;
>  
>  	h->dtahe_hval = hval;
> -	h->dtahe_size = aid->di_size;
> +	h->dtahe_size = realsz;
>  
>  	if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
>  		char	**percpu = dt_calloc(st->dtp,
> @@ -508,7 +518,7 @@ accumulate:
>  		}
>  
>  		for (i = 0; i <= st->dtp->dt_conf.max_cpuid; i++) {
> -			percpu[i] = dt_zalloc(st->dtp, aid->di_size);
> +			percpu[i] = dt_zalloc(st->dtp, realsz);
>  			if (percpu[i] == NULL) {
>  				while (--i >= 0)
>  					dt_free(st->dtp, percpu[i]);
> @@ -519,7 +529,7 @@ accumulate:
>  			}
>  		}
>  
> -		memcpy(percpu[st->cpu], st->buf + aid->di_offset, aid->di_size);
> +		memcpy(percpu[st->cpu], src, realsz);
>  		agd->dtada_percpu = percpu;
>  	}
>  
> @@ -544,21 +554,11 @@ dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
>  {
>  	dt_aggregate_t	*agp = &dtp->dt_aggregate;
>  	char		*buf = agp->dtat_cpu_buf[cpu];
> -	uint64_t	*seq = (uint64_t *)buf;
>  	dt_snapstate_t	st;
>  
> -	/* Nothing to be done? */
> -	if (*seq == 0)
> -		return 0;
> -
> -	/*
> -	 * Process all static aggregation identifiers in the CPU buffer.  We
> -	 * skip over the latch sequence number because from this point on we
> -	 * are simply processing data.
> -	 */
>  	st.dtp = dtp;
>  	st.cpu = cpu;
> -	st.buf = buf + sizeof(*seq);
> +	st.buf = buf;
>  	st.agp = agp;
>  
>  	return dt_idhash_iter(dtp->dt_aggs,
> @@ -960,8 +960,8 @@ dt_aggregate_bundlecmp(const void *lhs, const void *rhs)
>   * set for min(), so that any other value fed to the functions will register
>   * properly.
>   *
> - * The first value in the buffer is used as a flag to indicate whether an
> - * initial value was stored in the buffer.
> + * The latch sequence number of the first aggregation is used as a flag to
> + * indicate whether an initial value was stored for any aggregation.
>   */
>  static int
>  init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
> @@ -983,9 +983,10 @@ init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
>  	/* Indicate that we are setting initial values. */
>  	*(int64_t *)buf = 1;
>  
> -	ptr = (int64_t *)(buf + sizeof(int64_t) + aid->di_offset);
> -	ptr[0] = value;
> +	/* skip ptr[0], it is the latch sequence number */
> +	ptr = (int64_t *)(buf + aid->di_offset);
>  	ptr[1] = value;
> +	ptr[2] = value;
>  
>  	return 0;
>  }
> @@ -1007,12 +1008,7 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
>  	 * Allocate a buffer to hold the aggregation data for all possible
>  	 * CPUs, and initialize the per-CPU data pointers for CPUs that are
>  	 * currently enabled.
> -	 *
> -	 * The size of the aggregation data is to be increased by the size of
> -	 * the latch sequence number.  That yields the actual size of the data
> -	 * allocated per CPU.
>  	 */
> -	aggsz += sizeof(uint64_t);
>  	agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
>  	if (agp->dtat_buf == NULL)
>  		return dt_set_errno(dtp, EDT_NOMEM);
> @@ -1044,7 +1040,7 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
>  	dt_idhash_iter(dtp->dt_aggs, (dt_idhash_f *) init_minmax,
>  		       agp->dtat_buf);
>  	if (*(int64_t *)agp->dtat_buf != 0) {
> -		*(int64_t *)agp->dtat_buf = 0;	/* clear the flag */
> +		*(int64_t *)agp->dtat_buf = 0;  /* clear the flag */

If the flag is 0, no init data was set so there is no need to update the
map value.  So, this should simply be:

  	if (*(int64_t *)agp->dtat_buf == 0)
		return 0;

and the remainder of the function clears the flag, copies the init
data, and does the update.

>  		for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
>  			int	cpu = dtp->dt_conf.cpus[i].cpu_id;
> @@ -1055,7 +1051,6 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
>  
>  			memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
>  		}
> -
>  		dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
>  	}
>  
> diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
> index c6102f15..1705c990 100644
> --- a/libdtrace/dt_bpf.c
> +++ b/libdtrace/dt_bpf.c
> @@ -223,16 +223,12 @@ dt_bpf_gmap_create(dtrace_hdl_t *dtp)
>  		return -1;		/* dt_errno is set for us */
>  
>  	/*
> -	 * If there is aggregation data to be collected, we need to create the
> -	 * 'aggs' BPF map, and account for a uint64_t in the map value size to
> -	 * hold a latch sequence number (seq) for concurrent access to the
> -	 * data.
> +	 * Check if there is aggregation data to be collected.
>  	 */
>  	if (aggsz > 0) {
>  		dtp->dt_aggmap_fd = create_gmap(dtp, "aggs",
>  						BPF_MAP_TYPE_PERCPU_ARRAY,
> -						sizeof(uint32_t),
> -						sizeof(uint64_t) + aggsz, 1);
> +						sizeof(uint32_t), aggsz, 1);
>  		if (dtp->dt_aggmap_fd == -1)
>  			return -1;	/* dt_errno is set for us */
>  	}
> diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
> index 8e440c8e..972b8483 100644
> --- a/libdtrace/dt_cg.c
> +++ b/libdtrace/dt_cg.c
> @@ -3033,13 +3033,14 @@ dt_cg_node(dt_node_t *dnp, dt_irlist_t *dlp, dt_regset_t *drp)
>   * identifier (if not set yet).
>   *
>   * We consume twice the required data size because of the odd/even data pair
> - * mechanism to provide lockless, write-wait-free operation.
> + * mechanism to provide lockless, write-wait-free operation.  Additionally,
> + * we make room for a latch sequence number.
>   */
>  #define DT_CG_AGG_SET_STORAGE(aid, sz) \
>  	do { \
>  		if ((aid)->di_offset == -1) \
>  			dt_ident_set_storage((aid), sizeof(uint64_t), \
> -					     2 * (sz)); \
> +					     sizeof(uint64_t) + 2 * (sz)); \
>  	} while (0)
>  
>  /*
> @@ -3047,7 +3048,7 @@ dt_cg_node(dt_node_t *dnp, dt_irlist_t *dlp, dt_regset_t *drp)
>   * return a register that holds a pointer to the aggregation data to be
>   * updated.
>   *
> - * We update the latch seqcount (first value in the aggregation buffer) to
> + * We update the latch seqcount (first value in the aggregation) to
>   * signal that reads should be directed to the alternate copy of the data.  We
>   * then determine the location of data for the given aggregation that can be
>   * updated.  This value is stored in the register returned from this function.
> @@ -3056,49 +3057,47 @@ static int
>  dt_cg_agg_buf_prepare(dt_ident_t *aid, int size, dt_irlist_t *dlp,
>  		      dt_regset_t *drp)
>  {
> -	int		rx, ry;
> +	int		ragd, roff;
>  
>  	TRACE_REGSET("            Prep: Begin");
>  
>  	dt_regset_xalloc(drp, BPF_REG_0);
> -	rx = dt_regset_alloc(drp);
> -	ry = dt_regset_alloc(drp);
> -	assert(rx != -1 && ry != -1);
> +	ragd = dt_regset_alloc(drp);
> +	roff = dt_regset_alloc(drp);
> +	assert(ragd != -1 && roff != -1);
>  
>  	/*
> -	 *				//     (%rX = register for agg ptr)
> -	 *				//     (%rY = register for agg data)
>  	 *	agd = dctx->agg;	// lddw %r0, [%fp + DT_STK_DCTX]
> -	 *				// lddw %rX, [%r0 + DCTX_AGG]
> +	 *				// lddw %ragd, [%r0 + DCTX_AGG]
> +	 *	agd += aid->di_offset	// %ragd += aid->di_offset
>  	 */
>  	emit(dlp, BPF_LOAD(BPF_DW, BPF_REG_0, BPF_REG_FP, DT_STK_DCTX));
> -	emit(dlp, BPF_LOAD(BPF_DW, rx, BPF_REG_0, DCTX_AGG));
> +	emit(dlp, BPF_LOAD(BPF_DW, ragd, BPF_REG_0, DCTX_AGG));
> +	emit(dlp, BPF_ALU64_IMM(BPF_ADD, ragd, aid->di_offset));
>  
>  	/*
> -	 *	off = (*agd % 2) * size	// lddw %rY, [%rX + 0]
> -	 *	      + aid->di_offset	// and %rY, 1
> -	 *	      + sizeof(uint64_t);
> -	 *				// mul %rY, size
> -	 *				// add %rY, aid->di_offset +
> -	 *				//		sizeof(uint64_t)
> +	 *	off = (*agd & 1) * size	// lddw %roff, [%ragd + 0]
> +	 *	      + sizeof(uint64_t);	// and %roff, 1
> +	 *				// mul %roff, size
> +	 *				// add %roff, sizeof(uint64_t)
>  	 *	(*agd)++;		// mov %r0, 1
> -	 *				// xadd [%rX + 0], %r0
> -	 *	agd += off;		// add %rX, %rY
> +	 *				// xadd [%ragd + 0], %r0
> +	 *	agd += off;		// add %ragd, %roff
>  	 */
> -	emit(dlp, BPF_LOAD(BPF_DW, ry, rx, 0));
> -	emit(dlp, BPF_ALU64_IMM(BPF_AND, ry, 1));
> -	emit(dlp, BPF_ALU64_IMM(BPF_MUL, ry, size));
> -	emit(dlp, BPF_ALU64_IMM(BPF_ADD, ry, aid->di_offset + sizeof(uint64_t)));
> +	emit(dlp, BPF_LOAD(BPF_DW, roff, ragd, 0));
> +	emit(dlp, BPF_ALU64_IMM(BPF_AND, roff, 1));
> +	emit(dlp, BPF_ALU64_IMM(BPF_MUL, roff, size));
> +	emit(dlp, BPF_ALU64_IMM(BPF_ADD, roff, sizeof(uint64_t)));
>  	emit(dlp, BPF_MOV_IMM(BPF_REG_0, 1));
> -	emit(dlp, BPF_XADD_REG(BPF_DW, rx, 0, BPF_REG_0));
> -	emit(dlp, BPF_ALU64_REG(BPF_ADD, rx, ry));
> +	emit(dlp, BPF_XADD_REG(BPF_DW, ragd, 0, BPF_REG_0));
> +	emit(dlp, BPF_ALU64_REG(BPF_ADD, ragd, roff));
>  
> -	dt_regset_free(drp, ry);
> +	dt_regset_free(drp, roff);
>  	dt_regset_free(drp, BPF_REG_0);
>  
>  	TRACE_REGSET("            Prep: End  ");
>  
> -	return rx;
> +	return ragd;
>  }
>  
>  #define DT_CG_AGG_IMPL(aid, sz, dlp, drp, f, ...) \
> diff --git a/libdtrace/dt_map.c b/libdtrace/dt_map.c
> index 0c9495d7..cac295fa 100644
> --- a/libdtrace/dt_map.c
> +++ b/libdtrace/dt_map.c
> @@ -274,11 +274,16 @@ dt_aggid_add(dtrace_hdl_t *dtp, const dt_ident_t *aid)
>  	if (agg == NULL)
>  		return dt_set_errno(dtp, EDT_NOMEM);
>  
> +	/*
> +	 * Note the relationship between the aggregation storage
> +	 * size (di_size) and the aggregation data size (dtagd_size):
> +	 *     di_size = dtagd_size * (# agg copies) + (size of latch seq #)
> +	 */
>  	agg->dtagd_id = id;
>  	agg->dtagd_name = aid->di_name;
>  	agg->dtagd_sig = ((dt_idsig_t *)aid->di_data)->dis_auxinfo;
>  	agg->dtagd_varid = aid->di_id;
> -	agg->dtagd_size = aid->di_size / 2;
> +	agg->dtagd_size = (aid->di_size - sizeof(uint64_t))/ 2;
>  	agg->dtagd_nrecs = agg->dtagd_size / sizeof(uint64_t);
>  
>  	recs = dt_calloc(dtp, agg->dtagd_nrecs, sizeof(dtrace_recdesc_t));
> diff --git a/test/unittest/aggs/tst.empty.d b/test/unittest/aggs/tst.empty.d
> new file mode 100644
> index 00000000..784e1dd9
> --- /dev/null
> +++ b/test/unittest/aggs/tst.empty.d
> @@ -0,0 +1,29 @@
> +/*
> + * Oracle Linux DTrace.
> + * Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
> + * Licensed under the Universal Permissive License v 1.0 as shown at
> + * http://oss.oracle.com/licenses/upl.
> + */
> +
> +/*
> + * ASSERTION: Empty aggregations are not printed
> + *
> + * SECTION: Aggregations/Aggregations
> + */
> +
> +#pragma D option quiet
> +
> +BEGIN
> +{
> +	i = 12345678;
> +	@a = sum(i);
> +	exit(0);
> +}
> +
> +/* this probe should not fire */
> +tick-1hour
> +{
> +	/* these aggregations should not be printed */
> +	@b = min(i);
> +	@c = max(i);
> +}
> diff --git a/test/unittest/aggs/tst.empty.r b/test/unittest/aggs/tst.empty.r
> new file mode 100644
> index 00000000..aab82b76
> --- /dev/null
> +++ b/test/unittest/aggs/tst.empty.r
> @@ -0,0 +1,2 @@
> +
> +         12345678
> -- 
> 2.18.4
> 
> 
> _______________________________________________
> DTrace-devel mailing list
> DTrace-devel at oss.oracle.com
> https://oss.oracle.com/mailman/listinfo/dtrace-devel



More information about the DTrace-devel mailing list