[rds-commits] zab commits r126 - trunk/linux/net/rds

svn-commits@oss.oracle.com svn-commits at oss.oracle.com
Wed Jul 5 17:40:24 CDT 2006


Author: zab
Date: 2006-07-05 17:40:23 -0500 (Wed, 05 Jul 2006)
New Revision: 126

Modified:
   trunk/linux/net/rds/ib.h
   trunk/linux/net/rds/ib_cm.c
   trunk/linux/net/rds/ib_recv.c
   trunk/linux/net/rds/ib_send.c
   trunk/linux/net/rds/rds.h
   trunk/linux/net/rds/threads.c
Log:
Fix many bugs in IB connection shutdown so that the module can be removed.

Allocate an entire rds_ib_connection, not just a pointer to it :/.

Disconnect the QP on shutdown and wait for the posted receive completions
to drain.  In recv completion notice if it's completing in error and don't
repost or process it in any way.

Make sure to reset the rds_ib_connection fields after clearing them in
rds_ib_conn_shutdown.  To be nice, though, we also modify rds_shutdown_worker
to only call shutdown once for each successful conn_connect call.

Add some helpers for packing the pointer and op type into wr_id.

Add an rds_ib_connection field to throttle the warnings that come from
posted receive failure.


Modified: trunk/linux/net/rds/ib.h
===================================================================
--- trunk/linux/net/rds/ib.h	2006-07-05 18:17:20 UTC (rev 125)
+++ trunk/linux/net/rds/ib.h	2006-07-05 22:40:23 UTC (rev 126)
@@ -15,10 +15,6 @@
 #define RDS_IB_MAX_RECV_BUFS		32 /* XXX small for debugging */
 #define RDS_IB_MAX_SEND_BUFS		32
 
-#define RDS_IB_WR_ID_MASK		0x3
-#define RDS_IB_WR_ID_HEADER		0
-#define RDS_IB_WR_ID_DATA		1
-#define RDS_IB_WR_ID_RECV		2
 
 struct rds_ib_incoming {
 	struct list_head	ii_recvs;
@@ -66,6 +62,8 @@
 	struct ib_mr		*i_mr;
 	struct ib_cq		*i_cq;
 
+	unsigned long		i_warned;
+
 	/* tx */
 	struct rds_ib_data_send	*i_ds;
 
@@ -101,7 +99,8 @@
 			     size_t size);
 void rds_ib_inc_process_acks(struct rds_connection *conn,
 			      struct rds_incoming *inc, u16 nr);
-void rds_ib_recv_complete(struct rds_connection *conn, u64 wr_id);
+void rds_ib_recv_complete(struct rds_connection *conn, struct ib_wc *wc);
+extern wait_queue_head_t rds_ib_empty_wait;
 
 /* ib_send.c */
 int __init rds_ib_send_init(void);
@@ -113,7 +112,29 @@
 int rds_ib_xmit_data(struct rds_connection *conn,
 		      struct rds_message *rm, unsigned int sg,
 		      unsigned int off);
-void rds_ib_header_complete(struct rds_connection *conn, u64 wr_id);
-void rds_ib_data_complete(struct rds_connection *conn, u64 wr_id);
+void rds_ib_header_complete(struct rds_connection *conn, struct ib_wc *wc);
+void rds_ib_data_complete(struct rds_connection *conn, struct ib_wc *wc);
 
+#define RDS_IB_WR_ID_OP_MASK		0x3
+#define RDS_IB_WR_ID_OP_HEADER		0
+#define RDS_IB_WR_ID_OP_DATA		1
+#define RDS_IB_WR_ID_OP_RECV		2
+
+static inline void *rds_ib_wr_id_to_ptr(u64 wr_id)
+{
+	return (void *)((unsigned long)wr_id & ~RDS_IB_WR_ID_OP_MASK);
+}
+
+static inline unsigned long rds_ib_wr_id_to_op(u64 wr_id)
+{
+	return wr_id & RDS_IB_WR_ID_OP_MASK;
+}
+
+static inline u64 rds_ib_pack_wr_id(void *ptr, unsigned long op)
+{
+	BUG_ON((unsigned long)ptr & RDS_IB_WR_ID_OP_MASK);
+	BUG_ON(op & ~RDS_IB_WR_ID_OP_MASK);
+	return (unsigned long)ptr | op;
+}
+
 #endif

