[DTrace-devel] [Review] [PATCH] Implementation of aggregations: max(), min(), sum(), and stddev()

Kris Van Hees kris.van.hees at oracle.com
Wed Dec 9 01:06:48 PST 2020


On Tue, Dec 08, 2020 at 11:51:25AM -0800, david.mclean at oracle.com wrote:
> Please review.
> 

> From 6b274ef291c0e9e358d0c08abaed53153603f318 Mon Sep 17 00:00:00 2001
> From: David  Mc Lean <david.mclean at oracle.com>
> Date: Mon, 7 Dec 2020 23:41:34 -0800
> Subject: [PATCH] Implementation of aggregations: max(), min(), sum(), and
>  stddev()

There should definitely be a commit message here, especially since you are
performing some data initialization that is crucial to the correct operation
of min() and max().  Also, some commentary on the algorithm used for the
stddev calculations would be welcome.
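
As a starting point for that commentary, here is a minimal C sketch of what
the generated BPF code appears to compute: a count, a running sum, and a
128-bit sum of squares built from 32-bit halves.  The names stddev_data,
square128, and stddev_add are hypothetical and only mirror the four 64-bit
values the patch reserves; this is an illustration, not the implementation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical layout: count, sum, and a 128-bit sum of squares. */
struct stddev_data {
	uint64_t	count;
	int64_t		sum;
	uint64_t	sq_lo;
	uint64_t	sq_hi;
};

/* Square val into a 128-bit result (hi:lo) using 32-bit halves. */
static void
square128(int64_t val, uint64_t *hi, uint64_t *lo)
{
	uint64_t	a, l, h, mid, mid_lo, mid_hi;

	assert(hi != NULL && lo != NULL);

	/* The square is the same for val and -val, so use the magnitude. */
	a = val < 0 ? 0 - (uint64_t)val : (uint64_t)val;
	l = a & 0xffffffffULL;		/* low 32 bits */
	h = a >> 32;			/* high 32 bits */

	/* a^2 = (h * h << 64) + (2 * h * l << 32) + l * l */
	mid = l * h;
	mid_lo = mid << 33;		/* low word of (2 * mid) << 32 */
	mid_hi = mid >> 31;		/* high word of (2 * mid) << 32 */

	*lo = l * l + mid_lo;
	/* A carry occurred if the sum wrapped below the addend. */
	*hi = h * h + mid_hi + (*lo < mid_lo);
}

/* Add one value to the running count, sum, and sum of squares. */
static void
stddev_add(struct stddev_data *d, int64_t val)
{
	uint64_t	hi, lo;

	assert(d != NULL);

	square128(val, &hi, &lo);
	d->count++;
	d->sum += val;
	d->sq_lo += lo;
	d->sq_hi += hi + (d->sq_lo < lo);	/* propagate the carry */
}
```

Both carry tests compare the new low word against the addend with a strict
less-than, so a low-order product of zero does not produce a spurious carry.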

> Signed-off-by: David Mc Lean <david.mclean at oracle.com>
> ---
>  libdtrace/dt_aggregate.c |  56 +++++++++++-
>  libdtrace/dt_cg.c        | 218 +++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 273 insertions(+), 1 deletion(-)
> 
> diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
> index 5fd79de..5f14a8f 100644
> --- a/libdtrace/dt_aggregate.c
> +++ b/libdtrace/dt_aggregate.c
> @@ -907,12 +907,49 @@ dt_aggregate_bundlecmp(const void *lhs, const void *rhs)
>  	}
>  }
>  
> +/*
> + * Callback for initializing the min() and max() aggregation functions.

It is not the aggregation functions that are being initialized but rather the
aggregation data.  Maybe just say this is a callback to set the initial values
for max() and min() aggregations?
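
To illustrate why the initial values matter, here is a minimal C sketch of
the update rule that the generated code seems to follow.  The helper names
agg_min and agg_max are hypothetical; the comparisons are signed, as in the
patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Keep the smallest value seen so far. */
static void
agg_min(int64_t *dst, int64_t val)
{
	assert(dst != NULL);
	if (val < *dst)
		*dst = val;
}

/* Keep the largest value seen so far. */
static void
agg_max(int64_t *dst, int64_t val)
{
	assert(dst != NULL);
	if (val > *dst)
		*dst = val;
}
```

With zero-initialized storage, agg_min() would never record a positive value
and agg_max() would never record a negative one.  Starting min() at INT64_MAX
and max() at INT64_MIN means the first value fed in replaces the initial one,
or at worst equals it.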

> + * The minimum 64 bit value is set for max() and the maximum 64 bit value is
> + * set for min(), so that any other value fed to the functions will register
> + * properly.
> + *
> + * The first 64 bits of the buffer passed in is used as a flag.  The flag is
> + * used so the map update only runs when there is something to update.

Since all data items are 64-bit integers, you can simply refer to the first
value in the buffer rather than the first 64 bits.  So, something like:

"The first value in the buffer is used as a flag to indicate whether an
 initial value was stored in the buffer."

You also want to mention why it is safe to use that first value as a flag at
this point.

> + */
> +static int
> +init_minmax(dt_idhash_t *dhp, dt_ident_t *idp, char *aggset)

A typical name for the 3rd argument here would be 'buf' because we're dealing
with a buffer.  There is not much value in using non-standard names.

> +{
> +	dt_ident_t *id = idp->di_iarg;

This is typically named fid rather than id because it is the function id.

> +	int64_t value, *point;

I am not sure why you name this variable "point".  Perhaps "ptr" for pointer
might be better?

> +	assert(idp->di_kind == DT_IDENT_AGG);
> +	assert(id);
> +	if (id->di_id == DT_AGG_MIN)
> +		value = INT64_MAX;
> +	else if (id->di_id == DT_AGG_MAX)
> +		value = INT64_MIN;
> +	else
> +		return 0;
> +
> +	/* Use the previously unpopulated first word as a flag of activity */
> +	point    = aggset;
> +	*point   = value;

How about simply buf[0] = 1 if buf is int64_t *?

> +	point    = aggset + idp->di_offset + sizeof(int64_t);
> +	*point++ = value;
> +	*point   = value;

How about:

	*(int64_t *)buf = 1;

	ptr = (int64_t *)(buf + sizeof(int64_t) + idp->di_offset);
	ptr[0] = value;
	ptr[1] = value;
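
Put together, the callback could look roughly like the sketch below.  It is
self-contained for illustration only: agg_kind and init_minmax_sketch are
hypothetical stand-ins for the dt_ident_t based callback, and the offset
parameter plays the role of idp->di_offset.  It assumes buf is suitably
aligned for int64_t access, which holds for memory returned by calloc().

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the aggregation function id. */
enum agg_kind { AGG_MIN, AGG_MAX, AGG_OTHER };

/*
 * Set the initial values for a min() or max() aggregation.  The first
 * value in the buffer is used as a flag to report that something was
 * stored.  Other aggregation kinds leave the buffer untouched.
 */
static int
init_minmax_sketch(enum agg_kind kind, size_t offset, char *buf)
{
	int64_t	value, *ptr;

	assert(buf != NULL);

	if (kind == AGG_MIN)
		value = INT64_MAX;
	else if (kind == AGG_MAX)
		value = INT64_MIN;
	else
		return 0;

	*(int64_t *)buf = 1;		/* flag: initial data was stored */

	/* Skip the leading value, then store both values as the patch does. */
	ptr = (int64_t *)(buf + sizeof(int64_t) + offset);
	ptr[0] = value;
	ptr[1] = value;

	return 0;
}
```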

> +	return 0;
> +}
> +
>  int
>  dt_aggregate_go(dtrace_hdl_t *dtp)
>  {
>  	dt_aggregate_t	*agp = &dtp->dt_aggregate;
>  	dt_ahash_t	*agh = &agp->dtat_hash;
>  	int		aggsz, i;
> +	uint32_t	key = 0;
> +	int64_t		*wdpnt;

No need for "wdpnt".  I am also not sure what that name means.


>  	/* If there are no aggregations there is nothing to do. */
>  	aggsz = dt_idhash_datasize(dtp->dt_aggs);
> @@ -921,7 +958,11 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
>  
>  	/*
>  	 * Allocate a buffer to hold the aggregation data for all possible
> -	 * CPUs, and initialize the per-CPU data pointers for CPUs that are
> +	 * CPUs.
> +	 *
> +	 * Initialize the starting values for min() and max() aggregations.
> +	 *
> +	 * Initialize the per-CPU data pointers for CPUs that are
>  	 * currently enabled.
>  	 *
>  	 * The size of the aggregation data is to be increased by the size of
> @@ -933,6 +974,19 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
>  	if (agp->dtat_buf == NULL)
>  		return dt_set_errno(dtp, EDT_NOMEM);
>  
> +	wdpnt = agp->dtat_buf;
> +	assert(*wdpnt == 0); /* Assume allocated value is zeroed out */

These two statements are not needed.  You can access agp->dtat_buf directly
so there is no need for wdpnt, and the assert is unnecessary because the
buffer is allocated using dt_calloc() which zeros out the memory.

> +	dt_idhash_iter(dtp->dt_aggs, (dt_idhash_f *) init_minmax,
> +		       agp->dtat_buf);
> +	if (*wdpnt != 0) {  /* Flag is set for min() and/or max() updates */
> +		*wdpnt = 0; /* Clear the flag */

Just use agp->dtat_buf directly rather than wdpnt.

> +		for (i = 1; i < dtp->dt_conf.num_online_cpus; i++) {
> +			memcpy(agp->dtat_buf + aggsz * i, agp->dtat_buf, aggsz);
> +		}

This is wrong.  You are looping over the list of CPUs but not taking into
account that the ids of enabled CPUs may not form a contiguous sequence.
In other words, there may be 4 CPUs enabled, but they are not necessarily
known on the system as CPU 0, 1, 2, and 3.  So, you either need to copy the
initialization data to all CPUs with an id <= max_cpuid, or you need to use
the actual cpuid for the enabled CPUs.  The code that sets up the per-CPU
data buffers further down in this function does that.
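
The two options can be sketched in plain C as follows.  Both helpers are
hypothetical, and both assume the buffer holds max_cpuid + 1 slots of aggsz
bytes each, with the initialized template in slot 0.  The cpu_ids array
stands in for however the enabled CPU ids are actually obtained.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Option 1: copy the slot 0 template to every slot up to max_cpuid. */
static void
replicate_to_all(char *buf, size_t aggsz, int max_cpuid)
{
	int	i;

	assert(buf != NULL && max_cpuid >= 0);

	for (i = 1; i <= max_cpuid; i++)
		memcpy(buf + aggsz * i, buf, aggsz);
}

/* Option 2: copy the template only to the slots of enabled CPUs. */
static void
replicate_to_cpus(char *buf, size_t aggsz, const int *cpu_ids, int ncpus)
{
	int	i;

	assert(buf != NULL && cpu_ids != NULL);

	for (i = 0; i < ncpus; i++) {
		if (cpu_ids[i] == 0)
			continue;	/* slot 0 already holds the template */
		memcpy(buf + aggsz * cpu_ids[i], buf, aggsz);
	}
}
```

Either way the slot index is the CPU id itself rather than a running count
of online CPUs.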

> +		dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
> +	}
> +
>  	agp->dtat_cpu_buf = dt_calloc(dtp, dtp->dt_conf.max_cpuid + 1,
>  				      sizeof(char *));
>  	if (agp->dtat_cpu_buf == NULL) {
> diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
> index 2d73b43..85d8358 100644
> --- a/libdtrace/dt_cg.c
> +++ b/libdtrace/dt_cg.c
> @@ -3615,12 +3615,68 @@ dt_cg_agg_lquantize(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
>  }
>  
>  static void
> +dt_cg_agg_max_impl(dt_irlist_t *dlp, dt_regset_t *drp, int dreg, int vreg)
> +{
> +	int     treg;   /* register for temporary use value */
> +	uint_t  Lmax = dt_irlist_label(dlp);
> +
> +	TRACE_REGSET("            Impl: Begin");
> +
> +	if ((treg = dt_regset_alloc(drp)) == -1)
> +		longjmp(yypcb->pcb_jmpbuf, EDT_NOREG);
> +	/* %treg = *(%dreg) */
> +	emit(dlp, BPF_LOAD(BPF_DW, treg, dreg, 0));
> +	/* skip if not a new max value */
> +	emit(dlp, BPF_BRANCH_REG(BPF_JSGE, treg, vreg, Lmax));
> +	/* keep the new highest value; *(%dreg) = %vreg */
> +	emit(dlp, BPF_STORE(BPF_DW, dreg, 0, vreg));
> +	emitl(dlp, Lmax,
> +		   BPF_NOP());
> +	dt_regset_free(drp, treg);

By convention, we usually align the BPF_ arguments to the emit macros.  So,
if there is an emite or emitl nearby, then the BPF_ arguments to the emit
calls get an extra space before them.

> +
> +	TRACE_REGSET("            Impl: End  ");
> +}
> +
> +static void
>  dt_cg_agg_max(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
>  	      dt_irlist_t *dlp, dt_regset_t *drp)
>  {
>  	int	sz = 1 * sizeof(uint64_t);
>  
>  	DT_CG_AGG_SET_STORAGE(aid, sz);
> +
> +	TRACE_REGSET("    AggMax : Begin");
> +
> +	dt_cg_node(dnp->dn_aggfun->dn_args, dlp, drp);
> +	DT_CG_AGG_IMPL(aid, sz, dlp, drp, dt_cg_agg_max_impl,
> +		       dnp->dn_aggfun->dn_args->dn_reg);
> +	dt_regset_free(drp, dnp->dn_aggfun->dn_args->dn_reg);
> +
> +	TRACE_REGSET("    AggMax : End  ");
> +
> +}
> +
> +static void
> +dt_cg_agg_min_impl(dt_irlist_t *dlp, dt_regset_t *drp, int dreg, int vreg)
> +{
> +	int     treg;   /* register for temporary use value */
> +	uint_t  Lmin = dt_irlist_label(dlp);
> +
> +	TRACE_REGSET("            Impl: Begin");
> +
> +	if ((treg = dt_regset_alloc(drp)) == -1)
> +		longjmp(yypcb->pcb_jmpbuf, EDT_NOREG);
> +	/* %treg = *(%dreg) */
> +	emit(dlp, BPF_LOAD(BPF_DW, treg, dreg, 0));
> +	/* skip if not a new min value */
> +	emit(dlp, BPF_BRANCH_REG(BPF_JSLE, treg, vreg, Lmin));
> +	/* keep the new lowest value; *(%dreg) = %vreg */
> +	emit(dlp, BPF_STORE(BPF_DW, dreg, 0, vreg));
> +	emitl(dlp, Lmin,
> +		   BPF_NOP());
> +	dt_regset_free(drp, treg);
> +
> +	TRACE_REGSET("            Impl: End  ");
>  }
>  
>  static void
> @@ -3630,6 +3686,15 @@ dt_cg_agg_min(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
>  	int	sz = 1 * sizeof(uint64_t);
>  
>  	DT_CG_AGG_SET_STORAGE(aid, sz);
> +
> +	TRACE_REGSET("    AggMin : Begin");
> +
> +	dt_cg_node(dnp->dn_aggfun->dn_args, dlp, drp);
> +	DT_CG_AGG_IMPL(aid, sz, dlp, drp, dt_cg_agg_min_impl,
> +		       dnp->dn_aggfun->dn_args->dn_reg);
> +	dt_regset_free(drp, dnp->dn_aggfun->dn_args->dn_reg);
> +
> +	TRACE_REGSET("    AggMin : End  ");
>  }
>  
>  static void
> @@ -3645,12 +3710,156 @@ dt_cg_agg_quantize(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
>  }
>  
>  static void
> +dt_cg_agg_stddev_impl(dt_irlist_t *dlp, dt_regset_t *drp, int dreg, int vreg,
> +		      uint64_t hreg, uint64_t lreg)
> +{
> +	int onereg, lowreg;
> +
> +	/* label */
> +	uint_t  Lncr = dt_irlist_label(dlp);
> +
> +	TRACE_REGSET("            Impl: Begin");
> +
> +	/* Get some registers to work with */
> +	if ((onereg = dt_regset_alloc(drp)) == -1 ||
> +	    (lowreg = dt_regset_alloc(drp)) == -1)
> +		longjmp(yypcb->pcb_jmpbuf, EDT_NOREG);
> +
> +	/* Set onereg to a value of 1 for repeated use */
> +	emit(dlp, BPF_MOV_IMM(onereg, 1));
> +
> +	/* dst[0]++; increment count of calls eq. value count */
> +	emit(dlp, BPF_XADD_REG(BPF_DW, dreg, 0, onereg));
> +
> +	/* dst[1] += val; add value to the total sum */
> +	emit(dlp, BPF_XADD_REG(BPF_DW, dreg, 8, vreg));
> +
> +	/* Add double-dst[2-3] += sq(val) starting here */
> +
> +	/* Load the lower half (64 bits) of the previous established value */
> +	emit(dlp, BPF_LOAD(BPF_DW, lowreg, dreg, 16));
> +
> +	/* Add low part of the value; *lowreg += *lreg */
> +	emit(dlp, BPF_ALU64_REG(BPF_ADD, lowreg, lreg));
> +
> +	/* Handle the overflow/carry case; overflow if lowreg < lreg */
> +	emit(dlp, BPF_BRANCH_REG(BPF_JLE, lreg, lowreg, Lncr));
> +	emit(dlp, BPF_XADD_REG(BPF_DW, dreg, 24, onereg)) /* Carry */;
> +	emitl(dlp, Lncr,
> +		   BPF_STORE(BPF_DW, dreg, 16, lowreg));
> +
> +	emit(dlp, BPF_XADD_REG(BPF_DW, dreg, 24, hreg)); /* Add higher half */
> +
> +	dt_regset_free(drp, onereg);
> +	dt_regset_free(drp, lowreg);
> +
> +	TRACE_REGSET("            Impl: End  ");
> +}
> +
> +
> +static void
>  dt_cg_agg_stddev(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
>  		 dt_irlist_t *dlp, dt_regset_t *drp)
>  {
>  	int	sz = 4 * sizeof(uint64_t);
> +	uint64_t  hi_reg, lowreg, midreg, lmdreg;
> +
> +	/* labels */
> +	uint_t  Lpos = dt_irlist_label(dlp);
> +	uint_t  Lncy = dt_irlist_label(dlp);
>  
>  	DT_CG_AGG_SET_STORAGE(aid, sz);
> +
> +	TRACE_REGSET("    AggSDv : Begin");
> +
> +	dt_cg_node(dnp->dn_aggfun->dn_args, dlp, drp);
> +
> +	/* Handle sq(val) starting here; 128-bit answer in 2 64-bit registers */
> +	/* val = (uint64_t)dnp->dn_aggfun->dn_args->dn_value; */
> +	/* sqr = (__int128)val * val; */
> +	/* low = sqr & 0xFFFFFFFFFFFFFFFFULL; */
> +	/* hi_ = sqr >> 64; */
> +	/* get some registers to work with */
> +	if ((hi_reg = dt_regset_alloc(drp)) == -1 ||
> +	    (lowreg = dt_regset_alloc(drp)) == -1 ||
> +	    (midreg = dt_regset_alloc(drp)) == -1 ||
> +	    (lmdreg = dt_regset_alloc(drp)) == -1)
> +		longjmp(yypcb->pcb_jmpbuf, EDT_NOREG);
> +
> +	/* If value is negative make it positive, sq() is positive anyhow */
> +	emit(dlp, BPF_MOV_REG(lowreg, dnp->dn_aggfun->dn_args->dn_reg));
> +	emit(dlp, BPF_BRANCH_IMM(BPF_JSGE, lowreg, 0, Lpos));
> +	emit(dlp, BPF_NEG_REG(lowreg))  /* Change to a positive value */;
> +
> +	/* Now we are sure lowreg holds our absolute value */
> +	/* Split our value into two pieces: high and low */
> +	emitl(dlp, Lpos,
> +		   BPF_MOV_REG(hi_reg, lowreg));
> +
> +	/* Put least significant half in lowreg */
> +	/* Implicit AND of lowreg with 0xFFFFFFFF mask */
> +	emit(dlp, BPF_ALU64_IMM(BPF_LSH, lowreg, 32));
> +	emit(dlp, BPF_ALU64_IMM(BPF_RSH, lowreg, 32));
> +	/* Put most significant half in (the lower part of) hi_reg */
> +	emit(dlp, BPF_ALU64_IMM(BPF_RSH, hi_reg, 32));
> +
> +	/* Multiply using similar approach to algebraic FOIL of binomial */
> +	/* That is: high*high + 2*high*low + low*low */
> +	/* Contain interim values to 64 bits or account for carry */
> +
> +	/* First multiply high times low value */
> +	/* Value must be doubled for answer, eventually */
> +	/* This result represents a value shifted 32 bits to the right */
> +	emit(dlp, BPF_MOV_REG(midreg, lowreg));
> +	emit(dlp, BPF_ALU64_REG(BPF_MUL, midreg, hi_reg));
> +	/* Product of low values */
> +	emit(dlp, BPF_ALU64_REG(BPF_MUL, lowreg, lowreg));
> +	/* Product of high values; result represented shifted 64 to the right */
> +	emit(dlp, BPF_ALU64_REG(BPF_MUL, hi_reg, hi_reg));
> +
> +	/* Now add the pieces in their proper place */
> +
> +	/* Get the bottom half of the mid value to add to the low value */
> +	emit(dlp, BPF_MOV_REG(lmdreg, midreg));
> +	/* The mid values are doubled, as O and I in FOIL are for a square */
> +	/* Adjust for shifted representation with respect to low half value */
> +	/* Implicit AND of lmdreg (lower mid) with 0xFFFFFFFF mask, times 2 */
> +	emit(dlp, BPF_ALU64_IMM(BPF_LSH, lmdreg, 33));
> +	/* Upper half times 2 + carry bit from lower half x2 */
> +	emit(dlp, BPF_ALU64_IMM(BPF_RSH, midreg, 31));
> +
> +	/* Add low value part from mid to lowreg */
> +	emit(dlp, BPF_ALU64_REG(BPF_ADD, lowreg, lmdreg));
> +	/* Handle the overflow/carry case */
> +	emit(dlp, BPF_BRANCH_REG(BPF_JLT, lmdreg, lowreg, Lncy));
> +	emit(dlp, BPF_ALU64_IMM(BPF_ADD, hi_reg, 1)) /* account for carry */;
> +
> +	/* Sum high value; no overflow expected nor accounted for */
> +	emitl(dlp, Lncy,
> +		   BPF_ALU64_REG(BPF_ADD, hi_reg, midreg));
> +
> +	dt_regset_free(drp, lmdreg);
> +	dt_regset_free(drp, midreg);
> +
> +	DT_CG_AGG_IMPL(aid, sz, dlp, drp, dt_cg_agg_stddev_impl,
> +		       dnp->dn_aggfun->dn_args->dn_reg, hi_reg, lowreg);
> +
> +	dt_regset_free(drp, dnp->dn_aggfun->dn_args->dn_reg);
> +	dt_regset_free(drp, hi_reg);
> +	dt_regset_free(drp, lowreg);
> +
> +	TRACE_REGSET("    AggSDv : End  ");
> +}
> +
> +static void
> +dt_cg_agg_sum_impl(dt_irlist_t *dlp, dt_regset_t *drp, int dreg, int vreg)
> +{
> +	TRACE_REGSET("            Impl: Begin");
> +
> +	/* *(%dreg) += %vreg */
> +	emit(dlp, BPF_XADD_REG(BPF_DW, dreg, 0, vreg));
> +
> +	TRACE_REGSET("            Impl: End  ");
>  }
>  
>  static void
> @@ -3660,6 +3869,15 @@ dt_cg_agg_sum(dt_pcb_t *pcb, dt_ident_t *aid, dt_node_t *dnp,
>  	int	sz = 1 * sizeof(uint64_t);
>  
>  	DT_CG_AGG_SET_STORAGE(aid, sz);
> +
> +	TRACE_REGSET("    AggSum : Begin");
> +
> +	dt_cg_node(dnp->dn_aggfun->dn_args, dlp, drp);
> +	DT_CG_AGG_IMPL(aid, sz, dlp, drp, dt_cg_agg_sum_impl,
> +		       dnp->dn_aggfun->dn_args->dn_reg);
> +	dt_regset_free(drp, dnp->dn_aggfun->dn_args->dn_reg);
> +
> +	TRACE_REGSET("    AggSum : End  ");
>  }
>  
>  typedef void dt_cg_aggfunc_f(dt_pcb_t *, dt_ident_t *, dt_node_t *,
> -- 
> 1.8.3.1
> 




