[DTrace-devel] [PATCH v2 2/4] Aggregations are not appropriately reset
Kris Van Hees
kris.van.hees at oracle.com
Wed Jan 13 12:15:31 PST 2021
Reviewed-by: Kris Van Hees <kris.van.hees at oracle.com>
... provided you leave out the #ifdef UNUSED ... #endif change, which doesn't
belong in this patch.
On Mon, Jan 11, 2021 at 02:58:43PM -0500, eugene.loh at oracle.com wrote:
> From: Eugene Loh <eugene.loh at oracle.com>
>
> In DTv1, each user-space aggregation snapshot would read data from the
> kernel and aggregate, not only over CPUs but also with previous
> snapshots.
>
> In DTv2, the kernel maintains aggregations in BPF maps. Each user-space
> snapshot should still aggregate over CPUs, but not with previous data.
>
> Reset aggregations appropriately with each snapshot.
>
> Tweak testing to catch this problem. The test suite will catch this
> problem more thoroughly once other functionality has been implemented.
>
> https://github.com/oracle/dtrace-utils/issues/4
> Signed-off-by: Eugene Loh <eugene.loh at oracle.com>
> ---
> libdtrace/dt_aggregate.c | 99 +++++++++++++----------
> test/unittest/aggs/tst.llquantize_basic.d | 1 -
> test/unittest/aggs/tst.reset.d | 38 +++++++++
> test/unittest/aggs/tst.reset.r | 86 ++++++++++++++++++++
> test/unittest/multiaggs/tst.same.d | 1 -
> 5 files changed, 181 insertions(+), 44 deletions(-)
> create mode 100644 test/unittest/aggs/tst.reset.d
> create mode 100644 test/unittest/aggs/tst.reset.r
>
> diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
> index 389ede8c..186e44dd 100644
> --- a/libdtrace/dt_aggregate.c
> +++ b/libdtrace/dt_aggregate.c
> @@ -282,6 +282,7 @@ dt_aggregate_quantizedcmp(int64_t *lhs, int64_t *rhs)
> return 0;
> }
>
> +#ifdef UNUSED
> static void
> dt_aggregate_usym(dtrace_hdl_t *dtp, uint64_t *data)
> {
> @@ -382,6 +383,7 @@ dt_aggregate_mod(dtrace_hdl_t *dtp, uint64_t *addr)
> }
> }
> }
> +#endif
>
> static dtrace_aggid_t
> dt_aggregate_aggid(dt_ahashent_t *ent)
Let's leave the above 2 (related) changes out of this patch.
> @@ -404,13 +406,51 @@ dt_aggregate_aggid(dt_ahashent_t *ent)
> return agg->dtagd_varid;
> }
>
> +typedef void (*agg_cpu_f)(dt_ident_t *aid,
> + int64_t *dst, int64_t *src, uint_t realsz);
> +
> typedef struct dt_snapstate {
> dtrace_hdl_t *dtp;
> processorid_t cpu;
> char *buf;
> dt_aggregate_t *agp;
> + agg_cpu_f fun;
> } dt_snapstate_t;
>
> +static void
> +dt_agg_one_copy(dt_ident_t *aid, int64_t *dst, int64_t *src, uint_t realsz)
> +{
> + src++; /* skip latch sequence number */
> + memcpy(dst, src, realsz);
> +}
> +
> +static void
> +dt_agg_one_agg(dt_ident_t *aid, int64_t *dst, int64_t *src, uint_t realsz)
> +{
> + uint_t i, cnt;
> +
> + if (*src == 0)
> + return;
> +
> + src++; /* skip latch sequence number */
> + switch (((dt_ident_t *)aid->di_iarg)->di_id) {
> + case DT_AGG_MAX:
> + if (*src > *dst)
> + *dst = *src;
> +
> + break;
> + case DT_AGG_MIN:
> + if (*src < *dst)
> + *dst = *src;
> +
> + break;
> + default:
> + for (i = 0, cnt = realsz / sizeof(int64_t);
> + i < cnt; i++, dst++, src++)
> + *dst += *src;
> + }
> +}
> +
> static int
> dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
> {
> @@ -421,70 +461,42 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
> uint64_t hval = aid->di_id;
> size_t ndx = hval % agh->dtah_size;
> int rval;
> - uint_t i;
> + uint_t i, realsz;
> int64_t *src;
> - int realsz;
>
> rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
> if (rval != 0)
> return rval;
>
> - /* check latch sequence number to see if there is any data */
> + /* point to the latch sequence number */
> src = (int64_t *)(st->buf + aid->di_offset);
> - if (*src == 0)
> - return 0;
> - src++;
>
> /* real size excludes latch sequence number and second data copy */
> realsz = (aid->di_size - sizeof(uint64_t)) / 2;
>
> /* See if we already have an entry for this aggregation. */
> for (h = agh->dtah_hash[ndx]; h != NULL; h = h->dtahe_next) {
> - int64_t *dst, *cpu_dst;
> - uint_t cnt;
> -
> - if (h->dtahe_hval != hval)
> - continue;
> - if (h->dtahe_size != realsz)
> + if (h->dtahe_hval != hval || h->dtahe_size != realsz)
> continue;
>
> /* Entry found - process the data. */
> agd = &h->dtahe_data;
> - dst = (int64_t *)agd->dtada_data;
> - cpu_dst = agd->dtada_percpu != NULL
> - ? (int64_t *)agd->dtada_percpu[st->cpu]
> - : NULL;
> -
> -accumulate:
> - switch (((dt_ident_t *)aid->di_iarg)->di_id) {
> - case DT_AGG_MAX:
> - if (*src > *dst)
> - *dst = *src;
> -
> - break;
> - case DT_AGG_MIN:
> - if (*src < *dst)
> - *dst = *src;
>
> - break;
> - default:
> - for (i = 0, cnt = realsz / sizeof(int64_t);
> - i < cnt; i++, dst++, src++)
> - *dst += *src;
> - }
> + st->fun(aid, (int64_t *)agd->dtada_data, src, realsz);
>
> /* If we keep per-CPU data - process that as well. */
> - if (cpu_dst != NULL) {
> - dst = cpu_dst;
> - cpu_dst = NULL;
> -
> - goto accumulate;
> - }
> + if (agd->dtada_percpu != NULL)
> + st->fun(aid, (int64_t *)agd->dtada_percpu[st->cpu],
> + src, realsz);
>
> return 0;
> }
>
> - /* Not found - add it. */
> + /* not found, so skip it if latch sequence number is 0 */
> + if (*src == 0)
> + return 0;
> +
> + /* add it to the hash table */
> h = dt_zalloc(st->dtp, sizeof(dt_ahashent_t));
> if (h == NULL)
> return dt_set_errno(st->dtp, EDT_NOMEM);
> @@ -496,6 +508,7 @@ accumulate:
> return dt_set_errno(st->dtp, EDT_NOMEM);
> }
>
> + src++;
> memcpy(agd->dtada_data, src, realsz);
> agd->dtada_size = realsz;
> agd->dtada_desc = agg;
> @@ -550,7 +563,7 @@ accumulate:
> }
>
> static int
> -dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
> +dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu, agg_cpu_f fun)
> {
> dt_aggregate_t *agp = &dtp->dt_aggregate;
> char *buf = agp->dtat_cpu_buf[cpu];
> @@ -560,6 +573,7 @@ dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
> st.cpu = cpu;
> st.buf = buf;
> st.agp = agp;
> + st.fun = fun;
>
> return dt_idhash_iter(dtp->dt_aggs,
> (dt_idhash_f *)dt_aggregate_snap_one, &st);
> @@ -587,7 +601,8 @@ dtrace_aggregate_snap(dtrace_hdl_t *dtp)
> return dt_set_errno(dtp, -rval);
>
> for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> - rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus[i].cpu_id);
> + rval = dt_aggregate_snap_cpu(dtp, dtp->dt_conf.cpus[i].cpu_id,
> + i == 0 ? dt_agg_one_copy : dt_agg_one_agg);
> if (rval != 0)
> return rval;
> }
> diff --git a/test/unittest/aggs/tst.llquantize_basic.d b/test/unittest/aggs/tst.llquantize_basic.d
> index 1aa30680..74d6fe7a 100644
> --- a/test/unittest/aggs/tst.llquantize_basic.d
> +++ b/test/unittest/aggs/tst.llquantize_basic.d
> @@ -4,7 +4,6 @@
> * Licensed under the Universal Permissive License v 1.0 as shown at
> * http://oss.oracle.com/licenses/upl.
> */
> -/* @@xfail: dtv2 */
>
> /*
> * ASSERTION:
> diff --git a/test/unittest/aggs/tst.reset.d b/test/unittest/aggs/tst.reset.d
> new file mode 100644
> index 00000000..2ab3a78e
> --- /dev/null
> +++ b/test/unittest/aggs/tst.reset.d
> @@ -0,0 +1,38 @@
> +/*
> + * Oracle Linux DTrace.
> + * Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
> + * Licensed under the Universal Permissive License v 1.0 as shown at
> + * http://oss.oracle.com/licenses/upl.
> + */
> +
> +/*
> + * ASSERTION: Printing aggregations repeatedly should reproduce results
> + *
> + * SECTION: Aggregations/Aggregations
> + */
> +
> +#pragma D option quiet
> +
> +BEGIN
> +{
> + i = 10;
> + j = 20;
> + k = 30;
> + l = 40;
> + m = 50;
> + @a = sum(i); @b = stddev(i); @c = count(); @d = avg(i); @e = quantize(i);
> + @a = sum(j); @b = stddev(j); @c = count(); @d = avg(j); @e = quantize(j);
> + @a = sum(k); @b = stddev(k); @c = count(); @d = avg(k); @e = quantize(k);
> + @a = sum(l); @b = stddev(l); @c = count(); @d = avg(l); @e = quantize(l);
> + @a = sum(m); @b = stddev(m); @c = count(); @d = avg(m); @e = quantize(m);
> + exit(0)
> +}
> +
> +END
> +{
> + printa(@a); printa(@b); printa(@c); printa(@d); printa(@e);
> + printa(@a); printa(@b); printa(@c); printa(@d); printa(@e);
> + printa(@a); printa(@b); printa(@c); printa(@d); printa(@e);
> + printa(@a); printa(@b); printa(@c); printa(@d); printa(@e);
> + printa(@a); printa(@b); printa(@c); printa(@d); printa(@e);
> +}
> diff --git a/test/unittest/aggs/tst.reset.r b/test/unittest/aggs/tst.reset.r
> new file mode 100644
> index 00000000..eb3447a7
> --- /dev/null
> +++ b/test/unittest/aggs/tst.reset.r
> @@ -0,0 +1,86 @@
> +
> + 150
> +
> + 14
> +
> + 5
> +
> + 30
> +
> +
> + value ------------- Distribution ------------- count
> + 4 | 0
> + 8 |@@@@@@@@ 1
> + 16 |@@@@@@@@@@@@@@@@ 2
> + 32 |@@@@@@@@@@@@@@@@ 2
> + 64 | 0
> +
> +
> + 150
> +
> + 14
> +
> + 5
> +
> + 30
> +
> +
> + value ------------- Distribution ------------- count
> + 4 | 0
> + 8 |@@@@@@@@ 1
> + 16 |@@@@@@@@@@@@@@@@ 2
> + 32 |@@@@@@@@@@@@@@@@ 2
> + 64 | 0
> +
> +
> + 150
> +
> + 14
> +
> + 5
> +
> + 30
> +
> +
> + value ------------- Distribution ------------- count
> + 4 | 0
> + 8 |@@@@@@@@ 1
> + 16 |@@@@@@@@@@@@@@@@ 2
> + 32 |@@@@@@@@@@@@@@@@ 2
> + 64 | 0
> +
> +
> + 150
> +
> + 14
> +
> + 5
> +
> + 30
> +
> +
> + value ------------- Distribution ------------- count
> + 4 | 0
> + 8 |@@@@@@@@ 1
> + 16 |@@@@@@@@@@@@@@@@ 2
> + 32 |@@@@@@@@@@@@@@@@ 2
> + 64 | 0
> +
> +
> + 150
> +
> + 14
> +
> + 5
> +
> + 30
> +
> +
> + value ------------- Distribution ------------- count
> + 4 | 0
> + 8 |@@@@@@@@ 1
> + 16 |@@@@@@@@@@@@@@@@ 2
> + 32 |@@@@@@@@@@@@@@@@ 2
> + 64 | 0
> +
> +
> diff --git a/test/unittest/multiaggs/tst.same.d b/test/unittest/multiaggs/tst.same.d
> index 69121b09..4fd8815d 100644
> --- a/test/unittest/multiaggs/tst.same.d
> +++ b/test/unittest/multiaggs/tst.same.d
> @@ -4,7 +4,6 @@
> * Licensed under the Universal Permissive License v 1.0 as shown at
> * http://oss.oracle.com/licenses/upl.
> */
> -/* @@xfail: dtv2 */
>
> #pragma D option quiet
>
> --
> 2.18.4
>
>
> _______________________________________________
> DTrace-devel mailing list
> DTrace-devel at oss.oracle.com
> https://oss.oracle.com/mailman/listinfo/dtrace-devel
More information about the DTrace-devel mailing list