Modified: trunk/linux/net/rds/ib_cm.c
===================================================================
--- trunk/linux/net/rds/ib_cm.c	2006-07-05 18:17:20 UTC (rev 125)
+++ trunk/linux/net/rds/ib_cm.c	2006-07-05 22:40:23 UTC (rev 126)
@@ -46,38 +46,44 @@
 static void rds_ib_cq_comp_handler(struct ib_cq *cq, void *context)
 {
 	struct rds_connection *conn = context;
+	struct rds_ib_connection *ic = conn->c_transport_data;
 	struct ib_wc wc;
-	u64 wr_id;
-	int id_type;
 
 	rdsdebug("cq %p conn %p\n", cq, conn);
 
 	ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
 
 	while (ib_poll_cq(cq, 1, &wc) > 0 ) {
-		rdsdebug("wc op %u wr_id 0x%llx\n", wc.opcode,
-			 (unsigned long long)wc.wr_id);
+		rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+			 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
+			 be32_to_cpu(wc.imm_data));
 
-		id_type = wc.wr_id & RDS_IB_WR_ID_MASK;
-		wr_id = wc.wr_id & ~RDS_IB_WR_ID_MASK;
-
 		/* XXX watch for allocation failures and kick the thread */
-		switch(id_type) {
-			case RDS_IB_WR_ID_HEADER:
-				rds_ib_header_complete(conn, wr_id);
+		switch(rds_ib_wr_id_to_op(wc.wr_id)) {
+			case RDS_IB_WR_ID_OP_HEADER:
+				rds_ib_header_complete(conn, &wc);
 				break;
-			case RDS_IB_WR_ID_DATA:
-				rds_ib_data_complete(conn, wr_id);
+			case RDS_IB_WR_ID_OP_DATA:
+				rds_ib_data_complete(conn, &wc);
 				break;
-			case RDS_IB_WR_ID_RECV:
-				rds_ib_recv_complete(conn, wr_id);
+			case RDS_IB_WR_ID_OP_RECV:
+				rds_ib_recv_complete(conn, &wc);
 				break;
 			default:
-				printk(KERN_ERR "bogus type %u from id "
-				       "0x%llx\n", id_type,
+				printk(KERN_ERR "bogus type from id 0x%llx\n",
 				       (unsigned long long)wc.wr_id);
 				BUG();
 		}
+
+		/* We expect errors as the qp is drained during shutdown */
+		if (wc.status != IB_WC_SUCCESS &&
+		    !test_and_set_bit(0, &ic->i_warned)) {
+			printk(KERN_WARNING "RDS/IB: completion on "
+			       "%u.%u.%u.%u had status %u, disconnecting and "
+			       "reconnecting\n", NIPQUAD(conn->c_faddr),
+			       wc.status);
+			queue_work(rds_wq, &conn->c_shutdown_work);
+		}
 	}
 }
 
@@ -359,10 +365,15 @@
 		 ic->i_pd, ic->i_cq, 
 		 ic->i_cm_id ? ic->i_cm_id->qp : NULL);
 
-	if (ic->i_pool)
-		dma_pool_destroy(ic->i_pool);
+	/* suppress warnings */
+	set_bit(0, &ic->i_warned);
 
 	if (ic->i_cm_id) {
+		rdsdebug("disconnectiong cm %p\n", ic->i_cm_id);
+		rdma_disconnect(ic->i_cm_id);
+		/* XXX can this ever hang indefinitely? */
+		wait_event(rds_ib_empty_wait, !atomic_read(&ic->i_recv_posted));
+
 		if (ic->i_cm_id->qp)
 			rdma_destroy_qp(ic->i_cm_id);
 		if (ic->i_cq)
@@ -370,11 +381,22 @@
 		if (ic->i_pd)
 			ib_dealloc_pd(ic->i_pd);
 		rdma_destroy_id(ic->i_cm_id);
+
+		if (ic->i_pool)
+			dma_pool_destroy(ic->i_pool);
+
+		ic->i_cm_id = NULL;
+		ic->i_pd = NULL;
+		ic->i_cq = NULL;
+		ic->i_pool = NULL;
 	}
 
-	ic->i_cm_id = NULL;
-	ic->i_pd = NULL;
-	ic->i_cq = NULL;
+	if (ic->i_ibinc) {
+		rds_inc_put(&ic->i_ibinc->ii_inc);
+		ic->i_ibinc = NULL;
+	}
+
+	clear_bit(0, &ic->i_warned);
 }
 
 int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
@@ -382,7 +404,7 @@
 	struct rds_ib_connection *ic;
 
 	/* XXX too lazy? */
-	ic = kzalloc(sizeof(struct rds_ib_connection *), GFP_KERNEL);
+	ic = kzalloc(sizeof(struct rds_ib_connection), GFP_KERNEL);
 	if (ic == NULL)
 		return -ENOMEM;
 

Modified: trunk/linux/net/rds/ib_recv.c
===================================================================
--- trunk/linux/net/rds/ib_recv.c	2006-07-05 18:17:20 UTC (rev 125)
+++ trunk/linux/net/rds/ib_recv.c	2006-07-05 22:40:23 UTC (rev 126)
@@ -31,6 +31,7 @@
 
 static kmem_cache_t *rds_ib_incoming_slab;
 static kmem_cache_t *rds_ib_recv_slab;
