[rds-devel] [PATCH RFC net-next 5/6] rds: support for zcopy completion notification

Sowmini Varadhan sowmini.varadhan at oracle.com
Wed Jan 17 04:20:03 PST 2018


RDS removes a datagram from the retransmit queue when an ACK is
received. The ACK indicates that the receiver has queued the
RDS datagram, so that the sender can safely forget the datagram.

If the datagram to be removed had pinned pages set up, add
an entry to the rs->rs_znotify_queue so that the notification
will be sent up via rds_rm_zerocopy_callback() when the
rds_message is eventually freed by rds_message_purge().

Signed-off-by: Sowmini Varadhan <sowmini.varadhan at oracle.com>
---
 net/rds/af_rds.c  |    3 ++
 net/rds/message.c |   67 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 net/rds/rds.h     |   13 +++++++++-
 net/rds/recv.c    |    3 ++
 net/rds/send.c    |    7 +++++
 5 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index b405f77..23126db 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -183,6 +183,8 @@ static unsigned int rds_poll(struct file *file, struct socket *sock,
 		mask |= (POLLIN | POLLRDNORM);
 	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
 		mask |= (POLLOUT | POLLWRNORM);
+	if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
+		mask |= POLLERR;
 	read_unlock_irqrestore(&rs->rs_recv_lock, flags);
 
 	/* clear state any time we wake a seen-congested socket */
@@ -511,6 +513,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
 	INIT_LIST_HEAD(&rs->rs_send_queue);
 	INIT_LIST_HEAD(&rs->rs_recv_queue);
 	INIT_LIST_HEAD(&rs->rs_notify_queue);
+	INIT_LIST_HEAD(&rs->rs_znotify_queue);
 	INIT_LIST_HEAD(&rs->rs_cong_list);
 	spin_lock_init(&rs->rs_rdma_lock);
 	rs->rs_rdma_keys = RB_ROOT;
diff --git a/net/rds/message.c b/net/rds/message.c
index ef3daaf..25c74b3 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -33,6 +33,9 @@
 #include <linux/kernel.h>
 #include <linux/slab.h>
 #include <linux/export.h>
+#include <linux/skbuff.h>
+#include <linux/list.h>
+#include <linux/errqueue.h>
 
 #include "rds.h"
 
@@ -53,6 +56,64 @@ void rds_message_addref(struct rds_message *rm)
 }
 EXPORT_SYMBOL_GPL(rds_message_addref);
 
+/* Drain rs->rs_znotify_queue and post all pending zerocopy cookies to the
+ * socket error queue in one skb.  rds_message_purge() calls this with a
+ * spinlock held, hence GFP_ATOMIC; if allocation fails the entries are put
+ * back on rs_znotify_queue.
+ */
+static void rds_rm_zerocopy_callback(struct rds_sock *rs)
+{
+	struct sock *sk = rds_rs_to_sk(rs);
+	struct sk_buff *skb;
+	struct sock_exterr_skb *serr;
+	struct sk_buff_head *q;
+	unsigned long flags;
+	u32 *ptr;
+	int ncookies = 0, i;
+	struct rds_znotifier *znotif, *ztmp;
+	LIST_HEAD(tmp_list);
+
+	spin_lock_irqsave(&rs->rs_lock, flags);
+	list_splice_init(&rs->rs_znotify_queue, &tmp_list);
+	spin_unlock_irqrestore(&rs->rs_lock, flags);
+
+	list_for_each_entry(znotif, &tmp_list, z_list)
+		ncookies++;
+	if (ncookies == 0)
+		return;
+	skb = alloc_skb(ncookies * sizeof(u32), GFP_ATOMIC);
+	if (!skb) {
+		spin_lock_irqsave(&rs->rs_lock, flags);
+		list_splice(&tmp_list, &rs->rs_znotify_queue);
+		spin_unlock_irqrestore(&rs->rs_lock, flags);
+		return;
+	}
+	serr = SKB_EXT_ERR(skb);
+	serr->ee.ee_errno = 0;
+	serr->ee.ee_origin = SO_EE_ORIGIN_ZEROCOPY;
+	serr->ee.ee_data = ncookies;
+	serr->ee.ee_info = 0;
+	serr->ee.ee_code = SO_EE_CODE_ZEROCOPY_COPIED;
+	ptr = skb_put(skb, ncookies * sizeof(u32));
+
+	i = 0;
+	list_for_each_entry_safe(znotif, ztmp, &tmp_list, z_list) {
+		list_del(&znotif->z_list);
+		ptr[i++] = znotif->z_cookie;
+		mm_unaccount_pinned_pages(&znotif->z_mmp);
+		kfree(znotif);
+	}
+	WARN_ON(!list_empty(&tmp_list));
+	/* The znotifiers were freed above, so this skb is the only record of
+	 * the cookies: queue it unconditionally rather than dropping it.
+	 */
+	q = &sk->sk_error_queue;
+	spin_lock_irqsave(&q->lock, flags);
+	__skb_queue_tail(q, skb);
+	spin_unlock_irqrestore(&q->lock, flags);
+	sk->sk_error_report(sk);
+}
+
 /*
  * This relies on dma_map_sg() not touching sg[].page during merging.
  */
