[DTrace-devel] [PATCH v2 2/5] Implement a 'aggs' per-CPU BPF map to store aggregation data

Kris Van Hees kris.van.hees at oracle.com
Mon Nov 30 12:57:29 PST 2020


On Mon, Nov 30, 2020 at 12:44:04PM -0800, Eugene Loh wrote:
> Reviewed-by: Eugene Loh <eugene.loh at oracle.com>

Thanks.

> A few comments below for your consideration.
> 
> 
> On 11/30/2020 12:33 PM, Kris Van Hees wrote:
> > On Mon, Nov 30, 2020 at 09:26:20AM -0800, Eugene Loh wrote:
> >> On 11/24/2020 01:50 PM, Kris Van Hees wrote:
> >>> Aggregation data consists of data items (all uint64_t) that are updated
> >> Unsigned?  Should that be int64_t?
> > Yes, I forgot to update that.  Thanks!
> >
> >>> when an aggregation function is executed.  This is quite different from
> >>> other data recording actions that append data to an output buffer.
> >>>
> >>> We need support for concurrency control to allow reading the data during
> >>> non-atomic updates.  We use a latch mechanism, i.e. a multiversion
> >>> concurrency control mechanism, to satisfy these requirements.
> >> Maybe an implementation choice should be stated explicitly here -- that
> >> is, to treat all aggregations as non-atomic, even though a number of
> >> them, such as sum(), min(), and max(), are atomic.
> > Sure, though I think that the statement you comment on actually does that.
> > It explicitly states that we're using a latch mechanism for concurrency
> > control.
> 
> Okay, though I wouldn't mind an explicit statement to that effect to 
> "remind" readers that this mechanism is being used for all aggregations, 
> even when not necessary.  I think it's a worthwhile point, since we're 
> introducing some possibility of stalling the consumer even if all 
> aggregations are atomic.

I am going to opt not to add that, because it is already covered by the text
that follows: it states that the latch applies to the per-CPU aggregation data
area, which implies all aggregations.  That is also the subject matter of your
other comment, so I think that is clear.
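
To illustrate why a latch on the per-CPU data area covers every aggregation,
here is a minimal sketch of how the singleton map value could be viewed.  The
names agg_value_t and agg_value_size() are hypothetical and do not appear in
the patch; only the size expression sizeof(uint64_t) + aggsz is taken from it.
Whether aggsz already accounts for both copies of each aggregation is not
stated in this thread, so the sketch treats the data area as opaque bytes.

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical view of the single element (key 0) of the per-CPU 'aggs' map.
 * One sequence number sits in front of the data of all aggregations.
 */
typedef struct agg_value {
	uint64_t	seq;	/* latch sequence number, shared by all aggregations */
	char		data[];	/* aggsz bytes of aggregation data */
} agg_value_t;

/*
 * Mirror the size computation in the patch: the map is only created when
 * there is aggregation data, and its value size is the data plus one seq.
 * A return value of 0 stands for "no map is created".
 */
static size_t
agg_value_size(size_t aggsz)
{
	return aggsz > 0 ? sizeof(uint64_t) + aggsz : 0;
}
```

Because there is one seq per map value, and one map value per CPU, any update
on a CPU changes the generation seen by a reader of any aggregation on that
CPU.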

> >>> Simply put, the per-CPU aggregation data area stores a sequence id that
> >>> tracks the generation of the data updates.  Since only one aggregation
> >>> can be updated at a time on a single CPU, one latch is sufficient for
> >>> all aggregations (per CPU-buffer).  We also allocate two copies of each
> >>> aggregation, which we will call A and B.
> >> While one latch is sufficient from this point of view, it may still be
> >> advantageous to have a different latch for each aggregation. E.g., let's
> >> say there are three aggregations:  an lquantize(), a min(), and a
> >> max().  Let's say the producer is moving back and forth between the
> >> min() and the max().  Meanwhile, the consumer is trying to read the
> >> lquantize(), which is potentially "a lot" of data.  The producer should
> >> be able to sustain a high rate of min() and max() updates... that's what
> >> aggregations are for.  But if there is only a single latch, the sequence
> >> ID for the lquantize() keeps getting spoiled, and the consumer never
> >> makes progress.
> >>
> >> Now, granted, having a separate latch for each aggregation might not
> >> help if the producer, in this example, were beating hard on the
> >> lquantize().  But at least the "separate latch" solution would be an
> >> improvement.
> >>
> >> The "separate latch" solution requires additional space, but one could
> >> easily imagine that the space will largely be used for dual copies of
> >> "large" quantize operations, for which the relative cost of separate
> >> latches would be small.
> > Yes, that is a very likely optimization that I expect we'll end up doing.
> > Right now, the implementation is sufficient to get the rest implemented, and
> > therefore I do intend to look at that as a follow-up patch.
> 
> Okay, though I wouldn't mind an explicit statement along that line... 
> not so much speculation about future optimization work, but more just 
> discussion on the present choices.

I don't think such discussion belongs in the commit message.
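
For readers unfamiliar with latches, the single-latch scheme described in the
quoted commit message can be sketched roughly as follows.  This is only an
illustration under stated assumptions, not the code from this patch series:
agg_latch_t, agg_add(), agg_read() and AGG_WORDS are hypothetical names, the
update is a plain sum, and the sketch is meant to be exercised from a single
thread.  A real producer and consumer would need memory barriers appropriate
to their environment.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define AGG_WORDS	4	/* hypothetical: data items in the data area */

/* Hypothetical per-CPU layout: one sequence number, two copies of the data. */
typedef struct agg_latch {
	_Atomic uint64_t seq;			/* generation of the data updates */
	int64_t		 data[2][AGG_WORDS];	/* [0] = copy A, [1] = copy B */
} agg_latch_t;

/* Producer: add val to item idx, first in copy A and then in copy B. */
static void
agg_add(agg_latch_t *l, int idx, int64_t val)
{
	assert(idx >= 0 && idx < AGG_WORDS);

	/* seq becomes odd: readers are directed to copy B while A changes. */
	atomic_fetch_add(&l->seq, 1);
	atomic_thread_fence(memory_order_seq_cst);
	l->data[0][idx] += val;
	atomic_thread_fence(memory_order_seq_cst);

	/* seq becomes even: readers are directed to copy A while B changes. */
	atomic_fetch_add(&l->seq, 1);
	atomic_thread_fence(memory_order_seq_cst);
	l->data[1][idx] += val;
}

/* Consumer: copy out a consistent snapshot, retrying if seq moved. */
static void
agg_read(agg_latch_t *l, int64_t out[AGG_WORDS])
{
	uint64_t	seq;

	do {
		seq = atomic_load(&l->seq);
		atomic_thread_fence(memory_order_seq_cst);
		for (int i = 0; i < AGG_WORDS; i++)
			out[i] = l->data[seq & 1][i];
		atomic_thread_fence(memory_order_seq_cst);
	} while (atomic_load(&l->seq) != seq);
}
```

The retry loop in agg_read() is where the stall discussed above comes from:
with a single latch, an update to any aggregation on that CPU changes seq and
forces the consumer to start over, even if it was reading a different
aggregation.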

> >>> When we update the aggregation data, we first modify A and then we modify
> >>> B, while ensuring that while we modify one, the reader is directed to
> >>> read from the other.
> >>>
> >>> Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
> >>> ---
> >>>    libdtrace/dt_bpf.c                       | 20 ++++++++++++++++-
> >>>    libdtrace/dt_dctx.h                      | 16 ++++++++------
> >>>    libdtrace/dt_dlibs.c                     |  1 +
> >>>    test/unittest/codegen/tst.stack_layout.r | 28 ++++++++++++------------
> >>>    4 files changed, 43 insertions(+), 22 deletions(-)
> >>>
> >>> diff --git a/libdtrace/dt_bpf.c b/libdtrace/dt_bpf.c
> >>> index eb295b00..f7028b90 100644
> >>> --- a/libdtrace/dt_bpf.c
> >>> +++ b/libdtrace/dt_bpf.c
> >>> @@ -144,6 +144,11 @@ set_task_offsets(dtrace_hdl_t *dtp)
> >>>     * - state:	DTrace session state, used to communicate state between BPF
> >>>     *		programs and userspace.  The content of the map is defined in
> >>>     *		dt_state.h.
> >>> + * - aggs:	Aggregation data buffer map, associated with each CPU.  The
> >>> + *		map is implemented as a global per-CPU map with a singleton
> >>> + *		element (key 0).  Every aggregation is stored with two copies
> >>> + *		of its data to provide a lockless latch-based mechanism for
> >>> + *		atomic reading and writing.
> >>>     * - buffers:	Perf event output buffer map, associating a perf event output
> >>>     *		buffer with each CPU.  The map is indexed by CPU id.
> >>>     * - cpuinfo:	CPU information map, associating a cpuinfo_t structure with
> >>> @@ -191,7 +196,7 @@ set_task_offsets(dtrace_hdl_t *dtp)
> >>>    int
> >>>    dt_bpf_gmap_create(dtrace_hdl_t *dtp)
> >>>    {
> >>> -	int		gvarc, tvarc;
> >>> +	int		gvarc, tvarc, aggsz;
> >>>    	int		ci_mapfd;
> >>>    	uint32_t	key = 0;
> >>>    
> >>> @@ -202,6 +207,9 @@ dt_bpf_gmap_create(dtrace_hdl_t *dtp)
> >>>    	/* Mark global maps creation as completed. */
> >>>    	dt_gmap_done = 1;
> >>>    
> >>> +	/* Determine the aggregation buffer size.  */
> >>> +	aggsz = dt_idhash_nextoff(dtp->dt_aggs, 1, 0);
> >>> +
> >>>    	/* Determine the number of global and TLS variables. */
> >>>    	gvarc = dt_idhash_peekid(dtp->dt_globals) - DIF_VAR_OTHER_UBASE;
> >>>    	tvarc = dt_idhash_peekid(dtp->dt_tls) - DIF_VAR_OTHER_UBASE;
> >>> @@ -214,6 +222,16 @@ dt_bpf_gmap_create(dtrace_hdl_t *dtp)
> >>>    	if (dtp->dt_stmap_fd == -1)
> >>>    		return -1;	/* dt_errno is set for us */
> >>>    
> >>> +	/*
> >>> +	 * If there is aggregation data to be collected, we need to add a
> >>> +	 * uint64_t to the map value size to hold a latch sequence number (seq)
> >>> +	 * for concurrent access to the data.
> >>> +	 */
> >>> +	if (aggsz > 0 &&
> >>> +	    create_gmap(dtp, "aggs", BPF_MAP_TYPE_PERCPU_ARRAY,
> >>> +			sizeof(uint32_t), sizeof(uint64_t) +  aggsz, 1) == -1)
> >>> +		return -1;	/* dt_errno is set for us */
> >>> +
> >>>    	if (create_gmap(dtp, "buffers", BPF_MAP_TYPE_PERF_EVENT_ARRAY,
> >>>    			sizeof(uint32_t), sizeof(uint32_t),
> >>>    			dtp->dt_conf.num_online_cpus) == -1)
> >>> diff --git a/libdtrace/dt_dctx.h b/libdtrace/dt_dctx.h
> >>> index 3be4dbc4..7ebf4aea 100644
> >>> --- a/libdtrace/dt_dctx.h
> >>> +++ b/libdtrace/dt_dctx.h
> >>> @@ -37,12 +37,14 @@ typedef struct dt_dctx {
> >>>    	dt_activity_t	*act;		/* pointer to activity state */
> >>>    	dt_mstate_t	*mst;		/* DTrace machine state */
> >>>    	char		*buf;		/* Output buffer scratch memory */
> >>> +	char		*agg;		/* Aggregation data */
> >>>    } dt_dctx_t;
> >>>    
> >>>    #define DCTX_CTX	offsetof(dt_dctx_t, ctx)
> >>>    #define DCTX_ACT	offsetof(dt_dctx_t, act)
> >>>    #define DCTX_MST	offsetof(dt_dctx_t, mst)
> >>>    #define DCTX_BUF	offsetof(dt_dctx_t, buf)
> >>> +#define DCTX_AGG	offsetof(dt_dctx_t, agg)
> >>>    #define DCTX_SIZE	((int16_t)sizeof(dt_dctx_t))
> >>>    
> >>>    /*
> >>> @@ -92,23 +94,23 @@ typedef struct dt_dctx {
> >>>     *                             +----------------+
> >>>     *         SCRATCH_BASE = -512 | Scratch Memory |
> >>>     *                             +----------------+
> >>> - *   LVAR_END = LVAR(n) = -256 | LVAR n         | (n = DT_LVAR_MAX = 18)
> >>> + *   LVAR_END = LVAR(n) = -256 | LVAR n         | (n = DT_LVAR_MAX = 17)
> >>>     *                             +----------------+
> >>>     *                             |      ...       |
> >>>     *                             +----------------+
> >>> - *              LVAR(1) = -120 | LVAR 1         |
> >>> + *              LVAR(1) = -128 | LVAR 1         |
> >>>     *                             +----------------+
> >>> - *  LVAR_BASE = LVAR(0) = -112 | LVAR 0         |
> >>> + *  LVAR_BASE = LVAR(0) = -120 | LVAR 0         |
> >>>     *                             +----------------+
> >>> - *             SPILL(n) = -104 | %r8            | (n = DT_STK_NREGS - 1 = 8)
> >>> + *             SPILL(n) = -112 | %r8            | (n = DT_STK_NREGS - 1 = 8)
> >>>     *                             +----------------+
> >>>     *                             |      ...       |
> >>>     *                             +----------------+
> >>> - *              SPILL(1) = -48 | %r1            |
> >>> + *              SPILL(1) = -56 | %r1            |
> >>>     *                             +----------------+
> >>> - * SPILL_BASE = SPILL(0) = -40 | %r0            |
> >>> + * SPILL_BASE = SPILL(0) = -48 | %r0            |
> >>>     *                             +----------------+
> >>> - *                  DCTX = -32 | DTrace Context | -1
> >>> + *                  DCTX = -40 | DTrace Context | -1
> >>>     *                             +----------------+
> >>>     */
> >>>    #define DT_STK_BASE		((int16_t)0)
> >>> diff --git a/libdtrace/dt_dlibs.c b/libdtrace/dt_dlibs.c
> >>> index 1e3701e0..607ebbe0 100644
> >>> --- a/libdtrace/dt_dlibs.c
> >>> +++ b/libdtrace/dt_dlibs.c
> >>> @@ -61,6 +61,7 @@ static const dt_ident_t		dt_bpf_symbols[] = {
> >>>    	DT_BPF_SYMBOL(dt_set_tvar, DT_IDENT_SYMBOL),
> >>>    	DT_BPF_SYMBOL(dt_strnlen, DT_IDENT_SYMBOL),
> >>>    	/* BPF maps */
> >>> +	DT_BPF_SYMBOL(aggs, DT_IDENT_PTR),
> >>>    	DT_BPF_SYMBOL(buffers, DT_IDENT_PTR),
> >>>    	DT_BPF_SYMBOL(cpuinfo, DT_IDENT_PTR),
> >>>    	DT_BPF_SYMBOL(gvars, DT_IDENT_PTR),
> >>> diff --git a/test/unittest/codegen/tst.stack_layout.r b/test/unittest/codegen/tst.stack_layout.r
> >>> index 2333cf95..78d41c82 100644
> >>> --- a/test/unittest/codegen/tst.stack_layout.r
> >>> +++ b/test/unittest/codegen/tst.stack_layout.r
> >>> @@ -1,17 +1,17 @@
> >>>    Base:          0
> >>> -dctx:        -32
> >>> -%r0:         -40
> >>> -%r1:         -48
> >>> -%r2:         -56
> >>> -%r3:         -64
> >>> -%r4:         -72
> >>> -%r5:         -80
> >>> -%r6:         -88
> >>> -%r7:         -96
> >>> -%r8:        -104
> >>> -lvar[ -1]:  -111 (ID  -1)
> >>> -lvar[  0]:  -112 (ID   0)
> >>> -lvar[  1]:  -120 (ID   1)
> >>> -lvar[ 18]:  -256 (ID  18)
> >>> +dctx:        -40
> >>> +%r0:         -48
> >>> +%r1:         -56
> >>> +%r2:         -64
> >>> +%r3:         -72
> >>> +%r4:         -80
> >>> +%r5:         -88
> >>> +%r6:         -96
> >>> +%r7:        -104
> >>> +%r8:        -112
> >>> +lvar[ -1]:  -119 (ID  -1)
> >>> +lvar[  0]:  -120 (ID   0)
> >>> +lvar[  1]:  -128 (ID   1)
> >>> +lvar[ 17]:  -256 (ID  17)
> >>>    lvar[ -1]:  -257 (ID  -1)
> >>>    scratch:    -257 .. -512
> >>
> >> _______________________________________________
> >> DTrace-devel mailing list
> >> DTrace-devel at oss.oracle.com
> >> https://oss.oracle.com/mailman/listinfo/dtrace-devel
> 
> 

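The stack layout changes in the quoted diff follow from dt_dctx_t growing by
one 8-byte pointer (agg), which moves DCTX from -32 to -40 and shifts every
slot below it.  Since LVAR_END stays fixed at -256, one local variable slot is
lost (DT_LVAR_MAX goes from 18 to 17).  The arithmetic can be checked with a
small sketch.  The stk_* helper names and the STK_* macros are hypothetical;
the offsets, the slot size of 8 bytes, and the nine spilled registers are
taken from the diff.

```c
#include <stdint.h>

#define STK_SLOT	8	/* bytes per stack slot */
#define STK_NREGS	9	/* %r0 .. %r8 are spilled */
#define STK_LVAR_END	(-256)	/* fixed offset of the last local variable */

/* dctx_size stands for sizeof(dt_dctx_t): 32 before the patch, 40 after. */
static int
stk_dctx(int dctx_size)
{
	return -dctx_size;
}

/* Offset of the spill slot for register %r<n>. */
static int
stk_spill(int dctx_size, int n)
{
	return stk_dctx(dctx_size) - STK_SLOT * (n + 1);
}

/* Offset of local variable n, placed below the last spill slot. */
static int
stk_lvar(int dctx_size, int n)
{
	return stk_spill(dctx_size, STK_NREGS - 1) - STK_SLOT * (n + 1);
}

/* Highest local variable index that still fits above STK_LVAR_END. */
static int
stk_lvar_max(int dctx_size)
{
	return (stk_lvar(dctx_size, 0) - STK_LVAR_END) / STK_SLOT;
}
```

With dctx_size = 40 these helpers reproduce the new values in
tst.stack_layout.r (%r0 at -48, %r8 at -112, lvar[0] at -120, lvar[17] at
-256); with dctx_size = 32 they reproduce the old ones.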