+DECLARE_WAIT_QUEUE_HEAD(rds_ib_empty_wait);
 
 static void rds_ib_recv_free(struct rds_ib_recv *recv)
 {
@@ -203,38 +204,37 @@
  * We're relying on RC completions arriving in post order so that we always
  * get an orderly stream of message headers and then their data fragments.
  */
-void rds_ib_recv_complete(struct rds_connection *conn, u64 wr_id)
+void rds_ib_recv_complete(struct rds_connection *conn, struct ib_wc *wc)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
-	struct rds_ib_recv *recv = (void *)(unsigned long)wr_id;
+	struct rds_ib_recv *recv = rds_ib_wr_id_to_ptr(wc->wr_id);
 	struct rds_ib_incoming *ibinc = ic->i_ibinc;
 
 	rdsdebug("ic %p recv %p sge len %u\n", ic, recv, recv->ir_sge.length);
 
-	atomic_dec(&ic->i_recv_posted);
+	/*
+	 * If the completion failed we only have to free it.  The caller
+	 * is going to tear down and reconnect so we'll be reposting all
+	 * new receive buffers.
+	 */
+	if (wc->status != IB_WC_SUCCESS) {
+		rds_ib_recv_unmap(ic, recv);
+		rds_ib_recv_free(recv);
+		goto out;
+	}
 
-	/* 
-	 * XXX worry about how we clean these up.. if qp shutdown leads
-	 * to a stream of completions then we'd want to not post again 
-	 * in that case.
+	/*
+	 * If this is just a header allocate an inc to track the upcoming
+	 * message payloads and repost this header page.
 	 */
-
 	if (ibinc == NULL) {
-		/* XXX pretty sure comp callbacks are in softirq? */
 		ibinc = rds_ib_inc_new(conn, recv->ir_page, GFP_ATOMIC);
 		ic->i_ibinc = ibinc;
 		ic->i_recv_data_rem = be32_to_cpu(ibinc->ii_inc.i_hdr.h_len);
 		rds_ib_recv_post_or_free(conn, recv);
-		return;
+		goto out;
 	}
 
-	/* 
-	 * pretty sure we shouldn't call dma_map_page from int, so we 
-	 * fall back to having the thread refill
-	 */
-	if (atomic_read(&ic->i_recv_posted) < RDS_IB_MAX_RECV_BUFS / 2)
-		queue_work(rds_wq, &conn->c_recv_work);
-
 	rds_ib_recv_unmap(ic, recv);
 	list_add_tail(&recv->ir_item, &ibinc->ii_recvs);
 
@@ -248,6 +248,16 @@
 				  &ibinc->ii_inc, GFP_ATOMIC, KM_SOFTIRQ0);
 		rds_inc_put(&ibinc->ii_inc);
 	}
+
+out:
+	if (atomic_dec_and_test(&ic->i_recv_posted))
+		wake_up(&rds_ib_empty_wait);
+	/* 
+	 * pretty sure we shouldn't call dma_map_page from int, so we 
+	 * fall back to having the thread refill
+	 */
+	if (atomic_read(&ic->i_recv_posted) < RDS_IB_MAX_RECV_BUFS / 2)
+		queue_work(rds_wq, &conn->c_recv_work);
 }
 
 int rds_ib_recv_refill(struct rds_connection *conn, gfp_t gfp)
@@ -287,7 +297,8 @@
 		recv->ir_sge.lkey = ic->i_mr->lkey;
 
 		recv->ir_wr.next = NULL;
-		recv->ir_wr.wr_id = ((unsigned long)recv) | RDS_IB_WR_ID_RECV;
+		recv->ir_wr.wr_id = rds_ib_pack_wr_id(recv,
+						      RDS_IB_WR_ID_OP_RECV);
 		recv->ir_wr.sg_list = &recv->ir_sge;
 		recv->ir_wr.num_sge = 1;
 

Modified: trunk/linux/net/rds/ib_send.c
===================================================================
--- trunk/linux/net/rds/ib_send.c	2006-07-05 18:17:20 UTC (rev 125)
+++ trunk/linux/net/rds/ib_send.c	2006-07-05 22:40:23 UTC (rev 126)
@@ -39,12 +39,12 @@
 {
 }
 
