[rds-commits] zab commits r113 - trunk/linux/net/rds
svn-commits@oss.oracle.com
svn-commits at oss.oracle.com
Fri Jun 23 17:51:40 CDT 2006
Author: zab
Date: 2006-06-23 17:51:38 -0500 (Fri, 23 Jun 2006)
New Revision: 113
Added:
trunk/linux/net/rds/ib.c
trunk/linux/net/rds/ib.h
trunk/linux/net/rds/ib_cm.c
trunk/linux/net/rds/ib_recv.c
trunk/linux/net/rds/ib_send.c
Modified:
trunk/linux/net/rds/Makefile
trunk/linux/net/rds/rds.h
Log:
Commit the start of the IB transport.
The basics of the paths are there but they still need a lot of work. This has
never been run.
While we're here, give rdsdebug() a non-debug variant which doesn't ignore its
arguments.
Modified: trunk/linux/net/rds/Makefile
===================================================================
--- trunk/linux/net/rds/Makefile 2006-06-23 17:26:55 UTC (rev 112)
+++ trunk/linux/net/rds/Makefile 2006-06-23 22:51:38 UTC (rev 113)
@@ -3,4 +3,5 @@
rds-y := af_rds.o ack.o bind.o connection.o flow.o message.o \
recv.o send.o stats.o sysctl.o threads.o transport.o\
tcp.o tcp_connect.o tcp_listen.o tcp_send.o tcp_recv.o \
+ ib.o ib_cm.o ib_send.o ib_recv.o \
loop.o
Added: trunk/linux/net/rds/ib.c
===================================================================
--- trunk/linux/net/rds/ib.c 2006-06-23 17:26:55 UTC (rev 112)
+++ trunk/linux/net/rds/ib.c 2006-06-23 22:51:38 UTC (rev 113)
@@ -0,0 +1,116 @@
+/*
+ * Copyright (C) 2006 Oracle. All rights reserved.
+ *
+ * Portions of this code are derived from code which was:
+ *
+ * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+#include <linux/config.h>
+#include <linux/kernel.h>
+#include <linux/in.h>
+#include <linux/inetdevice.h>
+#include <linux/if_arp.h>
+
+#include "rds.h"
+#include "ib.h"
+
+/*
+ * Early RDS/IB was built to only bind to an address if there is an IPoIB
+ * device with that address set.
+ *
+ * If it were me, I'd advocate for something more flexible. Sending and
+ * receiving should be device-agnostic. Transports would try and maintain
+ * connections between peers who have messages queued. Userspace would be
+ * allowed to influence which paths have priority. We could call userspace
+ * asserting this policy "routing".
+ */
+/*
+ * Succeed only if the address is owned by an IPoIB device.
+ * Returns 0 on success, -EADDRNOTAVAIL otherwise.
+ */
+static int rds_ib_laddr_check(__be32 addr)
+{
+	struct net_device *dev;
+	int ret;
+
+	dev = ip_dev_find(addr);
+	if (dev && dev->type == ARPHRD_INFINIBAND)
+		ret = 0;
+	else
+		ret = -EADDRNOTAVAIL;
+
+	/*
+	 * Fix: ip_dev_find() returns a held reference; it was only
+	 * dropped on the ARPHRD_INFINIBAND path, leaking a ref on any
+	 * other device that owns this address.
+	 */
+	if (dev)
+		dev_put(dev);
+
+	return ret;
+}
+
+struct rds_transport rds_ib_transport = {
+ .laddr_check = rds_ib_laddr_check,
+ .xmit = rds_send_xmit,
+ .xmit_prepare = rds_ib_xmit_prepare,
+ .xmit_complete = rds_ib_xmit_complete,
+ .xmit_header = rds_ib_xmit_header,
+ .xmit_data = rds_ib_xmit_data,
+ .conn_alloc = rds_ib_conn_alloc,
+ .conn_free = rds_ib_conn_free,
+ .conn_connect = rds_ib_conn_connect,
+ .conn_shutdown = rds_ib_conn_shutdown,
+ .inc_copy_to_user = rds_ib_inc_copy_to_user,
+ .inc_free = rds_ib_inc_free,
+ .inc_process_acks = rds_ib_inc_process_acks,
+ .listen_stop = rds_ib_listen_stop,
+ .ack_queue_inc = rds_ack_queue_inc,
+};
+
+/* Bring up the IB transport's subsystems, unwinding in reverse on failure. */
+int __init rds_ib_init(void)
+{
+ int ret;
+
+ ret = rds_ib_send_init();
+ if (ret)
+ goto out;
+
+ /*
+  * NOTE(review): conn init is commented out, so the check below tests
+  * the stale ret from rds_ib_send_init() -- necessarily 0 here, so it
+  * is dead code that merely preserves the unwind scaffolding for when
+  * rds_ib_conn_init() lands.
+  */
+// ret = rds_ib_conn_init();
+ if (ret)
+ goto out_send;
+
+ ret = rds_ib_recv_init();
+ if (ret)
+ goto out_conn;
+
+ ret = rds_ib_listen_init();
+ if (ret)
+ goto out_recv;
+
+ goto out;
+
+out_recv:
+ rds_ib_recv_exit();
+out_conn:
+// rds_ib_conn_exit();
+out_send:
+ rds_ib_send_exit();
+out:
+ return ret;
+}
+
+/*
+ * conns should have been freed up by the time we get here..
+ */
+void __exit rds_ib_exit(void)
+{
+// rds_ib_listen_exit();
+ rds_ib_recv_exit();
+// rds_ib_conn_exit();
+ rds_ib_send_exit();
+}
Added: trunk/linux/net/rds/ib.h
===================================================================
--- trunk/linux/net/rds/ib.h 2006-06-23 17:26:55 UTC (rev 112)
+++ trunk/linux/net/rds/ib.h 2006-06-23 22:51:38 UTC (rev 113)
@@ -0,0 +1,122 @@
+#ifndef _RDS_IB_H
+#define _RDS_IB_H
+
+#include <rdma/ib_verbs.h>
+
+/*
+ * XXX randomly chosen, but at least seems to be unused:
+ * # 18464-18768 Unassigned
+ * We should do better. We want a reserved port to discourage unpriv'ed
+ * userspace from listening.
+ */
+#define RDS_IB_KNOWN_PORT 18633
+#define RDS_IB_RESOLVE_TIMEOUT_MS 5000
+#define RDS_IB_MAX_RECV_BUFS 500
+#define RDS_IB_MAX_SEND_BUFS 100
+
+/*
+ * The two low bits of a wr_id encode its type; the remaining bits are a
+ * pointer (slab objects are at least word aligned, so the low bits are
+ * free).  Fix: the mask was 0x11 (bits 0 and 4) -- presumably binary
+ * "11" was intended -- which made RDS_IB_WR_ID_RECV (2) mask to 0,
+ * misclassifying every recv completion as a header completion, and
+ * corrupted the pointer recovered with ~mask.
+ */
+#define RDS_IB_WR_ID_MASK 0x3
+#define RDS_IB_WR_ID_HEADER 0
+#define RDS_IB_WR_ID_DATA 1
+#define RDS_IB_WR_ID_RECV 2
+
+struct rds_ib_incoming {
+ struct list_head ii_recvs;
+ struct rds_incoming ii_inc;
+};
+
+struct rds_ib_connect_private {
+ __be32 dp_saddr;
+ __be32 dp_daddr;
+};
+
+/* ih_dma and ih_sge.addr are redundant, but have different types */
+struct rds_ib_header_send {
+ struct rds_ib_connection *ih_ic;
+ void *ih_vaddr;
+ dma_addr_t ih_dma;
+ struct ib_sge ih_sge;
+ struct ib_send_wr ih_wr;
+};
+
+/*
+ * This tracks a message that is being sent. We map the entire message
+ * and then send out its fragments, perhaps across multiple posts.
+ */
+struct rds_ib_data_send {
+ struct rds_message *id_rm;
+ struct rds_ib_connection *id_ic;
+ atomic_t id_refcount;
+ struct ib_sge *id_sge;
+ struct ib_send_wr *id_wr;
+};
+
+struct rds_ib_recv {
+ struct list_head ir_item;
+ struct page *ir_page;
+ struct ib_sge ir_sge;
+ struct ib_recv_wr ir_wr;
+};
+
+struct rds_ib_connection {
+ struct dma_pool *i_pool;
+ struct device *i_device;
+
+ /* alphabet soup, IBTA style */
+ struct rdma_cm_id *i_cm_id;
+ struct ib_pd *i_pd;
+ struct ib_mr *i_mr;
+ struct ib_cq *i_cq;
+ struct ib_qp *i_qp;
+
+ /* tx */
+ struct rds_ib_data_send *i_ds;
+
+ /* rx */
+ struct rds_ib_incoming *i_ibinc;
+ u32 i_recv_data_rem;
+ atomic_t i_recv_posted;
+};
+
+extern struct workqueue_struct *rds_ib_wq;
+
+/* ib.c */
+int __init rds_ib_init(void);
+void __exit rds_ib_exit(void);
+extern struct rds_transport rds_ib_transport;
+
+/* ib_cm.c */
+int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp);
+void rds_ib_conn_free(void *arg);
+int rds_ib_conn_connect(struct rds_connection *conn);
+void rds_ib_conn_shutdown(struct rds_connection *conn);
+void rds_ib_state_change(struct sock *sk);
+int __init rds_ib_listen_init(void);
+void rds_ib_listen_stop(void);
+
+/* ib_recv.c */
+int __init rds_ib_recv_init(void);
+void __exit rds_ib_recv_exit(void);
+void rds_ib_data_ready(struct sock *sk, int bytes);
+void rds_ib_recv_worker(void *arg);
+void rds_ib_inc_free(struct rds_incoming *inc);
+int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
+ size_t size);
+void rds_ib_inc_process_acks(struct rds_connection *conn,
+ struct rds_incoming *inc, u16 nr);
+void rds_ib_recv_complete(struct rds_connection *conn, u64 wr_id);
+int rds_ib_recv_refill(struct rds_connection *conn);
+
+/* ib_send.c */
+int __init rds_ib_send_init(void);
+void __exit rds_ib_send_exit(void);
+void rds_ib_xmit_prepare(struct rds_connection *conn);
+void rds_ib_xmit_complete(struct rds_connection *conn);
+int rds_ib_xmit_header(struct rds_connection *conn,
+ struct rds_message *rm, unsigned int off);
+int rds_ib_xmit_data(struct rds_connection *conn,
+ struct rds_message *rm, unsigned int sg,
+ unsigned int off);
+void rds_ib_header_complete(struct rds_connection *conn, u64 wr_id);
+void rds_ib_data_complete(struct rds_connection *conn, u64 wr_id);
+
+#endif
Added: trunk/linux/net/rds/ib_cm.c
===================================================================
--- trunk/linux/net/rds/ib_cm.c 2006-06-23 17:26:55 UTC (rev 112)
+++ trunk/linux/net/rds/ib_cm.c 2006-06-23 22:51:38 UTC (rev 113)
@@ -0,0 +1,421 @@
+/*
+ * Copyright (C) 2006 Oracle. All rights reserved.
+ *
+ * Portions of this code are derived from code which was:
+ *
+ * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+#include <linux/config.h>
+#include <linux/kernel.h>
+#include <linux/in.h>
+#include <rdma/rdma_cm.h>
+
+#include "rds.h"
+#include "ib.h"
+
+static struct rdma_cm_id *rds_ib_listen_id;
+
+static void rds_ib_cm_fill_conn_param(struct rdma_conn_param *conn_param)
+{
+ memset(conn_param, 0, sizeof(struct rdma_conn_param));
+ /* XXX tune these? */
+ conn_param->responder_resources = 1;
+ conn_param->initiator_depth = 1;
+ conn_param->retry_count = 7;
+ conn_param->rnr_retry_count = 7;
+}
+
+static void rds_ib_cq_comp_handler(struct ib_cq *cq, void *context)
+{
+ struct rds_connection *conn = context;
+ struct ib_wc wc;
+ u64 wr_id;
+ int id_type;
+
+ rdsdebug("cq %p conn %p\n", cq, conn);
+
+ ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+
+ while (ib_poll_cq(cq, 1, &wc) > 0 ) {
+ rdsdebug("wc op %u wr_id 0x%llx\n", wc.opcode,
+ (unsigned long long)wc.wr_id);
+
+ id_type = wc.wr_id & RDS_IB_WR_ID_MASK;
+ wr_id = wc.wr_id & ~RDS_IB_WR_ID_MASK;
+
+ switch(id_type) {
+ case RDS_IB_WR_ID_HEADER:
+ rds_ib_header_complete(conn, wr_id);
+ break;
+ case RDS_IB_WR_ID_DATA:
+ rds_ib_data_complete(conn, wr_id);
+ break;
+ case RDS_IB_WR_ID_RECV:
+ rds_ib_recv_complete(conn, wr_id);
+ break;
+ default:
+ printk(KERN_ERR "bogus type %u from id "
+ "0x%llx\n", id_type,
+ (unsigned long long)wc.wr_id);
+ BUG();
+ }
+ }
+}
+
+static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
+{
+ rdsdebug("event %u data %p\n", event->event, data);
+}
+
+static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
+{
+ struct rds_connection *conn = data;
+ struct rds_ib_connection *ic = conn->c_transport_data;
+
+ rdsdebug("conn %p ic %p event %u\n", conn, ic, event->event);
+
+ switch (event->event) {
+ case IB_EVENT_COMM_EST:
+ rds_connect_complete(conn);
+ break;
+ default:
+ printk(KERN_WARNING "RDS/ib: unhandled QP event %u "
+ "on connection to %u.%u.%u.%u\n", event->event,
+ NIPQUAD(conn->c_faddr));
+ break;
+ }
+}
+
+/*
+ * Allocate the verbs resources (PD, MR, CQ, QP) for a connection and
+ * post the initial pool of receive buffers.
+ *
+ * This needs to be very careful to not leave IS_ERR pointers around for
+ * cleanup to trip over -- each member is reset to NULL on error and the
+ * partially built state is left for rds_ib_conn_shutdown() to tear down.
+ */
+static int rds_ib_setup_qp(struct rds_connection *conn)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct ib_qp_init_attr attr;
+	int ret;
+
+	ic->i_pd = ib_alloc_pd(ic->i_cm_id->device);
+	if (IS_ERR(ic->i_pd)) {
+		ret = PTR_ERR(ic->i_pd);
+		ic->i_pd = NULL;
+		rdsdebug("ib_alloc_pd failed: %d\n", ret);
+		goto out;
+	}
+
+	ic->i_mr = ib_get_dma_mr(ic->i_pd, IB_ACCESS_LOCAL_WRITE);
+	if (IS_ERR(ic->i_mr)) {
+		ret = PTR_ERR(ic->i_mr);
+		ic->i_mr = NULL;
+		rdsdebug("ib_get_dma_mr failed: %d\n", ret);
+		goto out;
+	}
+
+	ic->i_cq = ib_create_cq(ic->i_cm_id->device, rds_ib_cq_comp_handler,
+				rds_ib_cq_event_handler, conn,
+				RDS_IB_MAX_RECV_BUFS);
+	if (IS_ERR(ic->i_cq)) {
+		ret = PTR_ERR(ic->i_cq);
+		ic->i_cq = NULL;
+		rdsdebug("ib_create_cq failed: %d\n", ret);
+		goto out;
+	}
+
+	ret = ib_req_notify_cq(ic->i_cq, IB_CQ_NEXT_COMP);
+	if (ret) {
+		rdsdebug("ib_req_notify_cq failed: %d\n", ret);
+		goto out;
+	}
+
+	memset(&attr, 0, sizeof(attr));
+	attr.event_handler = rds_ib_qp_event_handler;
+	attr.qp_context = conn;
+	attr.cap.max_send_wr = RDS_IB_MAX_SEND_BUFS;
+	attr.cap.max_recv_wr = RDS_IB_MAX_RECV_BUFS;
+	attr.cap.max_send_sge = 1;
+	attr.cap.max_recv_sge = 1;
+	attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+	attr.qp_type = IB_QPT_RC;
+	attr.send_cq = ic->i_cq;
+	attr.recv_cq = ic->i_cq;
+
+	ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
+	if (ret) {
+		/* fix: failure was misreported as ib_req_notify_cq */
+		rdsdebug("rdma_create_qp failed: %d\n", ret);
+		goto out;
+	}
+
+	/* fix: propagate posting failure instead of silently dropping it */
+	ret = rds_ib_recv_refill(conn);
+
+out:
+	return ret;
+}
+
+static int rds_ib_cm_handle_connect(struct rdma_cm_id *cm_id,
+ struct rdma_cm_event *event)
+{
+ __be64 lguid = cm_id->route.path_rec->sgid.global.interface_id;
+ __be64 fguid = cm_id->route.path_rec->dgid.global.interface_id;
+ struct rds_ib_connect_private *dp = event->private_data;
+ struct rds_connection *conn;
+ struct rds_ib_connection *ic;
+ struct rdma_conn_param conn_param;
+ int ret;
+
+ rdsdebug("saddr %u.%u.%u.%u daddr %u.%u.%u.%u lguid 0x%llx fguid "
+ "0x%llx\n", NIPQUAD(dp->dp_saddr), NIPQUAD(dp->dp_daddr),
+ (unsigned long long)be64_to_cpu(lguid),
+ (unsigned long long)be64_to_cpu(fguid));
+
+ /* XXX will have to notice race somehow, probably compare guids */
+ conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_ib_transport,
+ GFP_KERNEL);
+ if (IS_ERR(conn)) {
+ ret = PTR_ERR(conn);
+ goto out;
+ }
+
+ /* XXX this seems totally crazy. */
+ ic = conn->c_transport_data;
+ ic->i_cm_id = cm_id;
+
+ ret = rds_ib_setup_qp(conn);
+ if (ret)
+ goto out;
+
+ rds_ib_cm_fill_conn_param(&conn_param);
+ ret = rdma_accept(cm_id, &conn_param);
+out:
+ return ret;
+}
+
+static int rds_ib_cm_initiate_connect(struct rdma_cm_id *cm_id)
+{
+ struct rds_connection *conn = cm_id->context;
+ struct rdma_conn_param conn_param;
+ struct rds_ib_connect_private dp;
+ int ret;
+
+ ret = rds_ib_setup_qp(conn);
+ if (ret)
+ goto out;
+
+ dp.dp_saddr = conn->c_laddr;
+ dp.dp_daddr = conn->c_faddr;
+
+ rds_ib_cm_fill_conn_param(&conn_param);
+ conn_param.private_data = &dp;
+ conn_param.private_data_len = sizeof(struct rds_ib_connect_private);
+
+ ret = rdma_connect(cm_id, &conn_param);
+ if (ret)
+ rdsdebug("rdma_connect failed: %d\n", ret);
+
+out:
+ return ret;
+}
+
+static int rds_ib_cm_event_handler(struct rdma_cm_id *cm_id,
+ struct rdma_cm_event *event)
+{
+ struct rds_connection *conn = cm_id->context;
+ struct rds_ib_connection *ic = conn->c_transport_data;
+ int ret = 0;
+
+ rdsdebug("id %p handling event %u\n", cm_id, event->event);
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ ret = rds_ib_cm_handle_connect(cm_id, event);
+ break;
+
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ /* XXX do we need to clean up if this fails? */
+ ret = rdma_resolve_route(ic->i_cm_id,
+ RDS_IB_RESOLVE_TIMEOUT_MS);
+ break;
+
+ case RDMA_CM_EVENT_ROUTE_RESOLVED:
+ /* XXX worry about racing with listen acceptance */
+ ret = rds_ib_cm_initiate_connect(cm_id);
+ break;
+
+ case RDMA_CM_EVENT_ESTABLISHED:
+ rds_connect_complete(conn);
+ break;
+
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ case RDMA_CM_EVENT_UNREACHABLE:
+ case RDMA_CM_EVENT_REJECTED:
+ printk(KERN_WARNING "RDS/ib: connecting to "
+ "%u.%u.%u.%u failed with event %u, "
+ "reconnecting\n", NIPQUAD(conn->c_faddr),
+ event->event);
+ queue_work(rds_wq, &conn->c_shutdown_work);
+ break;
+
+ case RDMA_CM_EVENT_DISCONNECTED:
+ printk(KERN_WARNING "RDS/ib: connection to "
+ "%u.%u.%u.%u disconnected, reconnecting\n",
+ NIPQUAD(conn->c_faddr));
+ rdma_disconnect(cm_id);
+ queue_work(rds_wq, &conn->c_shutdown_work);
+ break;
+
+ default:
+ BUG(); /* things like device disconnect? */
+ break;
+ }
+
+ rdsdebug("id %p event %u handling ret %d\n", cm_id, event->event, ret);
+ return ret;
+}
+
+/*
+ * Create the cm_id for an outgoing connection and kick off address
+ * resolution; the rest of connect continues in the cm event handler.
+ */
+int rds_ib_conn_connect(struct rds_connection *conn)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct sockaddr_in src, dest;
+	int ret;
+
+	/* XXX I wonder what affect the port space has */
+	ic->i_cm_id = rdma_create_id(rds_ib_cm_event_handler, conn,
+				     RDMA_PS_TCP);
+	if (IS_ERR(ic->i_cm_id)) {
+		ret = PTR_ERR(ic->i_cm_id);
+		ic->i_cm_id = NULL;
+		/* fix: use rdsdebug like the rest of the file */
+		rdsdebug("rdma_create_id() failed: %d\n", ret);
+		goto out;
+	}
+
+	rdsdebug("created cm id %p for conn %p\n", ic->i_cm_id, conn);
+
+	/* fix: zero the sockaddrs so sin_zero isn't stack garbage */
+	memset(&src, 0, sizeof(src));
+	src.sin_family = AF_INET;
+	src.sin_addr.s_addr = (__force u32)conn->c_laddr;
+	src.sin_port = (__force u16)htons(0);
+
+	memset(&dest, 0, sizeof(dest));
+	dest.sin_family = AF_INET;
+	dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
+	dest.sin_port = (__force u16)htons(RDS_IB_KNOWN_PORT);
+
+	ret = rdma_resolve_addr(ic->i_cm_id, (struct sockaddr *)&src,
+				(struct sockaddr *)&dest,
+				RDS_IB_RESOLVE_TIMEOUT_MS);
+	if (ret) {
+		rdsdebug("addr resolve failed for cm id %p: %d\n", ic->i_cm_id,
+			 ret);
+		rdma_destroy_id(ic->i_cm_id);
+		ic->i_cm_id = NULL;
+	}
+
+out:
+	return ret;
+}
+
+/*
+ * Tear down whatever verbs state rds_ib_setup_qp() managed to build.
+ * Safe to call on a partially constructed connection.
+ */
+void rds_ib_conn_shutdown(struct rds_connection *conn)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	rdsdebug("cm %p pd %p cq %p qp %p\n", ic->i_cm_id,
+		 ic->i_pd, ic->i_cq,
+		 ic->i_cm_id ? ic->i_cm_id->qp : NULL);
+
+	if (ic->i_cm_id) {
+		if (ic->i_cm_id->qp)
+			rdma_destroy_qp(ic->i_cm_id);
+		if (ic->i_cq)
+			ib_destroy_cq(ic->i_cq);
+		/*
+		 * Fix: the DMA MR from rds_ib_setup_qp() was never
+		 * deregistered -- leaking it and leaving the PD busy
+		 * when ib_dealloc_pd() was called.  Deregister it
+		 * before dropping the PD.
+		 */
+		if (ic->i_mr)
+			ib_dereg_mr(ic->i_mr);
+		if (ic->i_pd)
+			ib_dealloc_pd(ic->i_pd);
+		rdma_destroy_id(ic->i_cm_id);
+	}
+
+	ic->i_cm_id = NULL;
+	ic->i_mr = NULL;
+	ic->i_pd = NULL;
+	ic->i_cq = NULL;
+}
+
+/*
+ * Allocate the per-connection IB transport state.
+ * Returns 0 on success, -ENOMEM on allocation failure.
+ */
+int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
+{
+	struct rds_ib_connection *ic;
+
+	/*
+	 * Fix: was kzalloc(sizeof(struct rds_ib_connection *)) -- the
+	 * size of a pointer -- badly under-allocating the connection
+	 * state.  Also honor the caller's gfp instead of hardcoding
+	 * GFP_KERNEL.
+	 */
+	ic = kzalloc(sizeof(struct rds_ib_connection), gfp);
+	if (ic == NULL)
+		return -ENOMEM;
+
+	atomic_set(&ic->i_recv_posted, 0);
+
+	conn->c_transport_data = ic;
+
+	rdsdebug("ic %p\n", conn->c_transport_data);
+	return 0;
+}
+
+void rds_ib_conn_free(void *arg)
+{
+ struct rds_ib_connection *ic = arg;
+ rdsdebug("ic %p\n", ic);
+ kfree(ic);
+}
+
+/*
+ * Create the cm_id that listens for incoming connections on the RDS
+ * port on INADDR_ANY.
+ */
+int __init rds_ib_listen_init(void)
+{
+	struct sockaddr_in sin;
+	struct rdma_cm_id *cm_id;
+	int ret;
+
+	cm_id = rdma_create_id(rds_ib_cm_event_handler, NULL, RDMA_PS_TCP);
+	if (IS_ERR(cm_id)) {
+		ret = PTR_ERR(cm_id);
+		printk(KERN_ERR "RDS/ib: failed to setup listener, "
+		       "rdma_create_id() returned %d\n", ret);
+		goto out;
+	}
+
+	/*
+	 * Fix: zero the sockaddr (sin_zero was stack garbage) and
+	 * assign AF_INET with a real statement -- the original read
+	 * "sin.sin_family = PF_INET," with a stray comma operator.
+	 */
+	memset(&sin, 0, sizeof(sin));
+	sin.sin_family = AF_INET;
+	sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY);
+	sin.sin_port = (__force u16)htons(RDS_IB_KNOWN_PORT);
+
+	ret = rdma_bind_addr(cm_id, (struct sockaddr *)&sin);
+	if (ret) {
+		printk(KERN_ERR "RDS/ib: failed to setup listener, "
+		       "rdma_bind_addr() returned %d\n", ret);
+		goto out;
+	}
+
+	ret = rdma_listen(cm_id, 128);
+	if (ret) {
+		printk(KERN_ERR "RDS/ib: failed to setup listener, "
+		       "rdma_listen() returned %d\n", ret);
+		goto out;
+	}
+
+	rds_ib_listen_id = cm_id;
+	cm_id = NULL;
+out:
+	if (cm_id)
+		rdma_destroy_id(cm_id);
+	return ret;
+}
+
+void rds_ib_listen_stop(void)
+{
+	/*
+	 * Fix: a listening cm_id has no connection to disconnect;
+	 * rdma_disconnect() applies to connected ids.  Destroy the
+	 * listener instead, and guard against listen_init() never
+	 * having succeeded.
+	 */
+	if (rds_ib_listen_id) {
+		rdma_destroy_id(rds_ib_listen_id);
+		rds_ib_listen_id = NULL;
+	}
+}
Added: trunk/linux/net/rds/ib_recv.c
===================================================================
--- trunk/linux/net/rds/ib_recv.c 2006-06-23 17:26:55 UTC (rev 112)
+++ trunk/linux/net/rds/ib_recv.c 2006-06-23 22:51:38 UTC (rev 113)
@@ -0,0 +1,301 @@
+/*
+ * Copyright (C) 2006 Oracle. All rights reserved.
+ *
+ * Portions of this code are derived from code which was:
+ *
+ * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+#include <linux/config.h>
+#include <linux/kernel.h>
+#include <linux/pci.h>
+#include <linux/dma-mapping.h>
+#include <rdma/rdma_cm.h>
+
+#include "rds.h"
+#include "ib.h"
+
+static kmem_cache_t *rds_ib_incoming_slab;
+static kmem_cache_t *rds_ib_recv_slab;
+
+void rds_ib_inc_free(struct rds_incoming *inc)
+{
+ struct rds_ib_incoming *ibinc;
+ ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
+ pr_debug("freeing ibinc %p inc %p\n", ibinc, inc);
+ BUG_ON(!list_empty(&ibinc->ii_recvs));
+ kmem_cache_free(rds_ib_incoming_slab, ibinc);
+}
+
+static void rds_ib_recv_free(struct rds_ib_recv *recv)
+{
+ BUG_ON(!list_empty(&recv->ir_item));
+ BUG_ON(recv->ir_sge.addr != 0);
+ __free_page(recv->ir_page);
+ kmem_cache_free(rds_ib_recv_slab, recv);
+}
+
+static void rds_ib_recv_unmap(struct rds_ib_connection *ic,
+ struct rds_ib_recv *recv)
+{
+ dma_unmap_page(ic->i_cm_id->device->dma_device, recv->ir_sge.addr,
+ PAGE_SIZE, DMA_FROM_DEVICE);
+ recv->ir_sge.addr = 0;
+}
+
+/*
+ * Allocates a new rds_ib_incoming and initializes its header from the
+ * header at the start of the given page.
+ * NOTE(review): kmap_atomic(KM_SOFTIRQ0) assumes this runs in softirq
+ * context (the CQ completion path) -- confirm before reusing elsewhere.
+ */
+static struct rds_ib_incoming *rds_ib_inc_new(struct rds_connection *conn,
+ struct page *page, gfp_t gfp)
+{
+ struct rds_ib_incoming *ibinc;
+ void *addr;
+
+ ibinc = kmem_cache_alloc(rds_ib_incoming_slab, gfp);
+ /*
+ * XXX punt to thread, suggests refactoring core threading
+ */
+ BUG_ON(ibinc == NULL);
+
+ rds_inc_init(&ibinc->ii_inc, conn, conn->c_faddr);
+ addr = kmap_atomic(page, KM_SOFTIRQ0);
+ memcpy(&ibinc->ii_inc.i_hdr, addr, sizeof(struct rds_header));
+ kunmap_atomic(addr, KM_SOFTIRQ0);
+
+ return ibinc;
+}
+
+void rds_ib_inc_process_acks(struct rds_connection *conn,
+ struct rds_incoming *inc, u16 nr)
+{
+ struct rds_ib_incoming *ibinc;
+ struct rds_ack_entry *ent, *end;
+ struct rds_ib_recv *recv;
+ void *addr;
+
+ ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
+ list_for_each_entry(recv, &ibinc->ii_recvs, ir_item) {
+ if (nr == 0)
+ break;
+ /* XXX pass in km? */
+ addr = kmap_atomic(recv->ir_page, KM_SOFTIRQ0);
+ /* offset is RDS_FRAG_SIZE aligned */
+ ent = addr; /* XXX need + offset when many recvs per page */
+ end = ent + (RDS_FRAG_SIZE / sizeof(struct rds_ack_entry));
+
+ /*
+ * we're going to say that calling ack_process while holding
+ * a kmap will be ok.. we'll see!
+ */
+ for(; ent < end && nr; ent++, nr--)
+ rds_ack_process(conn, ent);
+
+ kunmap_atomic(addr, KM_SOFTIRQ0);
+ }
+}
+
+int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
+ size_t size)
+{
+ struct rds_ib_incoming *ibinc;
+ struct rds_ib_recv *recv;
+ struct iovec *iov = first_iov;
+ unsigned long to_copy;
+ unsigned long iov_off = 0;
+ unsigned long recv_off = 0;
+ int ret, copied = 0;
+ u32 len;
+
+ ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
+ recv = list_entry(ibinc->ii_recvs.next, struct rds_ib_recv, ir_item);
+ len = be32_to_cpu(inc->i_hdr.h_len);
+
+ while (copied < size && copied < len) {
+ if (recv_off == RDS_FRAG_SIZE) {
+ recv = list_entry(recv->ir_item.next,
+ struct rds_ib_recv, ir_item);
+ recv_off = 0;
+ }
+ while (iov_off == iov->iov_len) {
+ iov_off = 0;
+ iov++;
+ }
+
+ to_copy = min(iov->iov_len - iov_off, RDS_FRAG_SIZE - recv_off);
+ to_copy = min_t(size_t, to_copy, size - copied);
+ to_copy = min_t(unsigned long, to_copy, len - copied);
+
+ pr_debug("copying %lu bytes to user %p (base %p len %zu "
+ "off %lu) from recv %p page %p voff %u off %lu\n",
+ to_copy, iov->iov_base + iov_off, iov->iov_base,
+ iov->iov_len, iov_off, recv, recv->ir_page,
+ recv->ir_sge.offset, recv_off);
+
+ /* XXX could look more like filemap_copy_from_user() */
+ /* XXX needs + offset for multiple recvs per page */
+ ret = copy_to_user(iov->iov_base + iov_off,
+ kmap(recv->ir_page) + recv_off,
+ to_copy);
+ kunmap(recv->ir_page);
+ if (ret) {
+ copied = -EFAULT;
+ break;
+ }
+
+ iov_off += to_copy;
+ recv_off += to_copy;
+ copied += to_copy;
+ }
+
+ return copied;
+}
+
+/*
+ * Post a mapped recv to the QP, counting it in i_recv_posted.  If the
+ * post fails the recv is unmapped and freed and the error returned; the
+ * caller must not touch it afterwards.  The counter is bumped before
+ * posting so a completion can never see it under-counted.
+ * NOTE(review): only used within ib_recv.c -- could be static.
+ */
+int rds_ib_recv_post_or_free(struct rds_connection *conn,
+ struct rds_ib_recv *recv)
+{
+ struct rds_ib_connection *ic = conn->c_transport_data;
+ struct ib_recv_wr *failed;
+ int ret;
+
+ /* XXX we could be posting lots of these at once */
+ atomic_inc(&ic->i_recv_posted);
+ ret = ib_post_recv(ic->i_qp, &recv->ir_wr, &failed);
+ if (ret) {
+ rds_ib_recv_unmap(ic, recv);
+ rds_ib_recv_free(recv);
+ atomic_dec(&ic->i_recv_posted);
+ }
+ return ret;
+}
+
+/*
+ * We're relying on RC completions arriving in post order so that we always
+ * get an orderly stream of message headers and then their data fragments.
+ *
+ * Fix: this was defined as rds_ib_recv_callback(), but ib.h declares it
+ * and ib_cm.c's completion handler calls it as rds_ib_recv_complete() --
+ * the mismatch would fail at link time.  Renamed to match the declared
+ * interface.
+ */
+void rds_ib_recv_complete(struct rds_connection *conn, u64 wr_id)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_recv *recv = (void *)(unsigned long)wr_id;
+	struct rds_ib_incoming *ibinc = ic->i_ibinc;
+
+	pr_debug("recv ic %p recv %p sge len %u\n", ic, recv,
+		 recv->ir_sge.length);
+
+	/*
+	 * XXX worry about how we clean these up.. if qp shutdown leads
+	 * to a stream of completions then we'd want to not post again
+	 * in that case.
+	 */
+
+	if (ibinc == NULL) {
+		/* XXX pretty sure comp callbacks are in softirq? */
+		ibinc = rds_ib_inc_new(conn, recv->ir_page, GFP_ATOMIC);
+		ic->i_ibinc = ibinc;
+		ic->i_recv_data_rem = be32_to_cpu(ibinc->ii_inc.i_hdr.h_len);
+		rds_ib_recv_post_or_free(conn, recv);
+		return;
+	}
+
+	list_add_tail(&recv->ir_item, &ibinc->ii_recvs);
+
+	/* XXX use sge len in case sender does weird things */
+	if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
+		ic->i_recv_data_rem -= RDS_FRAG_SIZE;
+	else {
+		ic->i_recv_data_rem = 0;
+		ic->i_ibinc = NULL;
+		rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
+				  &ibinc->ii_inc, GFP_ATOMIC, KM_SOFTIRQ0);
+	}
+
+	rds_ib_recv_refill(conn);
+}
+
+/*
+ * Keep the QP topped up with posted receive buffers.  On allocation or
+ * posting failure, schedule the recv worker to retry shortly.
+ */
+int rds_ib_recv_refill(struct rds_connection *conn)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_recv *recv;
+	int ret = 0;
+
+	/* eh, we don't really care about this race */
+	while (atomic_read(&ic->i_recv_posted) < RDS_IB_MAX_RECV_BUFS) {
+		recv = kmem_cache_alloc(rds_ib_recv_slab, GFP_KERNEL);
+		if (recv == NULL) {
+			ret = -ENOMEM;
+			break;
+		}
+		/*
+		 * XXX right now we're only doing one page per frag, which
+		 * is pretty wasteful on large machines.  Perhaps we'd be
+		 * able to rely on recv completion happening in recv post
+		 * order so that we know to unmap only as the final offset
+		 * is completed.
+		 */
+		recv->ir_page = alloc_page(GFP_KERNEL);
+		if (recv->ir_page == NULL) {
+			kmem_cache_free(rds_ib_recv_slab, recv);
+			ret = -ENOMEM;
+			break;
+		}
+
+		INIT_LIST_HEAD(&recv->ir_item);
+
+		/*
+		 * Fix: this is a receive buffer, so it must be mapped
+		 * DMA_FROM_DEVICE -- it was DMA_TO_DEVICE, which also
+		 * disagreed with the DMA_FROM_DEVICE unmap in
+		 * rds_ib_recv_unmap().
+		 * XXX doesn't fail? can block forever? Hmm.
+		 */
+		recv->ir_sge.addr = dma_map_page(ic->i_cm_id->device->dma_device,
+						 recv->ir_page, 0, PAGE_SIZE,
+						 DMA_FROM_DEVICE);
+		recv->ir_sge.length = PAGE_SIZE;
+		recv->ir_sge.lkey = ic->i_mr->lkey;
+
+		recv->ir_wr.next = NULL;
+		recv->ir_wr.wr_id = ((unsigned long)recv) | RDS_IB_WR_ID_RECV;
+		recv->ir_wr.sg_list = &recv->ir_sge;
+		recv->ir_wr.num_sge = 1;
+
+		ret = rds_ib_recv_post_or_free(conn, recv);
+		if (ret)
+			break;
+	}
+	/* couldn't fill all the way; have the worker retry soon */
+	if (ret)
+		queue_delayed_work(rds_wq, &conn->c_recv_work, 2);
+	return ret;
+}
+
+int __init rds_ib_recv_init(void)
+{
+ rds_ib_incoming_slab = kmem_cache_create("rds_ib_incoming",
+ sizeof(struct rds_ib_incoming),
+ 0, 0, NULL, NULL);
+ if (rds_ib_incoming_slab == NULL)
+ return -ENOMEM;
+ rds_ib_recv_slab = kmem_cache_create("rds_ib_recv",
+ sizeof(struct rds_ib_recv),
+ 0, 0, NULL, NULL);
+ if (rds_ib_recv_slab == NULL) {
+ kmem_cache_destroy(rds_ib_incoming_slab);
+ return -ENOMEM;
+ }
+ return 0;
+}
+
+void __exit rds_ib_recv_exit(void)
+{
+ kmem_cache_destroy(rds_ib_incoming_slab);
+ kmem_cache_destroy(rds_ib_recv_slab);
+}
Added: trunk/linux/net/rds/ib_send.c
===================================================================
--- trunk/linux/net/rds/ib_send.c 2006-06-23 17:26:55 UTC (rev 112)
+++ trunk/linux/net/rds/ib_send.c 2006-06-23 22:51:38 UTC (rev 113)
@@ -0,0 +1,262 @@
+/*
+ * Copyright (C) 2006 Oracle. All rights reserved.
+ *
+ * Portions of this code are derived from code which was:
+ *
+ * Copyright (c) 2005 SilverStorm Technologies, Inc. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+#include <linux/config.h>
+#include <linux/kernel.h>
+#include <linux/in.h>
+#include <linux/device.h>
+#include <linux/dmapool.h>
+
+#include "rds.h"
+#include "ib.h"
+
+/* slab backing per-header send tracking structs; see rds_ib_send_init() */
+static kmem_cache_t *rds_ib_header_send_slab;
+
+/*
+ * Intentionally empty: IB needs no per-transmit setup.  Presumably part
+ * of the rds transport ops interface -- confirm against rds.h.
+ */
+void rds_ib_xmit_prepare(struct rds_connection *conn)
+{
+}
+
+/*
+ * Intentionally empty: IB needs no work after a transmit pass.
+ * Presumably part of the rds transport ops interface -- confirm
+ * against rds.h.
+ */
+void rds_ib_xmit_complete(struct rds_connection *conn)
+{
+}
+
+/*
+ * Send-completion handler for a header work request.  wr_id is expected
+ * to carry the rds_ib_header_send pointer that rds_ib_xmit_header()
+ * posted; free its DMA-pool buffer, then the tracking struct itself.
+ */
+void rds_ib_header_complete(struct rds_connection *conn, u64 wr_id)
+{
+ struct rds_ib_header_send *ih = (void *)(long)wr_id;
+ struct rds_ib_connection *ic = conn->c_transport_data;
+
+ rdsdebug("conn %p wr_id 0x%llx\n", conn, (unsigned long long)wr_id);
+
+ dma_pool_free(ic->i_pool, ih->ih_vaddr, ih->ih_dma);
+ kmem_cache_free(rds_ib_header_send_slab, ih);
+}
+
+/* the core send_sem serializes this with other xmit and shutdown */
+/*
+ * Post the RDS header of @rm as its own SEND work request.  On success
+ * returns the number of header bytes handed to the HCA
+ * (sizeof(struct rds_header)); on failure all allocations are unwound
+ * and a negative errno is returned.  @off must be 0: headers are never
+ * sent partially.
+ */
+int rds_ib_xmit_header(struct rds_connection *conn,
+		       struct rds_message *rm, unsigned int off)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_header_send *ih;
+	struct ib_send_wr *failed_wr;
+	int ret;
+
+	BUG_ON(off != 0);
+
+	ih = kmem_cache_alloc(rds_ib_header_send_slab, GFP_KERNEL);
+	if (ih == NULL) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	ih->ih_vaddr = dma_pool_alloc(ic->i_pool, GFP_KERNEL, &ih->ih_dma);
+	if (ih->ih_vaddr == NULL) {
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	memcpy(ih->ih_vaddr, &rm->m_inc.i_hdr, sizeof(struct rds_header));
+	/* XXX need a sync to the device here? */
+
+	ih->ih_sge.addr = ih->ih_dma;
+	ih->ih_sge.length = sizeof(struct rds_header);
+	ih->ih_sge.lkey = ic->i_mr->lkey;
+
+	ih->ih_wr.next = NULL;
+	/*
+	 * rds_ib_header_complete() casts wr_id back to this ih pointer
+	 * and frees through it.  The slab doesn't zero allocations, so
+	 * wr_id must be assigned explicitly or completion frees garbage.
+	 */
+	ih->ih_wr.wr_id = (unsigned long)ih;
+	ih->ih_wr.sg_list = &ih->ih_sge;
+	ih->ih_wr.num_sge = 1;
+	ih->ih_wr.opcode = IB_WR_SEND_WITH_IMM;
+	ih->ih_wr.send_flags = 0; /* XXX what could these be? */
+	ih->ih_wr.imm_data = 0;
+
+	/*
+	 * For now we're sending these as their own wr.  The receiver
+	 * knows from the state of the RC that they'll be a header.  This
+	 * means small messages take two work reqs, which isn't so nice,
+	 * but maybe it'll be ok.  We can complex it up if their overhead
+	 * is too high.
+	 */
+	ret = ib_post_send(ic->i_qp, &ih->ih_wr, &failed_wr);
+	if (ret == 0)
+		ret = sizeof(struct rds_header);
+	/* XXX disconnect and reconnect on error? */
+
+out:
+	/* unwind in reverse order; ih_vaddr is NULL if pool alloc failed */
+	if (ret < 0 && ih) {
+		if (ih->ih_vaddr)
+			dma_pool_free(ic->i_pool, ih->ih_vaddr,
+				      ih->ih_dma);
+		kmem_cache_free(rds_ib_header_send_slab, ih);
+	}
+	return ret;
+}
+
+/*
+ * Drop one reference on a data send.  The final put unmaps the
+ * message's scatterlist, drops the message reference taken in
+ * rds_ib_ds_get(), and frees the ds (with its trailing sge/wr arrays,
+ * which live in the same allocation).
+ */
+void rds_ib_ds_put(struct rds_ib_data_send *ds)
+{
+ if (atomic_dec_and_test(&ds->id_refcount)) {
+ if (ds->id_rm) {
+ struct rds_ib_connection *ic = ds->id_ic;
+ struct rds_message *rm = ds->id_rm;
+
+ dma_unmap_sg(ic->i_device, rm->m_sg,
+ rm->m_nents, DMA_TO_DEVICE);
+ rds_message_put(rm);
+ }
+ kfree(ds);
+ }
+}
+
+/*
+ * Allocate and initialize the send tracking state for message @rm: one
+ * ib_sge and one ib_send_wr per RDS_FRAG_SIZE fragment, carved out of a
+ * single allocation behind the rds_ib_data_send struct.  Maps the
+ * message's scatterlist for DMA and takes a message reference; both are
+ * released by the final rds_ib_ds_put().  Returns NULL on failure.
+ */
+static struct rds_ib_data_send *rds_ib_ds_get(struct rds_ib_connection *ic,
+					      struct rds_message *rm,
+					      unsigned int sg,
+					      unsigned int off,
+					      unsigned long nr_work)
+{
+	struct rds_ib_data_send *ds;
+	struct ib_send_wr *wr;
+	struct scatterlist *scat;
+	struct ib_sge *sge;
+	unsigned long len;
+	unsigned long i;
+	int ret;
+
+	/*
+	 * One sge and one wr per fragment follow the struct itself; this
+	 * must match the carve-out of id_sge/id_wr below (the original
+	 * summed sizeof(ib_send_wr) twice, over-allocating).
+	 */
+	ds = kzalloc(sizeof(struct rds_ib_data_send) + (nr_work *
+			(sizeof(struct ib_sge) +
+			 sizeof(struct ib_send_wr))), GFP_KERNEL);
+	if (ds == NULL)
+		goto out;
+
+	/* only set id_rm after we map it */
+	ret = dma_map_sg(ic->i_device, rm->m_sg, rm->m_nents,
+			 DMA_TO_DEVICE);
+	if (ret != rm->m_nents) {
+		if (ret > 0) /* XXX not sure this is possible */
+			dma_unmap_sg(ic->i_device, rm->m_sg,
+				     rm->m_nents, DMA_TO_DEVICE);
+		kfree(ds);
+		ds = NULL;
+		goto out;
+	}
+
+	rds_message_addref(rm);
+	ds->id_rm = rm;
+	ds->id_ic = ic;
+	atomic_set(&ds->id_refcount, nr_work);
+	ds->id_sge = (void *)ds + sizeof(struct rds_ib_data_send);
+	ds->id_wr = (void *)ds->id_sge +
+		    (nr_work * sizeof(struct ib_sge));
+
+	for (i = 0; i < nr_work; i++) {
+		sge = &ds->id_sge[i];
+		wr = &ds->id_wr[i];
+		scat = &rm->m_sg[sg];
+
+		len = min(RDS_FRAG_SIZE, sg_dma_len(scat) - off);
+
+		sge->addr = sg_dma_address(scat) + off;
+		sge->length = len;
+		sge->lkey = ic->i_mr->lkey;
+
+		off += len;
+		/*
+		 * Advance to the next scatterlist entry once this one is
+		 * fully consumed.  Comparing off (not len) against the
+		 * entry length handles entries larger than one fragment
+		 * and nonzero starting offsets; the original compared
+		 * len and so never advanced past multi-frag entries.
+		 */
+		if (off == sg_dma_len(scat)) {
+			off = 0;
+			sg++;
+		}
+
+		/* chain the wrs so the whole message posts in one call */
+		if (i != nr_work - 1)
+			wr->next = wr + 1;
+		else
+			wr->next = NULL;
+		wr->sg_list = sge;
+		wr->num_sge = 1;
+		wr->opcode = IB_WR_SEND_WITH_IMM;
+		/*
+		 * NOTE(review): wr_id is left 0 from kzalloc; the send
+		 * completion path presumably needs it to locate this ds
+		 * -- confirm against the CQ handler in ib_cm.c.
+		 */
+	}
+out:
+	return ds;
+}
+
+/*
+ * We simplify the moving pieces a little by allocating all the IB sending
+ * goo for a message at once.  We try to send the message in one post.
+ * We might run into large allocation problems here with large messages
+ * in the future, but maybe it just won't be a problem.
+ *
+ * The core send_sem serializes this with other xmit and shutdown.
+ *
+ * Returns bytes handed to the HCA (possibly partial if only some work
+ * reqs posted) or a negative errno.
+ */
+int rds_ib_xmit_data(struct rds_connection *conn,
+		     struct rds_message *rm, unsigned int sg,
+		     unsigned int off)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_data_send *ds = ic->i_ds;
+	struct ib_send_wr *wr;
+	unsigned long nr_work;
+	unsigned long first;
+	int ret;
+
+	nr_work = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
+
+	if (ds == NULL) {
+		ds = rds_ib_ds_get(ic, rm, sg, off, nr_work);
+		/* ds_get returns NULL on alloc/map failure; don't deref it */
+		if (ds == NULL) {
+			ret = -ENOMEM;
+			goto out;
+		}
+		ic->i_ds = ds;
+		/* one ref per work req plus one for ic->i_ds itself */
+		atomic_set(&ds->id_refcount, nr_work + 1);
+	}
+
+	first = (sg * (PAGE_SIZE / RDS_FRAG_SIZE)) + (off / RDS_FRAG_SIZE);
+
+	/* we use wr to track how many succeeded */
+	wr = &ds->id_wr[first];
+	ret = ib_post_send(ic->i_qp, &ds->id_wr[first], &wr);
+	pr_debug("data post ic %p first wr %p nr %lu returned ret %d wr %p\n",
+		 ic, &ds->id_wr[first], nr_work - first, ret, wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: ib_post_send to %u.%u.%u.%u "
+		       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
+		/* XXX fail the connection? */
+		if (wr != &ds->id_wr[first])
+			ret = (wr - &ds->id_wr[first]) * RDS_FRAG_SIZE;
+		goto out;
+	}
+
+	/* ok, we finished the message, drop our ds ref */
+	/* XXX this leaves the ds in flight, conn reset will have to
+	 * force completion of posted work reqs somehow */
+	ic->i_ds = NULL;
+	rds_ib_ds_put(ds);
+out:
+	return ret;
+}
+
+/*
+ * Module init for the IB send path: create the slab backing the
+ * per-header send tracking structs.  Returns 0 or -ENOMEM.
+ */
+int __init rds_ib_send_init(void)
+{
+ rds_ib_header_send_slab = kmem_cache_create("rds_ib_header_send",
+ sizeof(struct rds_ib_header_send),
+ 0, 0, NULL, NULL);
+ if (rds_ib_header_send_slab == NULL)
+ return -ENOMEM;
+ return 0;
+}
+
+/* Module exit: destroy the slab created by rds_ib_send_init(). */
+void __exit rds_ib_send_exit(void)
+{
+ kmem_cache_destroy(rds_ib_header_send_slab);
+}
Modified: trunk/linux/net/rds/rds.h
===================================================================
--- trunk/linux/net/rds/rds.h 2006-06-23 17:26:55 UTC (rev 112)
+++ trunk/linux/net/rds/rds.h 2006-06-23 22:51:38 UTC (rev 113)
@@ -7,7 +7,15 @@
/* x86-64 doesn't include kmap_types.h from anywhere */
#include <asm/kmap_types.h>
#ifdef DEBUG
#define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
#else
/* sigh, pr_debug() causes unused variable warnings */
/*
 * Empty inline (rather than an empty macro) so the arguments are still
 * evaluated and referenced in non-debug builds.  const-qualify fmt:
 * callers always pass string literals, and printf-style functions take
 * const char * by convention.
 */
static inline void __attribute__ ((format (printf, 1, 2)))
rdsdebug(const char *fmt, ...)
{
}
#endif
/* XXX is there one of these somewhere? */
#define ceil(x, y) \
More information about the rds-commits
mailing list