@@ -66,11 +127,15 @@ static void rds_message_purge(struct rds_message *rm)
 	for (i = 0; i < rm->data.op_nents; i++) {
 		rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
 		/* XXX will have to put_page for page refs */
-		__free_page(sg_page(&rm->data.op_sg[i]));
+		if (!rm->data.op_zcopy)
+			__free_page(sg_page(&rm->data.op_sg[i]));
+		else
+			put_page(sg_page(&rm->data.op_sg[i]));
 	}
 	rm->data.op_nents = 0;
 	spin_lock_irqsave(&rm->m_rs_lock, flags);
 	if (rm->m_rs) {
+		rds_rm_zerocopy_callback(rm->m_rs);
 		sock_put(rds_rs_to_sk(rm->m_rs));
 		rm->m_rs = NULL;
 	}
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 374ae83..de5015a 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -356,6 +356,12 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
 #define RDS_MSG_PAGEVEC		7
 #define RDS_MSG_FLUSH		8
 
+struct rds_znotifier {
+	struct list_head	z_list;		/* on rs->rs_znotify_queue */
+	u32			z_cookie;	/* copied to the errqueue skb */
+	struct mmpin		z_mmp;		/* pinned-page accounting */
+};
+
 struct rds_message {
 	refcount_t		m_refcount;
 	struct list_head	m_sock_item;
@@ -431,11 +437,14 @@ struct rds_message {
 		} rdma;
 		struct rm_data_op {
 			unsigned int		op_active:1;
-			unsigned int		op_notify:1;
+			unsigned int		op_notify:1,
+						op_zcopy:1,
+						op_pad_to_32:30;
 			unsigned int		op_nents;
 			unsigned int		op_count;
 			unsigned int		op_dmasg;
 			unsigned int		op_dmaoff;
+			struct rds_znotifier	*op_mmp_znotifier;
 			struct scatterlist	*op_sg;
 		} data;
 	};
@@ -588,6 +597,8 @@ struct rds_sock {
 	/* Socket receive path trace points*/
 	u8			rs_rx_traces;
 	u8			rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
+
+	struct list_head	rs_znotify_queue; /* zerocopy completion */
 };
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
diff --git a/net/rds/recv.c b/net/rds/recv.c
index b25bcfe..043f667 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -594,6 +594,9 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 
 	if (msg_flags & MSG_OOB)
 		goto out;
+	if (msg_flags & MSG_ERRQUEUE)
+		return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR,
+					  msg_flags);
 
 	while (1) {
 		/* If there are pending notifications, do those - and nothing else */
diff --git a/net/rds/send.c b/net/rds/send.c
index 5ac0925..5c38ce3 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -635,7 +635,14 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status)
 		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
 			struct rm_rdma_op *ro = &rm->rdma;
 			struct rds_notifier *notifier;
+			struct rds_znotifier *znotifier;
 
+			if (rm->data.op_zcopy) {
+				znotifier = rm->data.op_mmp_znotifier;
+				list_add_tail(&znotifier->z_list,
+					      &rs->rs_znotify_queue);
+				rm->data.op_mmp_znotifier = NULL;
+			}
 			list_del_init(&rm->m_sock_item);
 			rds_send_sndbuf_remove(rs, rm);
 
-- 
1.7.1




More information about the rds-devel mailing list