-void rds_ib_header_complete(struct rds_connection *conn, u64 wr_id)
+void rds_ib_header_complete(struct rds_connection *conn, struct ib_wc *wc)
 {
-	struct rds_ib_header_send *ih = (void *)(long)wr_id;
+	struct rds_ib_header_send *ih = rds_ib_wr_id_to_ptr(wc->wr_id);
 	struct rds_ib_connection *ic = conn->c_transport_data;
 
-	rdsdebug("conn %p wr_id 0x%llx\n", conn, (unsigned long long)wr_id);
+	rdsdebug("conn %p ih %p\n", conn, ih);
 
 	dma_pool_free(ic->i_pool, ih->ih_vaddr, ih->ih_dma);
 	kmem_cache_free(rds_ib_header_send_slab, ih);
@@ -81,7 +81,7 @@
 	ih->ih_sge.lkey = ic->i_mr->lkey;
 
 	ih->ih_wr.next = NULL;
-	ih->ih_wr.wr_id = ((unsigned long)ih) | RDS_IB_WR_ID_HEADER;
+	ih->ih_wr.wr_id = rds_ib_pack_wr_id(ih, RDS_IB_WR_ID_OP_HEADER);
 	ih->ih_wr.sg_list = &ih->ih_sge;
 	ih->ih_wr.num_sge = 1;
 	ih->ih_wr.opcode = IB_WR_SEND;
@@ -126,9 +126,9 @@
 	}
 }
 
-void rds_ib_data_complete(struct rds_connection *conn, u64 wr_id)
+void rds_ib_data_complete(struct rds_connection *conn, struct ib_wc *wc)
 {
-	struct rds_ib_data_send *ds = (void *)(unsigned long)wr_id;
+	struct rds_ib_data_send *ds = rds_ib_wr_id_to_ptr(wc->wr_id);
 	rds_ib_ds_put(conn, ds);
 }
 
@@ -191,7 +191,7 @@
 			wr->next = wr + 1;
 		else
 			wr->next = NULL;
-		wr->wr_id = ((unsigned long)ds) | RDS_IB_WR_ID_DATA;
+		wr->wr_id = rds_ib_pack_wr_id(ds, RDS_IB_WR_ID_OP_DATA);
 		wr->sg_list = sge;
 		wr->num_sge = 1;
 		wr->opcode = IB_WR_SEND;
@@ -254,9 +254,7 @@
 	if (first < nr_work - 1)
 		ret += (nr_work - 1 - first) * RDS_FRAG_SIZE;
 
-	/* ok, we finished the message, drop our ds ref */
-	/* XXX this leaves the ds in flight, conn reset will have to 
-	 * force completion of posted work reqs somehow */
+	/* once the message is posted the completion handlers will clean up */
 	ic->i_ds = NULL;
 	rds_ib_ds_put(conn, ds);
 out:

Modified: trunk/linux/net/rds/rds.h
===================================================================
--- trunk/linux/net/rds/rds.h	2006-07-05 18:17:20 UTC (rev 125)
+++ trunk/linux/net/rds/rds.h	2006-07-05 22:40:23 UTC (rev 126)
@@ -159,10 +159,9 @@
  *
  * @conn_shutdown: conn_shutdown stops traffic on the given connection.  Once
  *                 it returns the connection can not call rds_recv_incoming().
- *                 This can be called before conn_connect and it should be safe
- *                 for it to be called multiple times without intervening
- *                 conn_connect calls.  The caller serializes this with the
- *                 send and connecting paths (xmit_* and conn_*).  The
+ *                 This will only be called once after conn_connect returns
+ *                 non-zero success and will The caller serializes this with
+ *                 the send and connecting paths (xmit_* and conn_*).  The
  *                 transport is responsible for other serialization, including
  *                 rds_recv_incoming().  This is called in process context but
  *                 should try hard not to block.

Modified: trunk/linux/net/rds/threads.c
===================================================================
--- trunk/linux/net/rds/threads.c	2006-07-05 18:17:20 UTC (rev 125)
+++ trunk/linux/net/rds/threads.c	2006-07-05 22:40:23 UTC (rev 126)
@@ -87,19 +87,17 @@
 	struct rds_connection *conn = arg;
 	int was_conn;
 
-	/* first shut it down */
-	down(&conn->c_send_sem);
-
-	clear_bit(RDS_CONN_CONNECTING, &conn->c_status);
+	/* shut it down if .conn_connect succeeded */
 	was_conn = test_and_clear_bit(RDS_CONN_CONNECTED, &conn->c_status);
-	cancel_delayed_work(&conn->c_connect_work);
+	if (test_and_clear_bit(RDS_CONN_CONNECTING, &conn->c_status)) {
+		down(&conn->c_send_sem);
+		conn->c_trans->conn_shutdown(conn);
+		rds_conn_reset(conn);
+		up(&conn->c_send_sem);
+	}
 
-	conn->c_trans->conn_shutdown(conn);
-	rds_conn_reset(conn);
-
-	up(&conn->c_send_sem);
-
 	/* then reconnect if it's still live */
+	cancel_delayed_work(&conn->c_connect_work);
 	if (!hlist_unhashed(&conn->c_hash_node)) {
 		if (!was_conn)
 			rds_queue_delayed_reconnect(conn);




More information about the rds-commits mailing list