[DTrace-devel] [PATCH v2 2/5] Implement a 'aggs' per-CPU BPF map to store aggregation data
Eugene Loh
eugene.loh at oracle.com
Mon Nov 30 09:26:20 PST 2020
On 11/24/2020 01:50 PM, Kris Van Hees wrote:
> Aggregation data consists of data items (all uint64_t) that are updated
Unsigned? Should that be int64_t? sum(), min(), and max() can all be
given negative values.
> when an aggregation function is executed. This is quite different from
> other data recording actions that append data to an output buffer.
>
> We need support for concurrency control to allow reading the data during
> non-atomic updates. We use a latch mechanism, i.e. a multiversion
> concurrency control mechanism, to satisfy these requirements.
Maybe this implementation choice should be stated explicitly here: all
aggregations are treated as non-atomic, even though several of them,
such as sum(), min(), and max(), update a single value and could in
principle be updated atomically.
> Simply put, the per-CPU aggregation data area stores a sequence id that
> tracks the generation of the data updates. Since only one aggregation
> can be updated at a time on a single CPU, one latch is sufficient for
> all aggregations (per CPU-buffer). We also allocate two copies of each
> aggregation, which we will call A and B.
While one latch is sufficient from this point of view, it may still be
advantageous to have a separate latch for each aggregation. For example,
say there are three aggregations: an lquantize(), a min(), and a
max(). The producer is alternating between the min() and the max(),
while the consumer is trying to read the lquantize(), which is
potentially a lot of data. The producer should be able to sustain a
high rate of min() and max() updates; that is what aggregations are
for. But with only a single latch, every min() or max() update bumps the
shared sequence ID, so the consumer's read of the lquantize() keeps
getting invalidated and the consumer never makes progress.
Granted, having a separate latch for each aggregation would not help
if the producer in this example were instead beating hard on the
lquantize() itself. But the "separate latch" solution would at least
fix the min()/max() case above.
The "separate latch" solution requires additional space, one sequence
number per aggregation. But most of the space will likely go to the
dual copies of large quantize() data, relative to which the cost of
the extra latches would be small.
> When we update the aggregation data, we first modify A and then we modify
> B, while ensuring that while we modify one, the reader is directed to
> read from the other.
>
> Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
> ---
> libdtrace/dt_bpf.c | 20 ++++++++++++++++-
> libdtrace/dt_dctx.h | 16 ++++++++------
> libdtrace/dt_dlibs.c | 1 +
> test/unittest/codegen/tst.stack_layout.r | 28 ++++++++++++------------
> 4 files changed, 43 insertions(+), 22 deletions(-)
>
> diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
> index eb295b00..f7028b90 100644
> --- a/libdtrace/dt_bpf.c
> +++ b/libdtrace/dt_bpf.c
> @@ -144,6 +144,11 @@ set_task_offsets(dtrace_hdl_t *dtp)
> * - state: DTrace session state, used to communicate state between BPF
> * programs and userspace. The content of the map is defined in
> * dt_state.h.
> + * - aggs: Aggregation data buffer map, associated with each CPU. The
> + * map is implemented as a global per-CPU map with a singleton
> + * element (key 0). Every aggregation is stored with two copies
> + * of its data to provide a lockless latch-based mechanism for
> + * atomic reading and writing.
> * - buffers: Perf event output buffer map, associating a perf event output
> * buffer with each CPU. The map is indexed by CPU id.
> * - cpuinfo: CPU information map, associating a cpuinfo_t structure with
> @@ -191,7 +196,7 @@ set_task_offsets(dtrace_hdl_t *dtp)
> int
> dt_bpf_gmap_create(dtrace_hdl_t *dtp)
> {
> - int gvarc, tvarc;
> + int gvarc, tvarc, aggsz;
> int ci_mapfd;
> uint32_t key = 0;
>
> @@ -202,6 +207,9 @@ dt_bpf_gmap_create(dtrace_hdl_t *dtp)
> /* Mark global maps creation as completed. */
> dt_gmap_done = 1;
>
> + /* Determine the aggregation buffer size. */
> + aggsz = dt_idhash_nextoff(dtp->dt_aggs, 1, 0);
> +
> /* Determine the number of global and TLS variables. */
> gvarc = dt_idhash_peekid(dtp->dt_globals) - DIF_VAR_OTHER_UBASE;
> tvarc = dt_idhash_peekid(dtp->dt_tls) - DIF_VAR_OTHER_UBASE;
> @@ -214,6 +222,16 @@ dt_bpf_gmap_create(dtrace_hdl_t *dtp)
> if (dtp->dt_stmap_fd == -1)
> return -1; /* dt_errno is set for us */
>
> + /*
> + * If there is aggregation data to be collected, we need to add a
> + * uint64_t to the map value size to hold a latch sequence number (seq)
> + * for concurrent access to the data.
> + */
> + if (aggsz > 0 &&
> + create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
> + sizeof(uint32_t), sizeof(uint64_t) + aggsz, 1) == -1)
> + return -1; /* dt_errno is set for us */
> +
> if (create_gmap(dtp, "buffers", BPF_MAP_TYPE_PERF_EVENT_ARRAY,
> sizeof(uint32_t), sizeof(uint32_t),
> dtp->dt_conf.num_online_cpus) == -1)
> diff --git a/libdtrace/dt_dctx.h b/libdtrace/dt_dctx.h
> index 3be4dbc4..7ebf4aea 100644
> --- a/libdtrace/dt_dctx.h
> +++ b/libdtrace/dt_dctx.h
> @@ -37,12 +37,14 @@ typedef struct dt_dctx {
> dt_activity_t *act; /* pointer to activity state */
> dt_mstate_t *mst; /* DTrace machine state */
> char *buf; /* Output buffer scratch memory */
> + char *agg; /* Aggregation data */
> } dt_dctx_t;
>
> #define DCTX_CTX offsetof(dt_dctx_t, ctx)
> #define DCTX_ACT offsetof(dt_dctx_t, act)
> #define DCTX_MST offsetof(dt_dctx_t, mst)
> #define DCTX_BUF offsetof(dt_dctx_t, buf)
> +#define DCTX_AGG offsetof(dt_dctx_t, agg)
> #define DCTX_SIZE ((int16_t)sizeof(dt_dctx_t))
>
> /*
> @@ -92,23 +94,23 @@ typedef struct dt_dctx {
> * +----------------+
> * SCRATCH_BASE = -512 | Scratch Memory |
> * +----------------+
> - * LVAR_END = LVAR(n) = -256 | LVAR n | (n = DT_LVAR_MAX = 18)
> + * LVAR_END = LVAR(n) = -256 | LVAR n | (n = DT_LVAR_MAX = 17)
> * +----------------+
> * | ... |
> * +----------------+
> - * LVAR(1) = -120 | LVAR 1 |
> + * LVAR(1) = -128 | LVAR 1 |
> * +----------------+
> - * LVAR_BASE = LVAR(0) = -112 | LVAR 0 |
> + * LVAR_BASE = LVAR(0) = -120 | LVAR 0 |
> * +----------------+
> - * SPILL(n) = -104 | %r8 | (n = DT_STK_NREGS - 1 = 8)
> + * SPILL(n) = -112 | %r8 | (n = DT_STK_NREGS - 1 = 8)
> * +----------------+
> * | ... |
> * +----------------+
> - * SPILL(1) = -48 | %r1 |
> + * SPILL(1) = -56 | %r1 |
> * +----------------+
> - * SPILL_BASE = SPILL(0) = -40 | %r0 |
> + * SPILL_BASE = SPILL(0) = -48 | %r0 |
> * +----------------+
> - * DCTX = -32 | DTrace Context | -1
> + * DCTX = -40 | DTrace Context | -1
> * +----------------+
> */
> #define DT_STK_BASE ((int16_t)0)
> diff --git a/libdtrace/dt_dlibs.c b/libdtrace/dt_dlibs.c
> index 1e3701e0..607ebbe0 100644
> --- a/libdtrace/dt_dlibs.c
> +++ b/libdtrace/dt_dlibs.c
> @@ -61,6 +61,7 @@ static const dt_ident_t dt_bpf_symbols[] = {
> DT_BPF_SYMBOL(dt_set_tvar, DT_IDENT_SYMBOL),
> DT_BPF_SYMBOL(dt_strnlen, DT_IDENT_SYMBOL),
> /* BPF maps */
> + DT_BPF_SYMBOL(aggs, DT_IDENT_PTR),
> DT_BPF_SYMBOL(buffers, DT_IDENT_PTR),
> DT_BPF_SYMBOL(cpuinfo, DT_IDENT_PTR),
> DT_BPF_SYMBOL(gvars, DT_IDENT_PTR),
> diff --git a/test/unittest/codegen/tst.stack_layout.r b/test/unittest/codegen/tst.stack_layout.r
> index 2333cf95..78d41c82 100644
> --- a/test/unittest/codegen/tst.stack_layout.r
> +++ b/test/unittest/codegen/tst.stack_layout.r
> @@ -1,17 +1,17 @@
> Base: 0
> -dctx: -32
> -%r0: -40
> -%r1: -48
> -%r2: -56
> -%r3: -64
> -%r4: -72
> -%r5: -80
> -%r6: -88
> -%r7: -96
> -%r8: -104
> -lvar[ -1]: -111 (ID -1)
> -lvar[ 0]: -112 (ID 0)
> -lvar[ 1]: -120 (ID 1)
> -lvar[ 18]: -256 (ID 18)
> +dctx: -40
> +%r0: -48
> +%r1: -56
> +%r2: -64
> +%r3: -72
> +%r4: -80
> +%r5: -88
> +%r6: -96
> +%r7: -104
> +%r8: -112
> +lvar[ -1]: -119 (ID -1)
> +lvar[ 0]: -120 (ID 0)
> +lvar[ 1]: -128 (ID 1)
> +lvar[ 17]: -256 (ID 17)
> lvar[ -1]: -257 (ID -1)
> scratch: -257 .. -512