[DTrace-devel] [PATCH v2 1/4] Switch from per-CPU latches to per-CPU/per-agg latches
Kris Van Hees
kris.van.hees at oracle.com
Wed Jan 13 11:56:04 PST 2021
Reviewed-by: Kris Van Hees <kris.van.hees at oracle.com>
.. with minor code change below
On Mon, Jan 11, 2021 at 02:58:42PM -0500, eugene.loh at oracle.com wrote:
> From: Eugene Loh <eugene.loh at oracle.com>
>
> Change the latch sequence number to be per-aggregation, not only per-CPU.
> This way, one can see whether an aggregation has any data or not. (Some
> aggregations, like count(), already inherently have such information, but
> we need to handle the general case.)
>
> This change has the added advantage of reducing producer-consumer conflicts.
> E.g., imagine that the producer is firing very often, updating different
> aggregations, while the consumer is trying to read a single, large
> aggregation (e.g., some *quantize aggregation). A single latch sequence
> number for all aggregations would keep updating, causing the consumer to
> keep rereading an aggregation unnecessarily.
>
> Also, when taking snapshots of aggregations, copy only a single copy of
> the aggregation data. Neither the latch sequence number nor the second
> copy is needed.
>
> This also solves the problem that aggregations that appear in a D script
> were being printed even if the probes that use them never fire.
>
> Introduce a test to catch the printing of unused aggregations.
>
> https://github.com/oracle/dtrace-utils/issues/3
> Signed-off-by: Eugene Loh <eugene.loh at oracle.com>
> ---
> libdtrace/dt_aggregate.c | 59 ++++++++++++++++------------------
> libdtrace/dt_bpf.c | 8 ++---
> libdtrace/dt_cg.c | 53 +++++++++++++++---------------
> libdtrace/dt_map.c | 7 +++-
> test/unittest/aggs/tst.empty.d | 29 +++++++++++++++++
> test/unittest/aggs/tst.empty.r | 2 ++
> 6 files changed, 92 insertions(+), 66 deletions(-)
> create mode 100644 test/unittest/aggs/tst.empty.d
> create mode 100644 test/unittest/aggs/tst.empty.r
>
> diff --git a/libdtrace/dt_aggregate.c b/libdtrace/dt_aggregate.c
> index 972c19a3..389ede8c 100644
> --- a/libdtrace/dt_aggregate.c
> +++ b/libdtrace/dt_aggregate.c
> @@ -422,19 +422,30 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
> size_t ndx = hval % agh->dtah_size;
> int rval;
> uint_t i;
> + int64_t *src;
> + int realsz;
>
> rval = dt_aggid_lookup(st->dtp, aid->di_id, &agg);
> if (rval != 0)
> return rval;
>
> + /* check latch sequence number to see if there is any data */
> + src = (int64_t *)(st->buf + aid->di_offset);
> + if (*src == 0)
> + return 0;
> + src++;
> +
> + /* real size excludes latch sequence number and second data copy */
> + realsz = (aid->di_size - sizeof(uint64_t)) / 2;
> +
> /* See if we already have an entry for this aggregation. */
> for (h = agh->dtah_hash[ndx]; h != NULL; h = h->dtahe_next) {
> - int64_t *src, *dst, *cpu_dst;
> + int64_t *dst, *cpu_dst;
> uint_t cnt;
>
> if (h->dtahe_hval != hval)
> continue;
> - if (h->dtahe_size != aid->di_size)
> + if (h->dtahe_size != realsz)
> continue;
>
> /* Entry found - process the data. */
> @@ -445,7 +456,6 @@ dt_aggregate_snap_one(dt_idhash_t *dhp, dt_ident_t *aid, dt_snapstate_t *st)
> : NULL;
>
> accumulate:
> - src = (int64_t *)(st->buf + aid->di_offset);
> switch (((dt_ident_t *)aid->di_iarg)->di_id) {
> case DT_AGG_MAX:
> if (*src > *dst)
> @@ -458,7 +468,7 @@ accumulate:
>
> break;
> default:
> - for (i = 0, cnt = aid->di_size / sizeof(int64_t);
> + for (i = 0, cnt = realsz / sizeof(int64_t);
> i < cnt; i++, dst++, src++)
> *dst += *src;
> }
> @@ -480,20 +490,20 @@ accumulate:
> return dt_set_errno(st->dtp, EDT_NOMEM);
>
> agd = &h->dtahe_data;
> - agd->dtada_data = dt_alloc(st->dtp, aid->di_size);
> + agd->dtada_data = dt_alloc(st->dtp, realsz);
> if (agd->dtada_data == NULL) {
> dt_free(st->dtp, h);
> return dt_set_errno(st->dtp, EDT_NOMEM);
> }
>
> - memcpy(agd->dtada_data, st->buf + aid->di_offset, aid->di_size);
> - agd->dtada_size = aid->di_size;
> + memcpy(agd->dtada_data, src, realsz);
> + agd->dtada_size = realsz;
> agd->dtada_desc = agg;
> agd->dtada_hdl = st->dtp;
> agd->dtada_normal = 1;
>
> h->dtahe_hval = hval;
> - h->dtahe_size = aid->di_size;
> + h->dtahe_size = realsz;
>
> if (st->agp->dtat_flags & DTRACE_A_PERCPU) {
> char **percpu = dt_calloc(st->dtp,
> @@ -508,7 +518,7 @@ accumulate:
> }
>
> for (i = 0; i <= st->dtp->dt_conf.max_cpuid; i++) {
> - percpu[i] = dt_zalloc(st->dtp, aid->di_size);
> + percpu[i] = dt_zalloc(st->dtp, realsz);
> if (percpu[i] == NULL) {
> while (--i >= 0)
> dt_free(st->dtp, percpu[i]);
> @@ -519,7 +529,7 @@ accumulate:
> }
> }
>
> - memcpy(percpu[st->cpu], st->buf + aid->di_offset, aid->di_size);
> + memcpy(percpu[st->cpu], src, realsz);
> agd->dtada_percpu = percpu;
> }
>
> @@ -544,21 +554,11 @@ dt_aggregate_snap_cpu(dtrace_hdl_t *dtp, processorid_t cpu)
> {
> dt_aggregate_t *agp = &dtp->dt_aggregate;
> char *buf = agp->dtat_cpu_buf[cpu];
> - uint64_t *seq = (uint64_t *)buf;
> dt_snapstate_t st;
>
> - /* Nothing to be done? */
> - if (*seq == 0)
> - return 0;
> -
> - /*
> - * Process all static aggregation identifiers in the CPU buffer. We
> - * skip over the latch sequence number because from this point on we
> - * are simply processing data.
> - */
> st.dtp = dtp;
> st.cpu = cpu;
> - st.buf = buf + sizeof(*seq);
> + st.buf = buf;
> st.agp = agp;
>
> return dt_idhash_iter(dtp->dt_aggs,
> @@ -960,8 +960,8 @@ dt_aggregate_bundlecmp(const void *lhs, const void *rhs)
> * set for min(), so that any other value fed to the functions will register
> * properly.
> *
> - * The first value in the buffer is used as a flag to indicate whether an
> - * initial value was stored in the buffer.
> + * The latch sequence number of the first aggregation is used as a flag to
> + * indicate whether an initial value was stored for any aggregation.
> */
> static int
> init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
> @@ -983,9 +983,10 @@ init_minmax(dt_idhash_t *dhp, dt_ident_t *aid, char *buf)
> /* Indicate that we are setting initial values. */
> *(int64_t *)buf = 1;
>
> - ptr = (int64_t *)(buf + sizeof(int64_t) + aid->di_offset);
> - ptr[0] = value;
> + /* skip ptr[0], it is the latch sequence number */
> + ptr = (int64_t *)(buf + aid->di_offset);
> ptr[1] = value;
> + ptr[2] = value;
>
> return 0;
> }
> @@ -1007,12 +1008,7 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
> * Allocate a buffer to hold the aggregation data for all possible
> * CPUs, and initialize the per-CPU data pointers for CPUs that are
> * currently enabled.
> - *
> - * The size of the aggregation data is to be increased by the size of
> - * the latch sequence number. That yields the actual size of the data
> - * allocated per CPU.
> */
> - aggsz += sizeof(uint64_t);
> agp->dtat_buf = dt_zalloc(dtp, dtp->dt_conf.num_possible_cpus * aggsz);
> if (agp->dtat_buf == NULL)
> return dt_set_errno(dtp, EDT_NOMEM);
> @@ -1044,7 +1040,7 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
> dt_idhash_iter(dtp->dt_aggs, (dt_idhash_f *) init_minmax,
> agp->dtat_buf);
> if (*(int64_t *)agp->dtat_buf != 0) {
> - *(int64_t *)agp->dtat_buf = 0; /* clear the flag */
> + *(int64_t *)agp->dtat_buf = 0; /* clear the flag */
If the flag is 0, no init data was set so there is no need to update the
map value. So, this should simply be:
if (*(int64_t *)agp->dtat_buf == 0)
return 0;
and the remainder of the function then clears the flag, copies the init
data, and does the update.
> for (i = 0; i < dtp->dt_conf.num_online_cpus; i++) {
> int cpu = dtp->dt_conf.cpus[i].cpu_id;
> @@ -1055,7 +1051,6 @@ dt_aggregate_go(dtrace_hdl_t *dtp)
>
> memcpy(agp->dtat_cpu_buf[cpu], agp->dtat_buf, aggsz);
> }
> -
> dt_bpf_map_update(dtp->dt_aggmap_fd, &key, agp->dtat_buf);
> }
>
> diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
> index c6102f15..1705c990 100644
> --- a/libdtrace/dt_bpf.c
> +++ b/libdtrace/dt_bpf.c
> @@ -223,16 +223,12 @@ dt_bpf_gmap_create(dtrace_hdl_t *dtp)
> return -1; /* dt_errno is set for us */
>
> /*
> - * If there is aggregation data to be collected, we need to create the
> - * 'aggs' BPF map, and account for a uint64_t in the map value size to
> - * hold a latch sequence number (seq) for concurrent access to the
> - * data.
> + * Check if there is aggregation data to be collected.
> */
> if (aggsz > 0) {
> dtp->dt_aggmap_fd = create_gmap(dtp, "aggs",
> BPF_MAP_TYPE_PERCPU_ARRAY,
> - sizeof(uint32_t),
> - sizeof(uint64_t) + aggsz, 1);
> + sizeof(uint32_t), aggsz, 1);
> if (dtp->dt_aggmap_fd == -1)
> return -1; /* dt_errno is set for us */
> }
> diff --git a/libdtrace/dt_cg.c b/libdtrace/dt_cg.c
> index 8e440c8e..972b8483 100644
> --- a/libdtrace/dt_cg.c
> +++ b/libdtrace/dt_cg.c
> @@ -3033,13 +3033,14 @@ dt_cg_node(dt_node_t *dnp, dt_irlist_t *dlp, dt_regset_t *drp)
> * identifier (if not set yet).
> *
> * We consume twice the required data size because of the odd/even data pair
> - * mechanism to provide lockless, write-wait-free operation.
> + * mechanism to provide lockless, write-wait-free operation. Additionally,
> + * we make room for a latch sequence number.
> */
> #define DT_CG_AGG_SET_STORAGE(aid, sz) \
> do { \
> if ((aid)->di_offset == -1) \
> dt_ident_set_storage((aid), sizeof(uint64_t), \
> - 2 * (sz)); \
> + sizeof(uint64_t) + 2 * (sz)); \
> } while (0)
>
> /*
> @@ -3047,7 +3048,7 @@ dt_cg_node(dt_node_t *dnp, dt_irlist_t *dlp, dt_regset_t *drp)
> * return a register that holds a pointer to the aggregation data to be
> * updated.
> *
> - * We update the latch seqcount (first value in the aggregation buffer) to
> + * We update the latch seqcount (first value in the aggregation) to
> * signal that reads should be directed to the alternate copy of the data. We
> * then determine the location of data for the given aggregation that can be
> * updated. This value is stored in the register returned from this function.
> @@ -3056,49 +3057,47 @@ static int
> dt_cg_agg_buf_prepare(dt_ident_t *aid, int size, dt_irlist_t *dlp,
> dt_regset_t *drp)
> {
> - int rx, ry;
> + int ragd, roff;
>
> TRACE_REGSET(" Prep: Begin");
>
> dt_regset_xalloc(drp, BPF_REG_0);
> - rx = dt_regset_alloc(drp);
> - ry = dt_regset_alloc(drp);
> - assert(rx != -1 && ry != -1);
> + ragd = dt_regset_alloc(drp);
> + roff = dt_regset_alloc(drp);
> + assert(ragd != -1 && roff != -1);
>
> /*
> - * // (%rX = register for agg ptr)
> - * // (%rY = register for agg data)
> * agd = dctx->agg; // lddw %r0, [%fp + DT_STK_DCTX]
> - * // lddw %rX, [%r0 + DCTX_AGG]
> + * // lddw %ragd, [%r0 + DCTX_AGG]
> + * agd += aid->di_offset // %ragd += aid->di_offset
> */
> emit(dlp, BPF_LOAD(BPF_DW, BPF_REG_0, BPF_REG_FP, DT_STK_DCTX));
> - emit(dlp, BPF_LOAD(BPF_DW, rx, BPF_REG_0, DCTX_AGG));
> + emit(dlp, BPF_LOAD(BPF_DW, ragd, BPF_REG_0, DCTX_AGG));
> + emit(dlp, BPF_ALU64_IMM(BPF_ADD, ragd, aid->di_offset));
>
> /*
> - * off = (*agd % 2) * size // lddw %rY, [%rX + 0]
> - * + aid->di_offset // and %rY, 1
> - * + sizeof(uint64_t);
> - * // mul %rY, size
> - * // add %rY, aid->di_offset +
> - * // sizeof(uint64_t)
> + * off = (*agd & 1) * size // lddw %roff, [%ragd + 0]
> + * + sizeof(uint64_t); // and %roff, 1
> + * // mul %roff, size
> + * // add %roff, sizeof(uint64_t)
> * (*agd)++; // mov %r0, 1
> - * // xadd [%rX + 0], %r0
> - * agd += off; // add %rX, %rY
> + * // xadd [%ragd + 0], %r0
> + * agd += off; // add %ragd, %roff
> */
> - emit(dlp, BPF_LOAD(BPF_DW, ry, rx, 0));
> - emit(dlp, BPF_ALU64_IMM(BPF_AND, ry, 1));
> - emit(dlp, BPF_ALU64_IMM(BPF_MUL, ry, size));
> - emit(dlp, BPF_ALU64_IMM(BPF_ADD, ry, aid->di_offset + sizeof(uint64_t)));
> + emit(dlp, BPF_LOAD(BPF_DW, roff, ragd, 0));
> + emit(dlp, BPF_ALU64_IMM(BPF_AND, roff, 1));
> + emit(dlp, BPF_ALU64_IMM(BPF_MUL, roff, size));
> + emit(dlp, BPF_ALU64_IMM(BPF_ADD, roff, sizeof(uint64_t)));
> emit(dlp, BPF_MOV_IMM(BPF_REG_0, 1));
> - emit(dlp, BPF_XADD_REG(BPF_DW, rx, 0, BPF_REG_0));
> - emit(dlp, BPF_ALU64_REG(BPF_ADD, rx, ry));
> + emit(dlp, BPF_XADD_REG(BPF_DW, ragd, 0, BPF_REG_0));
> + emit(dlp, BPF_ALU64_REG(BPF_ADD, ragd, roff));
>
> - dt_regset_free(drp, ry);
> + dt_regset_free(drp, roff);
> dt_regset_free(drp, BPF_REG_0);
>
> TRACE_REGSET(" Prep: End ");
>
> - return rx;
> + return ragd;
> }
>
> #define DT_CG_AGG_IMPL(aid, sz, dlp, drp, f, ...) \
> diff --git a/libdtrace/dt_map.c b/libdtrace/dt_map.c
> index 0c9495d7..cac295fa 100644
> --- a/libdtrace/dt_map.c
> +++ b/libdtrace/dt_map.c
> @@ -274,11 +274,16 @@ dt_aggid_add(dtrace_hdl_t *dtp, const dt_ident_t *aid)
> if (agg == NULL)
> return dt_set_errno(dtp, EDT_NOMEM);
>
> + /*
> + * Note the relationship between the aggregation storage
> + * size (di_size) and the aggregation data size (dtagd_size):
> + * di_size = dtagd_size * (# agg copies) + (size of latch seq #)
> + */
> agg->dtagd_id = id;
> agg->dtagd_name = aid->di_name;
> agg->dtagd_sig = ((dt_idsig_t *)aid->di_data)->dis_auxinfo;
> agg->dtagd_varid = aid->di_id;
> - agg->dtagd_size = aid->di_size / 2;
> + agg->dtagd_size = (aid->di_size - sizeof(uint64_t))/ 2;
> agg->dtagd_nrecs = agg->dtagd_size / sizeof(uint64_t);
>
> recs = dt_calloc(dtp, agg->dtagd_nrecs, sizeof(dtrace_recdesc_t));
> diff --git a/test/unittest/aggs/tst.empty.d b/test/unittest/aggs/tst.empty.d
> new file mode 100644
> index 00000000..784e1dd9
> --- /dev/null
> +++ b/test/unittest/aggs/tst.empty.d
> @@ -0,0 +1,29 @@
> +/*
> + * Oracle Linux DTrace.
> + * Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
> + * Licensed under the Universal Permissive License v 1.0 as shown at
> + * http://oss.oracle.com/licenses/upl.
> + */
> +
> +/*
> + * ASSERTION: Empty aggregations are not printed
> + *
> + * SECTION: Aggregations/Aggregations
> + */
> +
> +#pragma D option quiet
> +
> +BEGIN
> +{
> + i = 12345678;
> + @a = sum(i);
> + exit(0);
> +}
> +
> +/* this probe should not fire */
> +tick-1hour
> +{
> + /* these aggregations should not be printed */
> + @b = min(i);
> + @c = max(i);
> +}
> diff --git a/test/unittest/aggs/tst.empty.r b/test/unittest/aggs/tst.empty.r
> new file mode 100644
> index 00000000..aab82b76
> --- /dev/null
> +++ b/test/unittest/aggs/tst.empty.r
> @@ -0,0 +1,2 @@
> +
> + 12345678
> --
> 2.18.4
>
>
> _______________________________________________
> DTrace-devel mailing list
> DTrace-devel at oss.oracle.com
> https://oss.oracle.com/mailman/listinfo/dtrace-devel
More information about the DTrace-devel
mailing list