[DTrace-devel] [PATCH 1/1] tools/dtrace: initial implementation of DTrace

Kris Van Hees kris.van.hees at oracle.com
Mon Jul 8 09:48:10 PDT 2019


On Thu, Jul 04, 2019 at 03:03:36PM +0200, Peter Zijlstra wrote:
> On Wed, Jul 03, 2019 at 08:14:30PM -0700, Kris Van Hees wrote:
> > +/*
> > + * Read the data_head offset from the header page of the ring buffer.  The
> > + * argument is declared 'volatile' because it references a memory mapped page
> > + * that the kernel may be writing to while we access it here.
> > + */
> > +static u64 read_rb_head(volatile struct perf_event_mmap_page *rb_page)
> > +{
> > +	u64	head = rb_page->data_head;
> > +
> > +	asm volatile("" ::: "memory");
> > +
> > +	return head;
> > +}
> > +
> > +/*
> > + * Write the data_tail offset in the header page of the ring buffer.  The
> > + * argument is declared 'volatile' because it references a memory mapped page
> > + * that the kernel may be writing to while we access it here.
> 
> s/writing/reading/

Thanks!

> > + */
> > +static void write_rb_tail(volatile struct perf_event_mmap_page *rb_page,
> > +			  u64 tail)
> > +{
> > +	asm volatile("" ::: "memory");
> > +
> > +	rb_page->data_tail = tail;
> > +}
> 
> That volatile usage is atrocious (kernel style would have you use
> {READ,WRITE}_ONCE()). Also your comments fail to mark these as
> load_acquire and store_release. And by only using a compiler barrier
> you're hard assuming TSO, which is somewhat fragile at best.
> 
> Alternatively, you can use the C11 bits and write:
> 
> 	return __atomic_load_n(&rb_page->data_head, __ATOMIC_ACQUIRE);
> 
> 	__atomic_store_n(&rb_page->data_tail, tail, __ATOMIC_RELEASE);

Perhaps I should just use ring_buffer_read_head() and ring_buffer_write_tail(),
since they are provided in tools/include/linux/ring_buffer.h?  I expect that
would be preferable to open-coding __atomic_load_n() and __atomic_store_n()?
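
For reference, a minimal sketch of what the load_acquire/store_release pair
could look like with the GCC/Clang __atomic builtins suggested above.  The
struct rb_header type and the rb_read_head()/rb_write_tail() names are
hypothetical stand-ins for illustration, not the helpers from
tools/include/linux/ring_buffer.h and not the real perf_event_mmap_page layout.

```c
#include <stdint.h>

/*
 * Hypothetical stand-in for the two fields of the ring buffer header page
 * that the consumer touches.  Not the real struct perf_event_mmap_page.
 */
struct rb_header {
	uint64_t	data_head;	/* advanced by the producer (kernel) */
	uint64_t	data_tail;	/* advanced by the consumer (us) */
};

/*
 * Load-acquire of data_head: reads of record data that follow this call
 * cannot be reordered before the load of the head offset.
 */
static uint64_t rb_read_head(struct rb_header *hdr)
{
	return __atomic_load_n(&hdr->data_head, __ATOMIC_ACQUIRE);
}

/*
 * Store-release of data_tail: reads of record data that precede this call
 * are complete before the new tail offset becomes visible to the producer.
 */
static void rb_write_tail(struct rb_header *hdr, uint64_t tail)
{
	__atomic_store_n(&hdr->data_tail, tail, __ATOMIC_RELEASE);
}
```

Neither helper needs a 'volatile' qualifier on its argument, and the ordering
no longer depends on the architecture being TSO.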

> > +/*
> > + * Process and output the probe data at the supplied address.
> > + */
> > +static int output_event(int cpu, u64 *buf)
> > +{
> > +	u8				*data = (u8 *)buf;
> > +	struct perf_event_header	*hdr;
> > +
> > +	hdr = (struct perf_event_header *)data;
> > +	data += sizeof(struct perf_event_header);
> > +
> > +	if (hdr->type == PERF_RECORD_SAMPLE) {
> > +		u8		*ptr = data;
> > +		u32		i, size, probe_id;
> > +
> > +		/*
> > +		 * struct {
> > +		 *	struct perf_event_header	header;
> > +		 *	u32				size;
> > +		 *	u32				probe_id;
> > +		 *	u32				gap;
> > +		 *	u64				data[n];
> > +		 * }
> > +		 * and data points to the 'size' member at this point.
> > +		 */
> > +		if (ptr > (u8 *)buf + hdr->size) {
> > +			fprintf(stderr, "BAD: corrupted sample header\n");
> > +			goto out;
> > +		}
> > +
> > +		size = *(u32 *)data;
> > +		data += sizeof(size);
> > +		ptr += sizeof(size) + size;
> > +		if (ptr != (u8 *)buf + hdr->size) {
> > +			fprintf(stderr, "BAD: invalid sample size\n");
> > +			goto out;
> > +		}
> > +
> > +		probe_id = *(u32 *)data;
> > +		data += sizeof(probe_id);
> > +		size -= sizeof(probe_id);
> > +		data += sizeof(u32);		/* skip 32-bit gap */
> > +		size -= sizeof(u32);
> > +		buf = (u64 *)data;
> > +
> > +		printf("%3d %6d ", cpu, probe_id);
> > +		for (i = 0, size /= sizeof(u64); i < size; i++)
> > +			printf("%#016lx ", buf[i]);
> > +		printf("\n");
> > +	} else if (hdr->type == PERF_RECORD_LOST) {
> > +		u64	lost;
> > +
> > +		/*
> > +		 * struct {
> > +		 *	struct perf_event_header	header;
> > +		 *	u64				id;
> > +		 *	u64				lost;
> > +		 * }
> > +		 * and data points to the 'id' member at this point.
> > +		 */
> > +		lost = *(u64 *)(data + sizeof(u64));
> > +
> > +		printf("[%ld probes dropped]\n", lost);
> > +	} else
> > +		fprintf(stderr, "UNKNOWN: record type %d\n", hdr->type);
> > +
> > +out:
> > +	return hdr->size;
> > +}
> 
> I see a distinct lack of wrapping support. AFAICT when buf+hdr->size
> wraps you're doing out-of-bounds accesses.

Yes, that is correct.  I'm still trying to figure out why it didn't cause a
SEGV when I tested this, because I'm clearly reading past the end of the
mmap'd memory.  Thank you for noticing this.  I was trying to be too minimal
in the code I was putting out and didn't pay attention to this.

Fixed in the V2 I am preparing.
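
One possible way to handle the wrap case, sketched under assumptions: copy a
record that straddles the end of the data area into a contiguous scratch
buffer before parsing it.  The rb_copy_record() helper and its parameters are
hypothetical and only illustrate the idea; this is not necessarily what V2
will do.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Copy 'len' bytes that start at logical offset 'tail' in a ring of
 * 'data_size' bytes at 'base' into the contiguous buffer 'dst'.
 *
 * Hypothetical helper: the caller must guarantee that len <= data_size and
 * that 'dst' has room for 'len' bytes.
 */
static void rb_copy_record(const uint8_t *base, uint64_t data_size,
			   uint64_t tail, uint8_t *dst, size_t len)
{
	uint64_t	off = tail % data_size;
	size_t		first = data_size - off; /* bytes before the end */

	if (first > len)
		first = len;

	/* Part up to the end of the data area (the whole record if no wrap). */
	memcpy(dst, base + off, first);
	/* Remainder, if any, continues at the start of the data area. */
	memcpy(dst + first, base, len - first);
}
```

Since perf records are sized in multiples of 8 bytes and the header itself is
8 bytes, hdr->size can be read in place and only the body needs this
treatment.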

> > +/*
> > + * Process the available probe data in the given buffer.
> > + */
> > +static void process_data(struct dtrace_buffer *buf)
> > +{
> > +	/* This is volatile because the kernel may be updating the content. */
> > +	volatile struct perf_event_mmap_page	*rb_page = buf->base;
> > +	u8					*base = (u8 *)buf->base +
> > +							buf->page_size;
> > +	u64					head = read_rb_head(rb_page);
> > +
> > +	while (rb_page->data_tail != head) {
> > +		u64	tail = rb_page->data_tail;
> > +		u64	*ptr = (u64 *)(base + tail % buf->data_size);
> > +		int	len;
> > +
> > +		len = output_event(buf->cpu, ptr);
> > +
> > +		write_rb_tail(rb_page, tail + len);
> > +		head = read_rb_head(rb_page);
> > +	}
> > +}
> 
> more volatile yuck.
> 
> Also:
> 
> 	for (;;) {
> 		head = __atomic_load_n(&rb_page->data_head, __ATOMIC_ACQUIRE);
> 		tail = __atomic_load_n(&rb_page->data_tail, __ATOMIC_RELAXED);
> 
> 		if (head == tail)
> 			break;
> 
> 		do {
> 			hdr = buf->base + (tail & ((1UL << buf->data_shift) - 1));
> 			if ((tail >> buf->data_shift) !=
> 			    ((tail + hdr->size) >> buf->data_shift))
> 				/* handle wrap case */
> 			else
> 				/* normal case */
> 
> 			tail += hdr->size;
> 		} while (tail != head);
> 
> 		__atomic_store_n(&rb_page->data_tail, tail, __ATOMIC_RELEASE);
> 	}
> 
> Or something.

Thank you for this suggestion.  As mentioned above, I lean towards using the
provided ring_buffer_read_head() and ring_buffer_write_tail() implementations,
since they are the 'other end' of the ring buffer head/tail mechanism and will
be kept in sync with any changes that might happen on the kernel side, right?

> > +/*
> > + * Wait for data to become available in any of the buffers.
> > + */
> > +int dt_buffer_poll(int epoll_fd, int timeout)
> > +{
> > +	struct epoll_event	events[dt_numcpus];
> > +	int			i, cnt;
> > +
> > +	cnt = epoll_wait(epoll_fd, events, dt_numcpus, timeout);
> > +	if (cnt < 0)
> > +		return -errno;
> > +
> > +	for (i = 0; i < cnt; i++)
> > +		process_data((struct dtrace_buffer *)events[i].data.ptr);
> > +
> > +	return cnt;
> > +}
> 
> Or make sure to read on the CPU by having a poll thread per CPU, then
> you can do away with the memory barriers.

That is definitely something for the todo list for future optimizations.

Thanks for your review and code suggestions.

	Kris


