[DTrace-devel] [PATCH 08/08] Buffer consumption semantics fixes

Kris Van Hees kris.van.hees at oracle.com
Fri Sep 18 23:10:53 PDT 2020


On Fri, Sep 18, 2020 at 03:13:10PM -0700, Eugene Loh wrote:
> On 09/14/2020 12:51 PM, Kris Van Hees wrote:
> > The implementation of the buffer consumption mechanism was hardwired
> > to end all buffer processing once an exit() action was seen.  The
> > correct semantics are quite a bit more complicated.
> >
> > - BEGIN probe data must be processed and reported before any other
> >    probes.
> > - END probe data must be processed and reported after all other
> >    probes.
> > - The exit() action must prevent any further probe firings other than
> >    the END probe.  The clause it occurs in should finish executing.
> 
> Well, that's interesting.  Let me try that:
> 
>      # dtrace -n 'BEGIN { trace(1); exit(0); trace(2) } END { trace(4) }'
>      dtrace: description 'BEGIN ' matched 2 probes
>      CPU     ID                    FUNCTION:NAME
>        0      1                           :BEGIN 1        2
>        0      2                             :END         4
> 
> Sure enough, it works!  But oddly, if I try:
> 
>      # dtrace -n 'BEGIN { trace(1); exit(0); trace(2); exit(0) } END { 
> trace(4) }'
> 
> then with DTv1 I get
> 
>        0      2                             :END         4
> 
> while with DTv2 I get
> 
>        3      1                           :BEGIN         1 2
>        3      2                             :END         4
> 
> I guess DTv1 has a bug.

Hm, yes, that is an interesting case.  I'll take a look at what might be going
wrong with the legacy implementation - sounds like the port might indeed have a
bug in this area.  Fortunately it is a rather rare occurence, and quite obscure
semantics at best.  But knowing where there is a bug is still valuable.

Thanks for pointing this out.

> > The following changes are included in this patch:
> >
> > - dt_consume: dtrace_consume()
> 
> Add a .c to the dt_consume.c file name.

Thanks.

> >      Remove dead code.
> >
> >      If the BEGIN probe data (if any) was not processed yet, it will
> >      be processed now using dt_consume_begin().
> >
> >      Use default callback functions if none were provided.
> >
> >      If the END probe has fired and we're processing output buffers,
> >      skip the one on which the END probe executed.  It gets processed
> >      after all other buffers have been processed.
> >
> > - dt_consume: dtrace_consume_begin()
> 
> Add .c to dt_consume file name.  And I think it's dt_ rather than 
> dtrace_ on the function name.

Indeed.

> >      Remove dead code.
> 
> What does this mean?  I didn't see any "#if 0".  Do you mean calls to 
> obsolete dt_ioctl() calls?  If so, such added specificity would be nice 
> to have.

I opt to not spell out all the dead code that gets removed because that is
what the patch itself is for.  Since it is code that does not apply here
anymore (different implementation) there isn't really much value in explaining
what it was used for.

> >      If not data is found in the buffer for the CPU on which the BEGIN
> >      probe executed, it did not record any data (no special processing).
> 
> s/not data/no data/

Thanks.

> > - dt_consume: dtrace_consume_begin()
> 
> If this is the same file name and function as the previous bullet, just 
> combine with that bullet.

I wrote the wrong function name... should be dtrace_consume_begin_probe()

> >      When looking for BEGIN probe data only, if data for another probe
> >      is found, signal that we should be done processing this buffer.
> >
> > - dt_map.c: dt_epid_lookup()
> >
> >      Return -1 if the epid is out of range, or if no ddesc or pdesc
> >      entry is found rather than using asserts.
> >
> > - dt_work.c: dtrace_status()
> >
> >      Rewrite to use dt_state_get_activity().
> >
> > - dt_work.c: dtrace_go()
> >
> >      Remove dead code.
> >
> >      Record the CPU on which the BEGIN probe executed.
> >
> >      If the activity state after the BEGIN probe is STOPPED an exit()
> >      action must have occured in the clause.  In that case, the activity
> >      state must be moved back to DRAINING.
> >
> > - dt_work.c: dtrace_stop()
> >
> >      Remove dead code.
> 
> Okay, though I still see another "#if 0 dt_ioctl()" block in there. Is 
> that intentional?

Yes, there remains dead code in some areas because we're not quite ready to
tackle them yet.  In other words, no final determination has been made whether
we can just drop that code or whether we need to provide an alternative under
the new implementation.

> >      Record the CPU on which the END probe executed.
> >
> > - dt_work.c: dtrace_work()
> 
> You've been removing a lot of dead code, but there is still a "#if 0" 
> version of dtrace_work() in there.  Is that intentional?

Yes, it serves as a convenient reminder of what the implementation needs to do.
It is a bit messy and I could handle that by simply having a 2nd tree around
or another window where I check the diff against the legacy implementation but
I have found this to be both convenient and instructive to people looking at
the code in its current development stage to see the equivalence directly.

It will get removed soon.

> >      Provide an implementation that actually uses the dtrace_status()
> >      to determine what to do and how to handle the dtrace_consume()
> >      return value.
> >
> > - dtrace.h
> >
> >      Add DTRACE_CONSUME_DONE as a valid turn value for dtrace_consume().
> >
> > - dtrace_prov_dtrace.c
> 
> s/dtrace_/dt_/
> 
> >      Update the comment block explaining the BEGIN and END probe activity
> >      state requirements and changes.
> >
> >      Change the expected activity state from ACTIVE to DRAINING for the
> >      END probe.
> >
> > Signed-off-by: Kris Van Hees <kris.van.hees at oracle.com>
> > ---
> >   libdtrace/dt_consume.c                        | 830 ++++--------------
> >   libdtrace/dt_map.c                            |   8 +-
> >   libdtrace/dt_prov_dtrace.c                    |   9 +-
> >   libdtrace/dt_work.c                           | 128 +--
> >   libdtrace/dtrace.h                            |   3 +-
> >   test/unittest/actions/exit/tst.begin-exit.d   |  22 +
> >   test/unittest/actions/exit/tst.begin-exit.r   |   6 +
> >   test/unittest/actions/exit/tst.end-exit.d     |  16 +
> >   test/unittest/actions/exit/tst.end-exit.r     |   6 +
> >   .../actions/exit/tst.probe-begin-exit.d       |  20 +
> >   .../actions/exit/tst.probe-begin-exit.r       |   6 +
> >   .../actions/exit/tst.probe-end-exit.d         |  20 +
> >   .../actions/exit/tst.probe-end-exit.r         |   7 +
> >   test/unittest/actions/exit/tst.probe-exit.d   |  19 +
> >   test/unittest/actions/exit/tst.probe-exit.r   |   7 +
> >   15 files changed, 357 insertions(+), 750 deletions(-)
> >   create mode 100644 test/unittest/actions/exit/tst.begin-exit.d
> >   create mode 100644 test/unittest/actions/exit/tst.begin-exit.r
> >   create mode 100644 test/unittest/actions/exit/tst.end-exit.d
> >   create mode 100644 test/unittest/actions/exit/tst.end-exit.r
> >   create mode 100644 test/unittest/actions/exit/tst.probe-begin-exit.d
> >   create mode 100644 test/unittest/actions/exit/tst.probe-begin-exit.r
> >   create mode 100644 test/unittest/actions/exit/tst.probe-end-exit.d
> >   create mode 100644 test/unittest/actions/exit/tst.probe-end-exit.r
> >   create mode 100644 test/unittest/actions/exit/tst.probe-exit.d
> >   create mode 100644 test/unittest/actions/exit/tst.probe-exit.r
> >
> > diff --git a/libdtrace/dt_consume.c b/libdtrace/dt_consume.c
> > index f94459d9..07ee454b 100644
> > --- a/libdtrace/dt_consume.c
> > +++ b/libdtrace/dt_consume.c
> > @@ -16,6 +16,7 @@
> >   #include <dt_impl.h>
> >   #include <dt_pcap.h>
> >   #include <dt_peb.h>
> > +#include <dt_state.h>
> >   #include <libproc.h>
> >   #include <port.h>
> >   #include <sys/epoll.h>
> > @@ -447,13 +448,13 @@ out:
> >   static int
> >   dt_nullprobe()
> >   {
> > -	return (DTRACE_CONSUME_THIS);
> > +	return DTRACE_CONSUME_THIS;
> >   }
> >   
> >   static int
> >   dt_nullrec()
> >   {
> > -	return (DTRACE_CONSUME_NEXT);
> > +	return DTRACE_CONSUME_NEXT;
> >   }
> >   
> >   int
> > @@ -1826,434 +1827,6 @@ dt_setopt(dtrace_hdl_t *dtp, const dtrace_probedata_t *data,
> >   	return (rval);
> >   }
> >   
> > -#if 0
> > -int
> > -dt_consume_cpu(dtrace_hdl_t *dtp, FILE *fp, int cpu, dtrace_bufdesc_t *buf,
> > -    dtrace_consume_probe_f *efunc, dtrace_consume_rec_f *rfunc, void *arg)
> > -{
> > -	dtrace_epid_t id;
> > -	size_t offs, start = buf->dtbd_oldest, end = buf->dtbd_size;
> > -	int flow = (dtp->dt_options[DTRACEOPT_FLOWINDENT] != DTRACEOPT_UNSET);
> > -	int quiet = (dtp->dt_options[DTRACEOPT_QUIET] != DTRACEOPT_UNSET);
> > -	int rval, i, n;
> > -	dtrace_epid_t last = DTRACE_EPIDNONE;
> > -	dtrace_probedata_t data;
> > -	uint64_t drops;
> > -	caddr_t addr;
> > -
> > -	memset(&data, 0, sizeof (data));
> > -	data.dtpda_handle = dtp;
> > -	data.dtpda_cpu = cpu;
> > -
> > -again:
> > -	for (offs = start; offs < end; ) {
> > -		dtrace_datadesc_t *dd;
> > -
> > -		/*
> > -		 * We're guaranteed to have an ID.
> > -		 */
> > -		id = *(uint32_t *)((uintptr_t)buf->dtbd_data + offs);
> > -
> > -		if (id == DTRACE_EPIDNONE) {
> > -			/*
> > -			 * This is filler to assure proper alignment of the
> > -			 * next record; we simply ignore it.
> > -			 */
> > -			offs += sizeof (id);
> > -			continue;
> > -		}
> > -
> > -		if ((rval = dt_epid_lookup(dtp, id, &data.dtpda_ddesc,
> > -					   &data.dtpda_pdesc)) != 0)
> > -			return (rval);
> > -
> > -		dd = data.dtpda_ddesc;
> > -		data.dtpda_data = buf->dtbd_data + offs;
> > -
> > -		if (data.dtpda_ddesc->dtdd_uarg != DT_ECB_DEFAULT) {
> > -			rval = dt_handle(dtp, &data);
> > -
> > -			if (rval == DTRACE_CONSUME_NEXT)
> > -				goto nextepid;
> > -
> > -			if (rval == DTRACE_CONSUME_ERROR)
> > -				return (-1);
> > -		}
> > -
> > -		if (flow)
> > -			(void) dt_flowindent(dtp, &data, last, buf, offs);
> > -
> > -		rval = (*efunc)(&data, arg);
> > -
> > -		if (flow) {
> > -			if (data.dtpda_flow == DTRACEFLOW_ENTRY)
> > -				data.dtpda_indent += 2;
> > -		}
> > -
> > -		if (rval == DTRACE_CONSUME_NEXT)
> > -			goto nextepid;
> > -
> > -		if (rval == DTRACE_CONSUME_ABORT)
> > -			return (dt_set_errno(dtp, EDT_DIRABORT));
> > -
> > -		if (rval != DTRACE_CONSUME_THIS)
> > -			return (dt_set_errno(dtp, EDT_BADRVAL));
> > -
> > -		for (i = 0; i < dd->dtdd_nrecs; i++) {
> > -			dtrace_recdesc_t *rec = &dd->dtdd_recs[i];
> > -			dtrace_actkind_t act = rec->dtrd_action;
> > -
> > -			data.dtpda_data = buf->dtbd_data + offs +
> > -			    rec->dtrd_offset;
> > -			addr = data.dtpda_data;
> > -
> > -			if (act == DTRACEACT_LIBACT) {
> > -				uint64_t arg = rec->dtrd_arg;
> > -				dtrace_aggvarid_t id;
> > -
> > -				switch (arg) {
> > -				case DT_ACT_CLEAR:
> > -					/* LINTED - alignment */
> > -					id = *((dtrace_aggvarid_t *)addr);
> > -					(void) dtrace_aggregate_walk(dtp,
> > -					    dt_clear_agg, &id);
> > -					continue;
> > -
> > -				case DT_ACT_DENORMALIZE:
> > -					/* LINTED - alignment */
> > -					id = *((dtrace_aggvarid_t *)addr);
> > -					(void) dtrace_aggregate_walk(dtp,
> > -					    dt_denormalize_agg, &id);
> > -					continue;
> > -
> > -				case DT_ACT_FTRUNCATE:
> > -					if (fp == NULL)
> > -						continue;
> > -
> > -					(void) fflush(fp);
> > -					(void) ftruncate(fileno(fp), 0);
> > -					(void) fseeko(fp, 0, SEEK_SET);
> > -					continue;
> > -
> > -				case DT_ACT_NORMALIZE:
> > -					if (i == dd->dtdd_nrecs - 1)
> > -						return (dt_set_errno(dtp,
> > -						    EDT_BADNORMAL));
> > -
> > -					if (dt_normalize(dtp,
> > -					    buf->dtbd_data + offs, rec) != 0)
> > -						return (-1);
> > -
> > -					i++;
> > -					continue;
> > -
> > -				case DT_ACT_SETOPT: {
> > -					uint64_t *opts = dtp->dt_options;
> > -					dtrace_recdesc_t *valrec;
> > -					uint32_t valsize;
> > -					caddr_t val;
> > -					int rv;
> > -
> > -					if (i == dd->dtdd_nrecs - 1) {
> > -						return (dt_set_errno(dtp,
> > -						    EDT_BADSETOPT));
> > -					}
> > -
> > -					valrec = &dd->dtdd_recs[++i];
> > -					valsize = valrec->dtrd_size;
> > -
> > -					if (valrec->dtrd_action != act ||
> > -					    valrec->dtrd_arg != arg) {
> > -						return (dt_set_errno(dtp,
> > -						    EDT_BADSETOPT));
> > -					}
> > -
> > -					if (valsize > sizeof (uint64_t)) {
> > -						val = buf->dtbd_data + offs +
> > -						    valrec->dtrd_offset;
> > -					} else {
> > -						val = "1";
> > -					}
> > -
> > -					rv = dt_setopt(dtp, &data, addr, val);
> > -
> > -					if (rv != 0)
> > -						return (-1);
> > -
> > -					flow = (opts[DTRACEOPT_FLOWINDENT] !=
> > -					    DTRACEOPT_UNSET);
> > -					quiet = (opts[DTRACEOPT_QUIET] !=
> > -					    DTRACEOPT_UNSET);
> > -
> > -					continue;
> > -				}
> > -
> > -				case DT_ACT_TRUNC:
> > -					if (i == dd->dtdd_nrecs - 1)
> > -						return (dt_set_errno(dtp,
> > -						    EDT_BADTRUNC));
> > -
> > -					if (dt_trunc(dtp,
> > -					    buf->dtbd_data + offs, rec) != 0)
> > -						return (-1);
> > -
> > -					i++;
> > -					continue;
> > -
> > -				default:
> > -					continue;
> > -				}
> > -			}
> > -
> > -			rval = (*rfunc)(&data, rec, arg);
> > -
> > -			if (rval == DTRACE_CONSUME_NEXT)
> > -				continue;
> > -
> > -			if (rval == DTRACE_CONSUME_ABORT)
> > -				return (dt_set_errno(dtp, EDT_DIRABORT));
> > -
> > -			if (rval != DTRACE_CONSUME_THIS)
> > -				return (dt_set_errno(dtp, EDT_BADRVAL));
> > -
> > -			if (act == DTRACEACT_STACK) {
> > -				int depth = rec->dtrd_arg;
> > -
> > -				if (dt_print_stack(dtp, fp, NULL, addr, depth,
> > -				    rec->dtrd_size / depth) < 0)
> > -					return (-1);
> > -				goto nextrec;
> > -			}
> > -
> > -			if (act == DTRACEACT_USTACK ||
> > -			    act == DTRACEACT_JSTACK) {
> > -				if (dt_print_ustack(dtp, fp, NULL,
> > -				    addr, rec->dtrd_arg) < 0)
> > -					return (-1);
> > -				goto nextrec;
> > -			}
> > -
> > -			if (act == DTRACEACT_SYM) {
> > -				if (dt_print_sym(dtp, fp, NULL, addr) < 0)
> > -					return (-1);
> > -				goto nextrec;
> > -			}
> > -
> > -			if (act == DTRACEACT_MOD) {
> > -				if (dt_print_mod(dtp, fp, NULL, addr) < 0)
> > -					return (-1);
> > -				goto nextrec;
> > -			}
> > -
> > -			if (act == DTRACEACT_USYM || act == DTRACEACT_UADDR) {
> > -				if (dt_print_usym(dtp, fp, addr, act) < 0)
> > -					return (-1);
> > -				goto nextrec;
> > -			}
> > -
> > -			if (act == DTRACEACT_UMOD) {
> > -				if (dt_print_umod(dtp, fp, NULL, addr) < 0)
> > -					return (-1);
> > -				goto nextrec;
> > -			}
> > -
> > -			if (DTRACEACT_ISPRINTFLIKE(act)) {
> > -				void *fmtdata;
> > -				int (*func)(dtrace_hdl_t *, FILE *, void *,
> > -				    const dtrace_probedata_t *,
> > -				    const dtrace_recdesc_t *, uint_t,
> > -				    const void *buf, size_t);
> > -
> > -				if ((fmtdata = dt_format_lookup(dtp,
> > -				    rec->dtrd_format)) == NULL)
> > -					goto nofmt;
> > -
> > -				switch (act) {
> > -				case DTRACEACT_PRINTF:
> > -					func = dtrace_fprintf;
> > -					break;
> > -				case DTRACEACT_PRINTA:
> > -					func = dtrace_fprinta;
> > -					break;
> > -				case DTRACEACT_SYSTEM:
> > -					func = dtrace_system;
> > -					break;
> > -				case DTRACEACT_FREOPEN:
> > -					func = dtrace_freopen;
> > -					break;
> > -				default:
> > -					dt_dprintf("dt_consume_cpu(): "
> > -					    "unknown is-printf-like action %d\n",
> > -					    (int) act);
> > -					return (-1);
> > -				}
> > -
> > -				n = (*func)(dtp, fp, fmtdata, &data,
> > -				    rec, dd->dtdd_nrecs - i,
> > -				    (uchar_t *)buf->dtbd_data + offs,
> > -				    buf->dtbd_size - offs);
> > -
> > -				if (n < 0)
> > -					return (-1); /* errno is set for us */
> > -
> > -				if (n > 0)
> > -					i += n - 1;
> > -				goto nextrec;
> > -			}
> > -
> > -nofmt:
> > -			if (act == DTRACEACT_PRINTA) {
> > -				dtrace_print_aggdata_t pd;
> > -				dtrace_aggvarid_t *aggvars;
> > -				int j, naggvars = 0;
> > -				size_t size = ((dd->dtdd_nrecs - i) *
> > -				    sizeof (dtrace_aggvarid_t));
> > -
> > -				if ((aggvars = dt_alloc(dtp, size)) == NULL)
> > -					return (-1);
> > -
> > -				/*
> > -				 * This might be a printa() with multiple
> > -				 * aggregation variables.  We need to scan
> > -				 * forward through the records until we find
> > -				 * a record from a different statement.
> > -				 */
> > -				for (j = i; j < dd->dtdd_nrecs; j++) {
> > -					dtrace_recdesc_t *nrec;
> > -					caddr_t naddr;
> > -
> > -					nrec = &dd->dtdd_recs[j];
> > -
> > -					if (nrec->dtrd_uarg != rec->dtrd_uarg)
> > -						break;
> > -
> > -					if (nrec->dtrd_action != act) {
> > -						return (dt_set_errno(dtp,
> > -						    EDT_BADAGG));
> > -					}
> > -
> > -					naddr = buf->dtbd_data + offs +
> > -					    nrec->dtrd_offset;
> > -
> > -					aggvars[naggvars++] =
> > -					    /* LINTED - alignment */
> > -					    *((dtrace_aggvarid_t *)naddr);
> > -				}
> > -
> > -				i = j - 1;
> > -				memset(&pd, 0, sizeof (pd));
> > -				pd.dtpa_dtp = dtp;
> > -				pd.dtpa_fp = fp;
> > -
> > -				assert(naggvars >= 1);
> > -
> > -				if (naggvars == 1) {
> > -					pd.dtpa_id = aggvars[0];
> > -					dt_free(dtp, aggvars);
> > -
> > -					if (dt_printf(dtp, fp, "\n") < 0 ||
> > -					    dtrace_aggregate_walk_sorted(dtp,
> > -					    dt_print_agg, &pd) < 0)
> > -						return (-1);
> > -					goto nextrec;
> > -				}
> > -
> > -				if (dt_printf(dtp, fp, "\n") < 0 ||
> > -				    dtrace_aggregate_walk_joined(dtp, aggvars,
> > -				    naggvars, dt_print_aggs, &pd) < 0) {
> > -					dt_free(dtp, aggvars);
> > -					return (-1);
> > -				}
> > -
> > -				dt_free(dtp, aggvars);
> > -				goto nextrec;
> > -			}
> > -
> > -			if (act == DTRACEACT_TRACEMEM) {
> > -				n = dt_print_tracemem(dtp, fp, rec,
> > -				    dd->dtdd_nrecs - i,
> > -				    buf->dtbd_data + offs);
> > -
> > -				if (n < 0)
> > -					return (-1); /* errno is set for us */
> > -
> > -				i += n - 1;
> > -				goto nextrec;
> > -			}
> > -
> > -			if (act == DTRACEACT_PCAP) {
> > -				n = dt_print_pcap(dtp, fp, rec,
> > -						  buf->dtbd_data + offs);
> > -
> > -				if (n < 0)
> > -					return (-1); /* errno is set for us */
> > -
> > -				i += n + 1;
> > -				goto nextrec;
> > -			}
> > -
> > -			switch (rec->dtrd_size) {
> > -			case sizeof (uint64_t):
> > -				n = dt_printf(dtp, fp,
> > -				    quiet ? "%lld" : " %16lld",
> > -				    /* LINTED - alignment */
> > -				    *((unsigned long long *)addr));
> > -				break;
> > -			case sizeof (uint32_t):
> > -				n = dt_printf(dtp, fp, quiet ? "%d" : " %8d",
> > -				    /* LINTED - alignment */
> > -				    *((uint32_t *)addr));
> > -				break;
> > -			case sizeof (uint16_t):
> > -				n = dt_printf(dtp, fp, quiet ? "%d" : " %5d",
> > -				    /* LINTED - alignment */
> > -				    *((uint16_t *)addr));
> > -				break;
> > -			case sizeof (uint8_t):
> > -				n = dt_printf(dtp, fp, quiet ? "%d" : " %3d",
> > -				    *((uint8_t *)addr));
> > -				break;
> > -			default:
> > -				n = dt_print_bytes(dtp, fp, addr,
> > -				    rec->dtrd_size, 33, quiet);
> > -				break;
> > -			}
> > -
> > -			if (n < 0)
> > -				return (-1); /* errno is set for us */
> > -
> > -nextrec:
> > -			if (dt_buffered_flush(dtp, &data, rec, NULL, 0) < 0)
> > -				return (-1); /* errno is set for us */
> > -		}
> > -
> > -		/*
> > -		 * Call the record callback with a NULL record to indicate
> > -		 * that we're done processing this EPID.
> > -		 */
> > -		rval = (*rfunc)(&data, NULL, arg);
> > -nextepid:
> > -		offs += dd->dtdd_size;
> > -		last = id;
> > -	}
> > -
> > -	if (buf->dtbd_oldest != 0 && start == buf->dtbd_oldest) {
> > -		end = buf->dtbd_oldest;
> > -		start = 0;
> > -		goto again;
> > -	}
> > -
> > -	if ((drops = buf->dtbd_drops) == 0)
> > -		return (0);
> > -
> > -	/*
> > -	 * Explicitly zero the drops to prevent us from processing them again.
> > -	 */
> > -	buf->dtbd_drops = 0;
> > -
> > -	return (dt_handle_cpudrop(dtp, cpu, DTRACEDROP_PRINCIPAL, drops));
> > -}
> > -#else
> >   static int
> >   dt_print_trace(dtrace_hdl_t *dtp, FILE *fp, dtrace_recdesc_t *rec,
> >   	       caddr_t data, int quiet)
> > @@ -2315,7 +1888,7 @@ dt_print_trace(dtrace_hdl_t *dtp, FILE *fp, dtrace_recdesc_t *rec,
> >   }
> >   
> >   static dtrace_workstatus_t
> > -dt_consume_one(dtrace_hdl_t *dtp, FILE *fp, int cpu, char *buf,
> > +dt_consume_one(dtrace_hdl_t *dtp, FILE *fp, char *buf,
> >   	       dtrace_probedata_t *pdat, dtrace_consume_probe_f *efunc,
> >   	       dtrace_consume_rec_f *rfunc, int flow, int quiet,
> >   	       dtrace_epid_t *last, void *arg)
> > @@ -2346,13 +1919,13 @@ dt_consume_one(dtrace_hdl_t *dtp, FILE *fp, int cpu, char *buf,
> >   		 * (Note that 'n' may be 0.)
> >   		 */
> >   		if (ptr > buf + hdr->size)
> > -			return -1;
> > +			return DTRACE_WORKSTATUS_ERROR;
> >   
> >   		size = *(uint32_t *)data;
> >   		data += sizeof(size);
> >   		ptr += sizeof(size) + size;
> >   		if (ptr != buf + hdr->size)
> > -			return -1;
> > +			return DTRACE_WORKSTATUS_ERROR;
> >   
> >   		data += sizeof(uint32_t);		/* skip padding */
> >   		size -= sizeof(uint32_t);
> > @@ -2370,7 +1943,7 @@ dt_consume_one(dtrace_hdl_t *dtp, FILE *fp, int cpu, char *buf,
> >   		rval = dt_epid_lookup(dtp, epid, &pdat->dtpda_ddesc,
> >   						 &pdat->dtpda_pdesc);
> >   		if (rval != 0)
> > -			return rval;
> > +			return DTRACE_WORKSTATUS_ERROR;
> >   
> >   		if (flow)
> >   			dt_flowindent(dtp, pdat, *last, DTRACE_EPIDNONE);
> > @@ -2382,14 +1955,18 @@ dt_consume_one(dtrace_hdl_t *dtp, FILE *fp, int cpu, char *buf,
> >   				pdat->dtpda_indent += 2;
> >   		}
> >   
> > -		if (rval == DTRACE_CONSUME_NEXT)
> > -			return 0;
> > -
> > -		if (rval == DTRACE_CONSUME_ABORT)
> > +		switch (rval) {
> > +		case DTRACE_CONSUME_NEXT:
> > +			return DTRACE_WORKSTATUS_OKAY;
> > +		case DTRACE_CONSUME_DONE:
> > +			return DTRACE_WORKSTATUS_DONE;
> > +		case DTRACE_CONSUME_ABORT:
> >   			return dt_set_errno(dtp, EDT_DIRABORT);
> > -
> > -		if (rval != DTRACE_CONSUME_THIS)
> > +		case DTRACE_CONSUME_THIS:
> > +			break;
> > +		default:
> >   			return dt_set_errno(dtp, EDT_BADRVAL);
> > +		}
> >   
> >   		/*
> >   		 * FIXME: This code is temporary.
> > @@ -2467,7 +2044,7 @@ dt_consume_one(dtrace_hdl_t *dtp, FILE *fp, int cpu, char *buf,
> >   
> >   		*last = epid;
> >   
> > -		return done ? DTRACE_WORKSTATUS_DONE : DTRACE_WORKSTATUS_OKAY;
> > +		return DTRACE_WORKSTATUS_OKAY;
> >   	} else if (hdr->type == PERF_RECORD_LOST) {
> >   		uint64_t	lost;
> >   
> > @@ -2488,7 +2065,7 @@ dt_consume_one(dtrace_hdl_t *dtp, FILE *fp, int cpu, char *buf,
> >   }
> >   
> >   int
> > -dt_consume_cpu(dtrace_hdl_t *dtp, FILE *fp, int cpu, dt_peb_t *peb,
> > +dt_consume_cpu(dtrace_hdl_t *dtp, FILE *fp, dt_peb_t *peb,
> >   	       dtrace_consume_probe_f *efunc, dtrace_consume_rec_f *rfunc,
> >   	       void *arg)
> >   {
> > @@ -2513,7 +2090,7 @@ dt_consume_cpu(dtrace_hdl_t *dtp, FILE *fp, int cpu, dt_peb_t *peb,
> >   	 */
> >   	memset(&pdat, 0, sizeof(pdat));
> >   	pdat.dtpda_handle = dtp;
> > -	pdat.dtpda_cpu = cpu;
> > +	pdat.dtpda_cpu = peb->cpu;
> >   
> >   	/*
> >   	 * Set base to be the start of the buffer data, i.e. we skip the first
> > @@ -2555,9 +2132,10 @@ dt_consume_cpu(dtrace_hdl_t *dtp, FILE *fp, int cpu, dt_peb_t *peb,
> >   				event = dst;
> >   			}
> >   
> > -			rval = dt_consume_one(dtp, fp, cpu, event, &pdat,
> > -					      efunc, rfunc, flow, quiet, &last,
> > -					      arg);
> > +			rval = dt_consume_one(dtp, fp, event, &pdat, efunc,
> > +					      rfunc, flow, quiet, &last, arg);
> > +			if (rval == DTRACE_WORKSTATUS_DONE)
> > +				return DTRACE_WORKSTATUS_OKAY;
> >   			if (rval != DTRACE_WORKSTATUS_OKAY)
> >   				return rval;
> >   
> > @@ -2569,7 +2147,6 @@ dt_consume_cpu(dtrace_hdl_t *dtp, FILE *fp, int cpu, dt_peb_t *peb,
> >   
> >   	return DTRACE_WORKSTATUS_OKAY;
> >   }
> > -#endif
> >   
> >   typedef struct dt_begin {
> >   	dtrace_consume_probe_f *dtbgn_probefunc;
> > @@ -2583,110 +2160,118 @@ typedef struct dt_begin {
> >   static int
> >   dt_consume_begin_probe(const dtrace_probedata_t *data, void *arg)
> >   {
> > -	dt_begin_t *begin = (dt_begin_t *)arg;
> > -	dtrace_probedesc_t *pd = data->dtpda_pdesc;
> > -
> > -	int r1 = (strcmp(pd->prv, "dtrace") == 0);
> > -	int r2 = (strcmp(pd->prb, "BEGIN") == 0);
> > +	dt_begin_t		*begin = (dt_begin_t *)arg;
> > +	dtrace_probedesc_t	*pd = data->dtpda_pdesc;
> > +	int			r1 = (strcmp(pd->prv, "dtrace") == 0);
> > +	int			r2 = (strcmp(pd->prb, "BEGIN") == 0);
> >   
> >   	if (begin->dtbgn_beginonly) {
> >   		if (!(r1 && r2))
> > -			return (DTRACE_CONSUME_NEXT);
> > +			return DTRACE_CONSUME_DONE;
> >   	} else {
> >   		if (r1 && r2)
> > -			return (DTRACE_CONSUME_NEXT);
> > +			return DTRACE_CONSUME_NEXT;
> >   	}
> >   
> >   	/*
> >   	 * We have a record that we're interested in.  Now call the underlying
> >   	 * probe function...
> >   	 */
> > -	return (begin->dtbgn_probefunc(data, begin->dtbgn_arg));
> > +	return begin->dtbgn_probefunc(data, begin->dtbgn_arg);
> >   }
> >   
> >   static int
> >   dt_consume_begin_record(const dtrace_probedata_t *data,
> > -    const dtrace_recdesc_t *rec, void *arg)
> > +			const dtrace_recdesc_t *rec, void *arg)
> >   {
> > -	dt_begin_t *begin = (dt_begin_t *)arg;
> > +	dt_begin_t	*begin = (dt_begin_t *)arg;
> >   
> > -	return (begin->dtbgn_recfunc(data, rec, begin->dtbgn_arg));
> > +	return begin->dtbgn_recfunc(data, rec, begin->dtbgn_arg);
> >   }
> >   
> >   static int
> >   dt_consume_begin_error(const dtrace_errdata_t *data, void *arg)
> >   {
> > -	dt_begin_t *begin = (dt_begin_t *)arg;
> > -	dtrace_probedesc_t *pd = data->dteda_pdesc;
> > -
> > -	int r1 = (strcmp(pd->prv, "dtrace") == 0);
> > -	int r2 = (strcmp(pd->prb, "BEGIN") == 0);
> > +	dt_begin_t		*begin = (dt_begin_t *)arg;
> > +	dtrace_probedesc_t	*pd = data->dteda_pdesc;
> > +	int			r1 = (strcmp(pd->prv, "dtrace") == 0);
> > +	int			r2 = (strcmp(pd->prb, "BEGIN") == 0);
> >   
> >   	if (begin->dtbgn_beginonly) {
> >   		if (!(r1 && r2))
> > -			return (DTRACE_HANDLE_OK);
> > +			return DTRACE_HANDLE_OK;
> >   	} else {
> >   		if (r1 && r2)
> > -			return (DTRACE_HANDLE_OK);
> > +			return DTRACE_HANDLE_OK;
> >   	}
> >   
> > -	return (begin->dtbgn_errhdlr(data, begin->dtbgn_errarg));
> > +	return begin->dtbgn_errhdlr(data, begin->dtbgn_errarg);
> >   }
> >   
> > +/*
> > + * There is this idea that the BEGIN probe should be processed before
> > + * everything else, and that the END probe should be processed after anything
> > + * else.
> > + *
> > + * In the common case, this is pretty easy to deal with.  However, a situation
> > + * may arise where the BEGIN enabling and END enabling are on the same CPU, and
> > + * some enabling in the middle occurred on a different CPU.
> > + *
> > + * To deal with this (blech!) we need to consume the BEGIN buffer up until the
> 
> I realize these comments are verbatim from before, but since they're so 
> heavily reformatted and moved and stuff anyhow, might be a good time to 
> strike the editorializing like "(blech!)"...
> 
> > + * end of the BEGIN probe, and then set it aside.  We will then process every
> > + * other CPU, and then we'll return to the BEGIN CPU and process the rest of
> > + * the data (which will inevitably include the END probe, if any).
> > + *
> > + * Making this even more complicated (!) is the library's ERROR enabling.
> 
> ... and "(!)".
> 
> > + * Because this enabling is processed before we even get into the consume call
> > + * back, any ERROR firing would result in the library's ERROR enabling being
> > + * processed twice -- once in our first pass (for BEGIN probes), and again in
> > + * our second pass (for everything but BEGIN probes).
> > + *
> > + * To deal with this, we interpose on the ERROR handler to assure that we only
> > + * process ERROR enablings induced by BEGIN enablings in the first pass, and
> > + * that we only process ERROR enablings _not_ induced by BEGIN enablings in the
> > + * second pass.
> > + */
> >   static int
> > -dt_consume_begin(dtrace_hdl_t *dtp, FILE *fp, dtrace_bufdesc_t *buf,
> > -    dtrace_consume_probe_f *pf, dtrace_consume_rec_f *rf, void *arg)
> > +dt_consume_begin(dtrace_hdl_t *dtp, FILE *fp, struct epoll_event *events,
> > +		 int cnt, dtrace_consume_probe_f *pf, dtrace_consume_rec_f *rf,
> > +		 void *arg)
> >   {
> > +	dt_peb_t	*bpeb = NULL;
> > +	dt_begin_t	begin;
> > +	processorid_t	cpu = dtp->dt_beganon;
> > +	int		rval, i;
> > +
> >   	/*
> > -	 * There's this idea that the BEGIN probe should be processed before
> > -	 * everything else, and that the END probe should be processed after
> > -	 * anything else.  In the common case, this is pretty easy to deal
> > -	 * with.  However, a situation may arise where the BEGIN enabling and
> > -	 * END enabling are on the same CPU, and some enabling in the middle
> > -	 * occurred on a different CPU.  To deal with this (blech!) we need to
> > -	 * consume the BEGIN buffer up until the end of the BEGIN probe, and
> > -	 * then set it aside.  We will then process every other CPU, and then
> > -	 * we'll return to the BEGIN CPU and process the rest of the data
> > -	 * (which will inevitably include the END probe, if any).  Making this
> > -	 * even more complicated (!) is the library's ERROR enabling.  Because
> > -	 * this enabling is processed before we even get into the consume call
> > -	 * back, any ERROR firing would result in the library's ERROR enabling
> > -	 * being processed twice -- once in our first pass (for BEGIN probes),
> > -	 * and again in our second pass (for everything but BEGIN probes).  To
> > -	 * deal with this, we interpose on the ERROR handler to assure that we
> > -	 * only process ERROR enablings induced by BEGIN enablings in the
> > -	 * first pass, and that we only process ERROR enablings _not_ induced
> > -	 * by BEGIN enablings in the second pass.
> > +	 * Ensure we get called only once...
> >   	 */
> > -	dt_begin_t begin;
> > -	processorid_t cpu = dtp->dt_beganon;
> > -	dtrace_bufdesc_t nbuf;
> > -	int rval, i;
> > -	static int max_ncpus;
> > -	dtrace_optval_t size;
> > -
> >   	dtp->dt_beganon = -1;
> >   
> > -	if (dt_ioctl(dtp, DTRACEIOC_BUFSNAP, buf) == -1) {
> > -		/*
> > -		 * We really don't expect this to fail, but it is at least
> > -		 * technically possible for this to fail with ENOENT.  In this
> > -		 * case, we just drive on...
> > -		 */
> > -		if (errno == ENOENT)
> > -			return (0);
> > +	/*
> > +	 * Find the buffer for the CPU on which the BEGIN probe executed).
> > +	 */
> > +	for (i = 0; i < cnt; i++) {
> > +		bpeb = events[i].data.ptr;
> >   
> > -		return (dt_set_errno(dtp, errno));
> > +		if (bpeb->cpu == cpu)
> > +			break;
> >   	}
> >   
> > -	if (!dtp->dt_stopped || buf->dtbd_cpu != dtp->dt_endedon) {
> > -		/*
> > -		 * This is the simple case.  We're either not stopped, or if
> > -		 * we are, we actually processed any END probes on another
> > -		 * CPU.  We can simply consume this buffer and return.
> > -		 */
> > -		return (dt_consume_cpu(dtp, fp, cpu, buf, pf, rf, arg));
> > -	}
> > +	/*
> > +	 * If not found, the BEGIN probe does not have data recording clauses
> > +	 * so we are done here.
> > +	 */
> > +	if (bpeb == NULL || bpeb->cpu != cpu)
> > +		return DTRACE_WORKSTATUS_OKAY;
> 
> It seems the question is whether we exited the loop because of the break 
> or because all iterations were exhausted.  So it's easier just to check 
> "if (i >= cnt)".

It is easier in a sense, but the code is more clear with the conditional as-is
because that explicitly states the conditions we care about.

> > +
> > +	/*
> > +	 * The simple case: we are either not stopped, or we are stopped and
> > +	 * the END probe executed on another CPU.  Simply consume this buffer
> > +	 * and return.
> > +	 */
> > +	if (!dtp->dt_stopped || cpu != dtp->dt_endedon)
> > +		return dt_consume_cpu(dtp, fp, bpeb, pf, rf, arg);
> >   
> >   	begin.dtbgn_probefunc = pf;
> >   	begin.dtbgn_recfunc = rf;
> > @@ -2694,70 +2279,40 @@ dt_consume_begin(dtrace_hdl_t *dtp, FILE *fp, dtrace_bufdesc_t *buf,
> >   	begin.dtbgn_beginonly = 1;
> >   
> >   	/*
> > -	 * We need to interpose on the ERROR handler to be sure that we
> > -	 * only process ERRORs induced by BEGIN.
> > +	 * We need to interpose on the ERROR handler to be sure that we only
> > +	 * process ERRORs induced by BEGIN.
> >   	 */
> >   	begin.dtbgn_errhdlr = dtp->dt_errhdlr;
> >   	begin.dtbgn_errarg = dtp->dt_errarg;
> >   	dtp->dt_errhdlr = dt_consume_begin_error;
> >   	dtp->dt_errarg = &begin;
> >   
> > -	rval = dt_consume_cpu(dtp, fp, cpu, buf, dt_consume_begin_probe,
> > -	    dt_consume_begin_record, &begin);
> > +	rval = dt_consume_cpu(dtp, fp, bpeb, dt_consume_begin_probe,
> > +			      dt_consume_begin_record, &begin);
> >   
> >   	dtp->dt_errhdlr = begin.dtbgn_errhdlr;
> >   	dtp->dt_errarg = begin.dtbgn_errarg;
> >   
> >   	if (rval != 0)
> > -		return (rval);
> > +		return rval;
> >   
> > -	/*
> > -	 * Now allocate a new buffer.  We'll use this to deal with every other
> > -	 * CPU.
> > -	 */
> > -	memset(&nbuf, 0, sizeof (dtrace_bufdesc_t));
> > -	(void) dtrace_getopt(dtp, "bufsize", &size);
> > -	if ((nbuf.dtbd_data = malloc(size)) == NULL)
> > -		return (dt_set_errno(dtp, EDT_NOMEM));
> > -
> > -	if (max_ncpus == 0)
> > -		max_ncpus = dtp->dt_conf.dtc_maxbufs;
> > -
> > -	for (i = 0; i < max_ncpus; i++) {
> > -		nbuf.dtbd_cpu = i;
> > +	for (i = 0; i < cnt; i++) {
> > +		dt_peb_t	*peb = events[i].data.ptr;
> >   
> > -		if (i == cpu)
> > +		if (peb == bpeb)
> >   			continue;
> >   
> > -		if (dt_ioctl(dtp, DTRACEIOC_BUFSNAP, &nbuf) == -1) {
> > -			/*
> > -			 * If we failed with ENOENT, it may be because the
> > -			 * CPU was unconfigured -- this is okay.  Any other
> > -			 * error, however, is unexpected.
> > -			 */
> > -			if (errno == ENOENT)
> > -				continue;
> > -
> > -			free(nbuf.dtbd_data);
> > -
> > -			return (dt_set_errno(dtp, errno));
> > -		}
> > -
> > -		if ((rval = dt_consume_cpu(dtp, fp,
> > -		    i, &nbuf, pf, rf, arg)) != 0) {
> > -			free(nbuf.dtbd_data);
> > -			return (rval);
> > -		}
> > +		rval = dt_consume_cpu(dtp, fp, peb, pf, rf, arg);
> > +		if (rval != 0)
> > +			return rval;
> >   	}
> >   
> > -	free(nbuf.dtbd_data);
> > -
> >   	/*
> > -	 * Okay -- we're done with the other buffers.  Now we want to
> > -	 * reconsume the first buffer -- but this time we're looking for
> > -	 * everything _but_ BEGIN.  And of course, in order to only consume
> > -	 * those ERRORs _not_ associated with BEGIN, we need to reinstall our
> > -	 * ERROR interposition function...
> > +	 * Okay -- we're done with the other buffers.  Now we want to reconsume
> > +	 * the first buffer -- but this time we're looking for everything _but_
> > +	 * BEGIN.  And of course, in order to only consume those ERRORs _not_
> > +	 * associated with BEGIN, we need to reinstall our ERROR interposition
> > +	 * function...
> >   	 */
> >   	begin.dtbgn_beginonly = 0;
> >   
> > @@ -2766,120 +2321,42 @@ dt_consume_begin(dtrace_hdl_t *dtp, FILE *fp, dtrace_bufdesc_t *buf,
> >   	dtp->dt_errhdlr = dt_consume_begin_error;
> >   	dtp->dt_errarg = &begin;
> >   
> > -	rval = dt_consume_cpu(dtp, fp, cpu, buf, dt_consume_begin_probe,
> > -	    dt_consume_begin_record, &begin);
> > +	rval = dt_consume_cpu(dtp, fp, bpeb, dt_consume_begin_probe,
> > +			      dt_consume_begin_record, &begin);
> >   
> >   	dtp->dt_errhdlr = begin.dtbgn_errhdlr;
> >   	dtp->dt_errarg = begin.dtbgn_errarg;
> >   
> > -	return (rval);
> > +	return rval;
> >   }
> >   
> >   dtrace_workstatus_t
> > -dtrace_consume(dtrace_hdl_t *dtp, FILE *fp,
> > -    dtrace_consume_probe_f *pf, dtrace_consume_rec_f *rf, void *arg)
> > +dtrace_consume(dtrace_hdl_t *dtp, FILE *fp, dtrace_consume_probe_f *pf,
> > +	       dtrace_consume_rec_f *rf, void *arg)
> >   {
> > -#if 0
> > -	dtrace_bufdesc_t *buf = &dtp->dt_buf;
> > -	dtrace_optval_t size;
> > -	static int max_ncpus;
> > -	int i, rval;
> > -	dtrace_optval_t interval = dtp->dt_options[DTRACEOPT_SWITCHRATE];
> > -	hrtime_t now = gethrtime();
> > -
> > -	if (dtp->dt_lastswitch != 0) {
> > -		if (now - dtp->dt_lastswitch < interval)
> > -			return (0);
> > -
> > -		dtp->dt_lastswitch += interval;
> > -	} else {
> > -		dtp->dt_lastswitch = now;
> > -	}
> > +	dtrace_optval_t		timeout = dtp->dt_options[DTRACEOPT_SWITCHRATE];
> > +	struct epoll_event	events[dtp->dt_conf.num_online_cpus];
> > +	int			i, cnt;
> > +	dtrace_workstatus_t	rval;
> >   
> > +	/*
> > +	 * Don't try to consume trace data when tracing hasn't even been
> > +	 * started yet.  This usually means that the consumer didn't call
> > +	 * dtrace_go() yet.
> > +	 */
> >   	if (!dtp->dt_active)
> > -		return (dt_set_errno(dtp, EINVAL));
> > -
> > -	if (max_ncpus == 0)
> > -		max_ncpus = dtp->dt_conf.dtc_maxbufs;
> > +		return dt_set_errno(dtp, EINVAL);
> >   
> > +	/*
> > +	 * Ensure that we have callback functions to use (if none we provided,
> > +	 * we use the default no-op ones).
> > +	 */
> >   	if (pf == NULL)
> >   		pf = (dtrace_consume_probe_f *)dt_nullprobe;
> >   
> >   	if (rf == NULL)
> >   		rf = (dtrace_consume_rec_f *)dt_nullrec;
> >   
> > -	if (buf->dtbd_data == NULL) {
> > -		(void) dtrace_getopt(dtp, "bufsize", &size);
> > -		if ((buf->dtbd_data = malloc(size)) == NULL)
> > -			return (dt_set_errno(dtp, EDT_NOMEM));
> > -
> > -		buf->dtbd_size = size;
> > -	}
> > -
> > -	/*
> > -	 * If we have just begun, we want to first process the CPU that
> > -	 * executed the BEGIN probe (if any).
> > -	 */
> > -	if (dtp->dt_active && dtp->dt_beganon != -1) {
> > -		buf->dtbd_cpu = dtp->dt_beganon;
> > -		if ((rval = dt_consume_begin(dtp, fp, buf, pf, rf, arg)) != 0)
> > -			return (rval);
> > -	}
> > -
> > -	for (i = 0; i < max_ncpus; i++) {
> > -		buf->dtbd_cpu = i;
> > -
> > -		/*
> > -		 * If we have stopped, we want to process the CPU on which the
> > -		 * END probe was processed only _after_ we have processed
> > -		 * everything else.
> > -		 */
> > -		if (dtp->dt_stopped && (i == dtp->dt_endedon))
> > -			continue;
> > -
> > -		if (dt_ioctl(dtp, DTRACEIOC_BUFSNAP, buf) == -1) {
> > -			/*
> > -			 * If we failed with ENOENT, it may be because the
> > -			 * CPU was unconfigured -- this is okay.  Any other
> > -			 * error, however, is unexpected.
> > -			 */
> > -			if (errno == ENOENT)
> > -				continue;
> > -
> > -			return (dt_set_errno(dtp, errno));
> > -		}
> > -
> > -		if ((rval = dt_consume_cpu(dtp, fp, i, buf, pf, rf, arg)) != 0)
> > -			return (rval);
> > -	}
> > -
> > -	if (!dtp->dt_stopped)
> > -		return (0);
> > -
> > -	buf->dtbd_cpu = dtp->dt_endedon;
> > -
> > -	if (dt_ioctl(dtp, DTRACEIOC_BUFSNAP, buf) == -1) {
> > -		/*
> > -		 * This _really_ shouldn't fail, but it is strictly speaking
> > -		 * possible for this to return ENOENT if the CPU that called
> > -		 * the END enabling somehow managed to become unconfigured.
> > -		 * It's unclear how the user can possibly expect anything
> > -		 * rational to happen in this case -- the state has been thrown
> > -		 * out along with the unconfigured CPU -- so we'll just drive
> > -		 * on...
> > -		 */
> > -		if (errno == ENOENT)
> > -			return (0);
> > -
> > -		return (dt_set_errno(dtp, errno));
> > -	}
> > -
> > -	return (dt_consume_cpu(dtp, fp, dtp->dt_endedon, buf, pf, rf, arg));
> > -#else
> > -	dtrace_optval_t		timeout = dtp->dt_options[DTRACEOPT_SWITCHRATE];
> > -	struct epoll_event	events[dtp->dt_conf.num_online_cpus];
> > -	int			i, cnt;
> > -
> >   	/*
> >   	 * The epoll_wait() function expects the timeout to be expressed in
> >   	 * milliseconds whereas the switch rate is expressed in nanoseconds.
> > @@ -2893,19 +2370,56 @@ dtrace_consume(dtrace_hdl_t *dtp, FILE *fp,
> >   		return DTRACE_WORKSTATUS_ERROR;
> >   	}
> >   
> > +	/*
> > +	 * If dtp->dt_beganon is not -1, we did not process the BEGIN probe
> > +	 * data (if any) yet.  We do know (since dtp->dt_active is TRUE) that
> > +	 * the BEGIN probe completed processing and that it therefore recorded
> > +	 * the id of the CPU it executed on in DT_STATE_BEGANON.
> > +	 */
> > +	if (dtp->dt_beganon != -1) {
> > +		rval = dt_consume_begin(dtp, fp, events, cnt, pf, rf, arg);
> > +		if (rval != 0)
> > +			return rval;
> > +	}
> > +
> >   	/*
> >   	 * Loop over the buffers that have data available, and process them one
> > -	 * by one.
> > +	 * by one.  If tracing has stopped, skip the CPU on which the END probe
> > +	 * executed because we want to process that one last.
> >   	 */
> >   	for (i = 0; i < cnt; i++) {
> >   		dt_peb_t	*peb = events[i].data.ptr;
> > -		int		rval;
> >   
> > -		rval = dt_consume_cpu(dtp, fp, peb->cpu, peb, pf, rf, arg);
> > +		if (dtp->dt_stopped && peb->cpu == dtp->dt_endedon)
> > +			continue;
> > +
> > +		rval = dt_consume_cpu(dtp, fp, peb, pf, rf, arg);
> >   		if (rval != 0)
> >   			return rval;
> >   	}
> >   
> > +	/*
> > +	 * If tracing has not been stopped, we are done here.
> > +	 */
> > +	if (!dtp->dt_stopped)
> > +		return 0;
> > +
> > +	/*
> > +	 * Tracing has stopped, so we need to process the buffer for the CPU on
> > +	 * which the END probe executed.
> > +	 */
> > +	for (i = 0; i < cnt; i++) {
> > +		dt_peb_t	*peb = events[i].data.ptr;
> > +
> > +		if (peb->cpu != dtp->dt_endedon)
> > +			continue;
> > +
> > +		return dt_consume_cpu(dtp, fp, peb, pf, rf, arg);
> > +	}
> > +
> > +	/*
> > +	 * If we get here, the END probe fired without any data being recorded
> > +	 * for it.  That's OK.
> > +	 */
> >   	return DTRACE_WORKSTATUS_OKAY;
> > -#endif
> >   }
> > diff --git a/libdtrace/dt_map.c b/libdtrace/dt_map.c
> > index d92259d0..bd5464df 100644
> > --- a/libdtrace/dt_map.c
> > +++ b/libdtrace/dt_map.c
> > @@ -139,14 +139,14 @@ int
> >   dt_epid_lookup(dtrace_hdl_t *dtp, dtrace_epid_t epid, dtrace_datadesc_t **ddp,
> >   	       dtrace_probedesc_t **pdp)
> >   {
> > -	assert(epid < dtp->dt_maxprobe);
> > -	assert(dtp->dt_ddesc[epid] != NULL);
> > -	assert(dtp->dt_pdesc[epid] != NULL);
> > +	if (epid >= dtp->dt_maxprobe ||
> > +	    dtp->dt_ddesc[epid] == NULL || dtp->dt_pdesc[epid] == NULL)
> > +		return -1;
> >   
> >   	*ddp = dtp->dt_ddesc[epid];
> >   	*pdp = dtp->dt_pdesc[epid];
> >   
> > -	return (0);
> > +	return 0;
> >   }
> >   
> >   void
> > diff --git a/libdtrace/dt_prov_dtrace.c b/libdtrace/dt_prov_dtrace.c
> > index 4fde9942..9cf1af57 100644
> > --- a/libdtrace/dt_prov_dtrace.c
> > +++ b/libdtrace/dt_prov_dtrace.c
> > @@ -83,15 +83,16 @@ static void trampoline(dt_pcb_t *pcb)
> >   	/*
> >   	 * The BEGIN probe should only run when the activity state is INACTIVE.
> >   	 * At the end of the trampoline (after executing any clauses), the
> > -	 * state must be advanced to the next state (INACTIVE -> ACTIVE).
> > +	 * state must be advanced to the next state (INACTIVE -> ACTIVE, or if
> > +	 * there was an exit() action in the clause, DRAINING -> STOPPED).
> >   	 *
> 
> The BEGIN{exit} case is a big gotcha here.  The exit action moves us to 
> DRAINING, at which point the BEGIN trampoline will advance us to 
> STOPPED, as you point out.  But then you go on to say that END requires 
> DRAINING.  Hence, we should believe that BEGIN{exit} means the END 
> clause will not be executed.

This describes the behaviour as it is implemented by the provider.  Since this
is in the provider source code that is appropriate.

The semantics of how the BEGIN and END probes are used is defined at the
consumer level as you describe below...

> What saves us is the new code over in dtrace_go() to handle this case.  
> I think this special handling is important enough, weird enough, and 
> located far enough away (in the consumer, while here we are in 
> trampoline cg) that it deserves being mentioned here.  E.g., something like:
> 
>           * At the end of the trampoline (after executing any clauses), the
> -        * state must be advanced to the next state (INACTIVE -> ACTIVE).
> +        * state is advanced to the next state. (Typically, this means
> +        * INACTIVE -> ACTIVE.  If the clause has an exit() action, however,
> +        * we end up advancing DRAINING -> STOPPED. Special code in
> +        * dtrace_go() moves it back to DRAINING in this case.)
> 
> Seem reasonable?  That last phrase is trying to mimic the comment in 
> dtrace_go() to make it easier to find.

I don't want to add that detail here because it is not part of the provider
implementation of the probe behaviour.  It is not the provider that ensures
that the END probe is executed at all times - that is done by the overall
tracing framework for DTrace, and that is where this is documented.

> >   	 * When the BEGIN probe is triggered, we need to record the CPU it runs
> >   	 * on in state[DT_STATE_BEGANON] to ensure that we know which trace
> >   	 * data buffer to process first.
> >   	 *
> > -	 * The END probe should only run when the activity state is ACTIVE.
> > +	 * The END probe should only run when the activity state is DRAINING.
> >   	 * At the end of the trampoline (after executing any clauses), the
> > -	 * state must be advanced to the next state (ACTIVE -> DRAINING).
> > +	 * state must be advanced to the next state (DRAINING -> STOPPED).
> >   	 *
> >   	 * When the END probe is triggered, we need to record the CPU it runs
> >   	 * on in state[DT_STATE_ENDEDON] to ensure that we know which trace
> > @@ -105,7 +106,7 @@ static void trampoline(dt_pcb_t *pcb)
> >   		adv_act = 1;
> >   		key = DT_STATE_BEGANON;
> >   	} else if (strcmp(pcb->pcb_probe->desc->prb, "END") == 0) {
> > -		act = DT_ACTIVITY_ACTIVE;
> > +		act = DT_ACTIVITY_DRAINING;
> >   		adv_act = 1;
> >   		key = DT_STATE_ENDEDON;
> >   	} else {
> > diff --git a/libdtrace/dt_work.c b/libdtrace/dt_work.c
> > index 21bfe4f0..85ca2699 100644
> > --- a/libdtrace/dt_work.c
> > +++ b/libdtrace/dt_work.c
> > @@ -9,6 +9,7 @@
> >   #include <dt_peb.h>
> >   #include <dt_probe.h>
> >   #include <dt_bpf.h>
> > +#include <dt_state.h>
> >   #include <stddef.h>
> >   #include <errno.h>
> >   #include <assert.h>
> > @@ -123,61 +124,28 @@ dtrace_sleep(dtrace_hdl_t *dtp)
> >   int
> >   dtrace_status(dtrace_hdl_t *dtp)
> >   {
> > -	int gen = dtp->dt_statusgen;
> > -	dtrace_optval_t interval = dtp->dt_options[DTRACEOPT_STATUSRATE];
> > -	hrtime_t now = gethrtime();
> > -
> >   	if (!dtp->dt_active)
> > -		return (DTRACE_STATUS_NONE);
> > +		return DTRACE_STATUS_NONE;
> >   
> >   	if (dtp->dt_stopped)
> > -		return (DTRACE_STATUS_STOPPED);
> > -
> > -	if (dtp->dt_laststatus != 0) {
> > -		if (now - dtp->dt_laststatus < interval)
> > -			return (DTRACE_STATUS_NONE);
> > -
> > -		dtp->dt_laststatus += interval;
> > -	} else {
> > -		dtp->dt_laststatus = now;
> > -	}
> > -
> > -	if (dt_ioctl(dtp, DTRACEIOC_STATUS, &dtp->dt_status[gen]) == -1)
> > -		return (dt_set_errno(dtp, errno));
> > +		return DTRACE_STATUS_STOPPED;
> >   
> > -	dtp->dt_statusgen ^= 1;
> > -
> > -	if (dt_handle_status(dtp, &dtp->dt_status[dtp->dt_statusgen],
> > -	    &dtp->dt_status[gen]) == -1)
> > -		return (-1);
> > -
> > -	if (dtp->dt_status[gen].dtst_exiting) {
> > +	if (dt_state_get_activity(dtp) == DT_ACTIVITY_DRAINING) {
> >   		if (!dtp->dt_stopped)
> > -			(void) dtrace_stop(dtp);
> > -
> > -		return (DTRACE_STATUS_EXITED);
> > -	}
> > -
> > -	if (dtp->dt_status[gen].dtst_filled == 0)
> > -		return (DTRACE_STATUS_OKAY);
> > +			dtrace_stop(dtp);
> >   
> > -	if (dtp->dt_options[DTRACEOPT_BUFPOLICY] != DTRACEOPT_BUFPOLICY_FILL)
> > -		return (DTRACE_STATUS_OKAY);
> > -
> > -	if (!dtp->dt_stopped) {
> > -		if (dtrace_stop(dtp) == -1)
> > -			return (-1);
> > +		return DTRACE_STATUS_EXITED;
> >   	}
> >   
> > -	return (DTRACE_STATUS_FILLED);
> > +	return DTRACE_STATUS_OKAY;
> >   }
> >   
> >   int
> >   dtrace_go(dtrace_hdl_t *dtp, uint_t cflags)
> >   {
> > -	void	*dof;
> > -	size_t	size;
> > -	int	err;
> > +	void		*dof;
> > +	size_t		size;
> > +	int		err;
> >   
> >   	if (dtp->dt_active)
> >   		return (dt_set_errno(dtp, EINVAL));
> > @@ -206,17 +174,6 @@ dtrace_go(dtrace_hdl_t *dtp, uint_t cflags)
> >   	if (err)
> >   		return err;
> >   
> > -#if 0
> > -	if ((dof = dtrace_getopt_dof(dtp)) == NULL)
> > -		return (-1); /* dt_errno has been set for us */
> > -
> > -	err = dt_ioctl(dtp, DTRACEIOC_ENABLE, dof);
> > -	dtrace_dof_destroy(dtp, dof);
> > -
> > -	if (err == -1 && (errno != ENOTTY || dtp->dt_vector == NULL))
> > -		return (dt_set_errno(dtp, errno));
> > -#endif
> > -
> >   	/*
> >   	 * Set up the event polling file descriptor.
> >   	 */
> > @@ -239,28 +196,17 @@ dtrace_go(dtrace_hdl_t *dtp, uint_t cflags)
> >   		return dt_set_errno(dtp, EDT_NOMEM);
> >   
> >   	BEGIN_probe();
> > -#if 0
> > -	if (dt_ioctl(dtp, DTRACEIOC_GO, &dtp->dt_beganon) == -1) {
> > -		if (errno == EACCES)
> > -			return (dt_set_errno(dtp, EDT_DESTRUCTIVE));
> > -
> > -		if (errno == EALREADY)
> > -			return (dt_set_errno(dtp, EDT_ISANON));
> > -
> > -		if (errno == ENOENT)
> > -			return (dt_set_errno(dtp, EDT_NOANON));
> > -
> > -		if (errno == E2BIG)
> > -			return (dt_set_errno(dtp, EDT_ENDTOOBIG));
> > -
> > -		if (errno == ENOSPC)
> > -			return (dt_set_errno(dtp, EDT_BUFTOOSMALL));
> > -
> > -		return (dt_set_errno(dtp, errno));
> > -	}
> > -#endif
> >   
> >   	dtp->dt_active = 1;
> > +	dtp->dt_beganon = dt_state_get_beganon(dtp);
> > +
> > +	/*
> > +	 * An exit() action during the BEGIN probe processing will cause the
> > +	 * activity state to become STOPPED once the BEGIN probe is done.  We
> > +	 * need to move it back to DRAINING in that case.
> > +	 */
> > +	if (dt_state_get_activity(dtp) == DT_ACTIVITY_STOPPED)
> > +		dt_state_set_activity(dtp, DT_ACTIVITY_DRAINING);
> >   
> >   #if 0
> >   	if (dt_options_load(dtp) == -1)
> > @@ -275,19 +221,15 @@ dtrace_go(dtrace_hdl_t *dtp, uint_t cflags)
> >   int
> >   dtrace_stop(dtrace_hdl_t *dtp)
> >   {
> > -	int gen = dtp->dt_statusgen;
> > +	int		gen = dtp->dt_statusgen;
> >   
> >   	if (dtp->dt_stopped)
> > -		return (0);
> > -
> > -#if 0
> > -	if (dt_ioctl(dtp, DTRACEIOC_STOP, &dtp->dt_endedon) == -1)
> > -		return (dt_set_errno(dtp, errno));
> > -#endif
> > +		return 0;
> >   
> >   	END_probe();
> >   
> >   	dtp->dt_stopped = 1;
> > +	dtp->dt_endedon = dt_state_get_endedon(dtp);
> >   
> >   #if 0
> >   	/*
> > @@ -299,9 +241,9 @@ dtrace_stop(dtrace_hdl_t *dtp)
> >   
> >   	if (dt_handle_status(dtp, &dtp->dt_status[gen ^ 1],
> >   	    &dtp->dt_status[gen]) == -1)
> > -		return (-1);
> > +		return -1;
> >   
> > -	return (0);
> > +	return 0;
> >   }
> >   
> >   #if 0
> > @@ -363,6 +305,26 @@ dtrace_workstatus_t
> >   dtrace_work(dtrace_hdl_t *dtp, FILE *fp, dtrace_consume_probe_f *pfunc,
> >   	    dtrace_consume_rec_f *rfunc, void *arg)
> >   {
> > -	return dtrace_consume(dtp, fp, pfunc, rfunc, arg);
> > +	int			status = dtrace_status(dtp);
> > +	dtrace_workstatus_t	rval;
> > +
> > +	switch (dtrace_status(dtp)) {
> > +	case DTRACE_STATUS_EXITED:
> > +	case DTRACE_STATUS_STOPPED:
> > +		rval = DTRACE_WORKSTATUS_DONE;
> > +		break;
> > +	case DTRACE_STATUS_NONE:
> > +	case DTRACE_STATUS_OKAY:
> > +		rval = DTRACE_WORKSTATUS_OKAY;
> > +		break;
> > +	default:
> > +		return DTRACE_WORKSTATUS_ERROR;
> > +	}
> > +
> > +	if (dtrace_consume(dtp, fp, pfunc, rfunc, arg) ==
> > +	    DTRACE_WORKSTATUS_ERROR)
> > +		return DTRACE_WORKSTATUS_ERROR;
> > +
> > +	return rval;
> >   }
> >   #endif
> > diff --git a/libdtrace/dtrace.h b/libdtrace/dtrace.h
> > index 1353a1fc..17809a56 100644
> > --- a/libdtrace/dtrace.h
> > +++ b/libdtrace/dtrace.h
> > @@ -166,7 +166,8 @@ typedef enum {
> >   #define	DTRACE_CONSUME_ERROR		-1	/* error while processing */
> >   #define	DTRACE_CONSUME_THIS		0	/* consume this probe/record */
> >   #define	DTRACE_CONSUME_NEXT		1	/* advance to next probe/rec */
> > -#define	DTRACE_CONSUME_ABORT		2	/* abort consumption */
> > +#define	DTRACE_CONSUME_DONE		2	/* advance to next buffer */
> > +#define	DTRACE_CONSUME_ABORT		3	/* abort consumption */
> >   
> >   typedef struct dtrace_probedata {
> >   	dtrace_hdl_t *dtpda_handle;		/* handle to DTrace library */
> > diff --git a/test/unittest/actions/exit/tst.begin-exit.d b/test/unittest/actions/exit/tst.begin-exit.d
> > new file mode 100644
> > index 00000000..53c95951
> 
> I don't know what to make of this.  I take it this is a verbatim copy of 
> test/unittest/providers/tst.beginexit.d .  It seems "bad style" (or 
> whatever you want to call it) to keep two replicas of exactly the same 
> test.  If you're trying to consolidate a bunch of similar tests here, 
> maybe the thing to do is eliminate the old copy.

Cleanup of the tests is left for a later patch where we go through a lot more
of the testsuite.  Although this is effectively a duplicate test, it is being
added as part of the patch because it exercises this specific functionality.

The merit for retaining the original copy of the patch, modify it, or remove
is to be evaluated when we tackle testsuite cleanup.

> > --- /dev/null
> > +++ b/test/unittest/actions/exit/tst.begin-exit.d
> > @@ -0,0 +1,22 @@
> > +/*
> > + * Oracle Linux DTrace.
> > + * Copyright (c) 2006, 2020, Oracle and/or its affiliates. All rights reserved.
> > + * Licensed under the Universal Permissive License v 1.0 as shown at
> > + * http://oss.oracle.com/licenses/upl.
> > + */
> > +
> > +BEGIN
> > +{
> > +	exit(0);
> > +}
> > +
> > +BEGIN
> > +{
> > +	printf("shouldn't be here...");
> > +	here++;
> > +}
> > +
> > +END
> > +{
> > +	exit(here);
> > +}
> > diff --git a/test/unittest/actions/exit/tst.begin-exit.r b/test/unittest/actions/exit/tst.begin-exit.r
> > new file mode 100644
> > index 00000000..6383c084
> > --- /dev/null
> > +++ b/test/unittest/actions/exit/tst.begin-exit.r
> > @@ -0,0 +1,6 @@
> > +                   FUNCTION:NAME
> > +                          :BEGIN
> > +                            :END
> > +
> > +-- @@stderr --
> > +dtrace: script 'test/unittest/actions/exit/tst.begin-exit.d' matched 3 probes
> > diff --git a/test/unittest/actions/exit/tst.end-exit.d b/test/unittest/actions/exit/tst.end-exit.d
> > new file mode 100644
> > index 00000000..34fcfd52
> > --- /dev/null
> > +++ b/test/unittest/actions/exit/tst.end-exit.d
> > @@ -0,0 +1,16 @@
> > +/*
> > + * Oracle Linux DTrace.
> > + * Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
> > + * Licensed under the Universal Permissive License v 1.0 as shown at
> > + * http://oss.oracle.com/licenses/upl.
> > + */
> > +
> > +BEGIN
> > +{
> > +	exit(1);
> > +}
> > +
> > +END
> > +{
> > +	exit(0);
> > +}
> > diff --git a/test/unittest/actions/exit/tst.end-exit.r b/test/unittest/actions/exit/tst.end-exit.r
> > new file mode 100644
> > index 00000000..5256e406
> > --- /dev/null
> > +++ b/test/unittest/actions/exit/tst.end-exit.r
> > @@ -0,0 +1,6 @@
> > +                   FUNCTION:NAME
> > +                          :BEGIN
> > +                            :END
> > +
> > +-- @@stderr --
> > +dtrace: script 'test/unittest/actions/exit/tst.end-exit.d' matched 2 probes
> > diff --git a/test/unittest/actions/exit/tst.probe-begin-exit.d b/test/unittest/actions/exit/tst.probe-begin-exit.d
> > new file mode 100644
> > index 00000000..44b2bf28
> > --- /dev/null
> > +++ b/test/unittest/actions/exit/tst.probe-begin-exit.d
> > @@ -0,0 +1,20 @@
> > +/*
> > + * Oracle Linux DTrace.
> > + * Copyright (c) 2006, 2020, Oracle and/or its affiliates. All rights reserved.
> 
> This is a new file with a 2006-2020 copyright?  I suspect the 2006 
> should be removed?  That's how tst.end-exit.d seemed to have been handled.
> 
> Same comment applies to the next two .d files as well.

Yes, copyright statements were not updated - thans for catching that.
> 
> > + * Licensed under the Universal Permissive License v 1.0 as shown at
> > + * http://oss.oracle.com/licenses/upl.
> > + */
> > +
> > +BEGIN
> > +{
> > +	exit(0);
> > +}
> > +
> > +write:entry
> > +{
> > +	exit(1);
> > +}
> > +
> > +END
> > +{
> > +}
> > diff --git a/test/unittest/actions/exit/tst.probe-begin-exit.r b/test/unittest/actions/exit/tst.probe-begin-exit.r
> > new file mode 100644
> > index 00000000..531c68fe
> > --- /dev/null
> > +++ b/test/unittest/actions/exit/tst.probe-begin-exit.r
> > @@ -0,0 +1,6 @@
> > +                   FUNCTION:NAME
> > +                          :BEGIN
> > +                            :END
> > +
> > +-- @@stderr --
> > +dtrace: script 'test/unittest/actions/exit/tst.probe-begin-exit.d' matched 3 probes
> > diff --git a/test/unittest/actions/exit/tst.probe-end-exit.d b/test/unittest/actions/exit/tst.probe-end-exit.d
> > new file mode 100644
> > index 00000000..66da68d0
> > --- /dev/null
> > +++ b/test/unittest/actions/exit/tst.probe-end-exit.d
> > @@ -0,0 +1,20 @@
> > +/*
> > + * Oracle Linux DTrace.
> > + * Copyright (c) 2006, 2020, Oracle and/or its affiliates. All rights reserved.
> > + * Licensed under the Universal Permissive License v 1.0 as shown at
> > + * http://oss.oracle.com/licenses/upl.
> > + */
> > +
> > +BEGIN
> > +{
> > +}
> > +
> > +write:entry
> > +{
> > +	exit(1);
> > +}
> > +
> > +END
> > +{
> > +	exit(0);
> > +}
> > diff --git a/test/unittest/actions/exit/tst.probe-end-exit.r b/test/unittest/actions/exit/tst.probe-end-exit.r
> > new file mode 100644
> > index 00000000..e9ea8e38
> > --- /dev/null
> > +++ b/test/unittest/actions/exit/tst.probe-end-exit.r
> > @@ -0,0 +1,7 @@
> > +                   FUNCTION:NAME
> > +                          :BEGIN
> > +                     write:entry
> > +                            :END
> > +
> > +-- @@stderr --
> > +dtrace: script 'test/unittest/actions/exit/tst.probe-end-exit.d' matched 3 probes
> > diff --git a/test/unittest/actions/exit/tst.probe-exit.d b/test/unittest/actions/exit/tst.probe-exit.d
> > new file mode 100644
> > index 00000000..8bf8ef47
> > --- /dev/null
> > +++ b/test/unittest/actions/exit/tst.probe-exit.d
> > @@ -0,0 +1,19 @@
> > +/*
> > + * Oracle Linux DTrace.
> > + * Copyright (c) 2006, 2020, Oracle and/or its affiliates. All rights reserved.
> > + * Licensed under the Universal Permissive License v 1.0 as shown at
> > + * http://oss.oracle.com/licenses/upl.
> > + */
> > +
> > +BEGIN
> > +{
> > +}
> > +
> > +write:entry
> > +{
> > +	exit(0);
> > +}
> > +
> > +END
> > +{
> > +}
> > diff --git a/test/unittest/actions/exit/tst.probe-exit.r b/test/unittest/actions/exit/tst.probe-exit.r
> > new file mode 100644
> > index 00000000..ce51aaf4
> > --- /dev/null
> > +++ b/test/unittest/actions/exit/tst.probe-exit.r
> > @@ -0,0 +1,7 @@
> > +                   FUNCTION:NAME
> > +                          :BEGIN
> > +                     write:entry
> > +                            :END
> > +
> > +-- @@stderr --
> > +dtrace: script 'test/unittest/actions/exit/tst.probe-exit.d' matched 3 probes
> 
> _______________________________________________
> DTrace-devel mailing list
> DTrace-devel at oss.oracle.com
> https://oss.oracle.com/mailman/listinfo/dtrace-devel



More information about the DTrace-devel mailing list