[Ocfs2-commits] zab commits r2138 - trunk/fs/ocfs2/cluster
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Tue Apr 12 16:08:06 CDT 2005
Author: zab
Signed-off-by: mfasheh
Date: 2005-04-12 16:08:05 -0500 (Tue, 12 Apr 2005)
New Revision: 2138
Modified:
trunk/fs/ocfs2/cluster/nodemanager.c
trunk/fs/ocfs2/cluster/nodemanager.h
trunk/fs/ocfs2/cluster/tcp.c
trunk/fs/ocfs2/cluster/tcp.h
Log:
Strengthen the lifecycle management of sockets to remote nodes.
o replace 'net_inode_private' with a kref-managed 'sock_container'. see the
tcp.c comment for a description of the relationship between nodes and
sock containers.
o simplify how tranmitters wait for connect to finish and have them satisified
by an accept that comes in while they're waiting
o deal with racing connects by comparing node numbers to decide which node's
initiated socket gets disconnected
o use the kernel's idr allocator to manage status replies
o wake status waiters when a sock is detached and won't be received from
again
o factor out sk callback registration and catch errors with state_change,
not error_report
Signed-off-by: mfasheh
Modified: trunk/fs/ocfs2/cluster/nodemanager.c
===================================================================
--- trunk/fs/ocfs2/cluster/nodemanager.c 2005-04-12 20:45:35 UTC (rev 2137)
+++ trunk/fs/ocfs2/cluster/nodemanager.c 2005-04-12 21:08:05 UTC (rev 2138)
@@ -184,6 +184,12 @@
}
EXPORT_SYMBOL(nm_node_put);
+void nm_node_get(struct nm_node *node)
+{
+ config_item_get(&node->nd_item);
+}
+EXPORT_SYMBOL(nm_node_get);
+
u8 nm_this_node(void)
{
u8 node_num = NM_MAX_NODES;
@@ -513,7 +519,6 @@
{
struct nm_node *node = NULL;
struct config_item *ret = NULL;
- net_inode_private *nip;
if (strlen(name) > NM_MAX_NAME_LEN)
goto out; /* ENAMETOOLONG */
@@ -523,21 +528,13 @@
goto out; /* ENOMEM */
strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
-
- /* this should be somewhere else */
- nip = &node->nd_net_inode_private;
- spin_lock_init(&nip->sock_lock);
- INIT_LIST_HEAD(&nip->pending_waiters);
- init_waitqueue_head(&nip->waitq);
- INIT_LIST_HEAD(&nip->handlers);
- INIT_LIST_HEAD(&nip->active_item);
- nip->page = alloc_page(GFP_KERNEL);
- if (nip->page == NULL) {
- nmprintk("page allocation failed\n");
- goto out; /* ENOMEM */
- }
-
config_item_init_type_name(&node->nd_item, name, &nm_node_type);
+ spin_lock_init(&node->nd_lock);
+ atomic_set(&node->nd_pending_connects, 0);
+ atomic_set(&node->nd_sc_generation, 0);
+ init_waitqueue_head(&node->nd_sc_wq);
+ idr_init(&node->nd_status_idr);
+ INIT_LIST_HEAD(&node->nd_status_list);
ret = &node->nd_item;
@@ -548,12 +545,13 @@
return ret;
}
-static void nm_node_group_drop_item(struct config_group *group, struct config_item *item)
+static void nm_node_group_drop_item(struct config_group *group,
+ struct config_item *item)
{
struct nm_node *node = to_nm_node(item);
struct nm_cluster *cluster = to_nm_cluster(group->cg_item.ci_parent);
- net_stop_node_sock(node);
+ net_detach_sc(NULL, node);
if (cluster->cl_has_local &&
(cluster->cl_local_node == node->nd_num)) {
Modified: trunk/fs/ocfs2/cluster/nodemanager.h
===================================================================
--- trunk/fs/ocfs2/cluster/nodemanager.h 2005-04-12 20:45:35 UTC (rev 2137)
+++ trunk/fs/ocfs2/cluster/nodemanager.h 2005-04-12 21:08:05 UTC (rev 2138)
@@ -27,53 +27,14 @@
#ifndef CLUSTER_NODEMANAGER_H
#define CLUSTER_NODEMANAGER_H
-#define NM_ASSERT(x) ({ if (!(x)) { printk("nm: assert failed! %s:%d\n", __FILE__, __LINE__); BUG(); } })
-
#include "ocfs2_nodemanager.h"
/* This totally doesn't belong here. */
#include "configfs.h"
+#include <linux/idr.h>
-
-/* TODO: move this */
-/*
- * this stores the per-socket state for each socket that we associate
- * with a node. for remote nodes this is a socket that is established
- * on demand and trades messages. For a local node this is just a listening
- * socket that spawns message sockets from other nodes.
- */
-struct sock;
-/* this is still called net_inode_private for hysterical raisins. one
- * has to draw the cleanup line somewhere.. */
-typedef struct _net_inode_private
-{
- /* only used by the local node. */
- struct task_struct *rx_thread;
- /* the rest is for remote nodes */
-
- /* sockets themselves don't seem to have a nice way to refcount them
- * above sock_release. one could use iget/iput, but that seems
- * to interact poory with sock_release() itself calling iput. */
- spinlock_t sock_lock;
- struct socket *sock;
- unsigned long sock_refs;
- unsigned sock_pending:1, /* wait before using ->sock */
- defer_release:1; /* sock busted,release soon */
- struct list_head pending_waiters;
- wait_queue_head_t waitq;
-
- struct list_head handlers;
- struct list_head active_item;
- struct page *page;
- size_t page_off;
-
-
- void (*orig_state_change)(struct sock *sk);
- void (*orig_error_report)(struct sock *sk);
- void (*orig_data_ready)(struct sock *sk, int bytes);
-} net_inode_private;
-
struct nm_node {
+ spinlock_t nd_lock;
struct config_item nd_item;
char nd_name[NM_MAX_NAME_LEN+1]; /* replace? */
__u8 nd_num;
@@ -87,9 +48,20 @@
unsigned long nd_set_attributes;
- /* we're making simple assertions that a node can only have one network
- * identity and report at one place in a heartbeat */
- net_inode_private nd_net_inode_private;
+ /* only used by the local node. */
+ struct task_struct *nd_rx_thread;
+
+ /* protected by nd_lock. It isn't so hot that we have all these
+ * net-private things out here in nodemanager.h. */
+ struct net_sock_container *nd_sc;
+ /* threads waiting for an sc to arrive wait on the wq for generation
+ * to increase. it is increased when a connecting socket succeeds
+ * or fails or when an accepted socket is attached. */
+ atomic_t nd_pending_connects;
+ atomic_t nd_sc_generation;
+ wait_queue_head_t nd_sc_wq;
+ struct idr nd_status_idr;
+ struct list_head nd_status_list;
};
u8 nm_this_node(void);
@@ -97,6 +69,7 @@
int nm_configured_node_map(unsigned long *map, unsigned bytes);
struct nm_node * nm_get_node_by_num(u8 node_num);
struct nm_node * nm_get_node_by_ip(u32 addr);
+void nm_node_get(struct nm_node *node);
void nm_node_put(struct nm_node *node);
#endif /* CLUSTER_NODEMANAGER_H */
Modified: trunk/fs/ocfs2/cluster/tcp.c
===================================================================
--- trunk/fs/ocfs2/cluster/tcp.c 2005-04-12 20:45:35 UTC (rev 2137)
+++ trunk/fs/ocfs2/cluster/tcp.c 2005-04-12 21:08:05 UTC (rev 2138)
@@ -37,19 +37,24 @@
* referenced beyond the call. Handlers may block but are discouraged from
* doing so.
*
- * Any framing errors (bad magic, unknown message types, large payload lengths)
- * closes a connection.
+ * Any framing errors (bad magic, large payload lengths) close a connection.
*
- * struct socket pointers live in the net_inode_private structure off of the
- * node manager inodes. Only one socket is active under an inode at a time and
- * callers refcount the socket in members of n_i_p. While a connect() is
- * pending transmitters will block on the connect state machine until it
- * completes. tx and rx get references to the socket while they're doing their
- * work. If they see an error they mark the socket for release at the next
- * final decref. sk_error_report doesn't get a reference, it just marks the
- * socket for release and kicks the rx thread. This means that new references
- * can get the known-dead socket and see errors.
+ * Our sock_container holds the state we associate with a socket. Its current
+ * framing state is held there as well as the refcounting we do around when
+ * it is safe to tear down the socket.
+ *
+ * A caller who wants to communicate with a node gets a ref to the sock
+ * container by finding it on the nm_node that it wants to communicate with.
+ * The sock container will only be found on the node once there is a valid
+ * socket in the container. The socket is only finally torn down from
+ * the container when the container loses all of its references -- so as
+ * long as you hold a ref on the container you can trust that the socket
+ * is valid for use with kernel socket APIs.
*
+ * Usually blocking paths get and drop refs on the container, which is easy.
+ * The exception to this are the socket callbacks. They push their work
+ * back up to the rx thread who takes it from there.
+ *
* One can imagine the direction a more sophisticated API would head in:
* (there are certainly a half dozen examples in the kernel)
* * tx
@@ -63,22 +68,14 @@
* - handers must be callable from bh context
* but it really depends on what the semantics and messages are.
*
- * asap
- * - only have lookup succeed for active nodes (fully configured)
- * - only initiate connections if rx thread is running?
- * - don't allow node rmdir if it has socket and rx thread is running
+ * XXX
* - tear down all node sockets on rx thread exit
* - have rx thread stop active tx and wait for them
- * - make sure ->net.page gets torn down with net_inode_private
- * - tear down sockets on exit.. via removing their inodes?
- *
- * XXX
+ * - don't allow node rmdir if it has socket and rx thread is running
* - disable preemt before calling rx handler when debugging
* - find explicit stack call to drain rx queue
* - add trivial version trading message at the start of a conn
* - go nuts adding static
- * - nsc waiting is buggy, should be on socket.. wake w/err if socket dies
- * - compare socks in attach_sock so both size don't close
*/
#include <linux/module.h>
@@ -114,10 +111,12 @@
#define netprintk0(x)
#define msgprintk(hdr, fmt, args...)
#define msgprintk0(hdr, fmt)
+#define scprintk(sc, fmt, args...)
+#define scprintk0(sc, fmt)
#else
#define netprintk(x, arg...) printk("(tcp:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__, ##arg)
#define netprintk0(x) printk("(tcp:%d)(%s:%d) " x, current->pid, __FUNCTION__, __LINE__)
-/* yeah, a little gross, but it gets the job done */
+
#define __msg_fmt "[mag %u len %u typ %u status %d key %u num %u] "
#define __msg_args __hdr->magic, __hdr->data_len, __hdr->msg_type, \
__hdr->status, __hdr->key, __hdr->msg_num
@@ -129,6 +128,22 @@
typeof(hdr) __hdr = (hdr); \
printk(__msg_fmt fmt, __msg_args); \
} while (0)
+
+#define __sc_fmt \
+ "[sc %p refs %d sock %p node %p from_con %u item_on %d " \
+ "page %p pg_off %zu] "
+#define __sc_args __sc, atomic_read(&__sc->sc_kref.refcount), \
+ __sc->sc_sock, __sc->sc_node, __sc->sc_from_connect, \
+ !list_empty(&__sc->sc_item), __sc->sc_page, __sc->sc_page_off
+#define scprintk(sc, fmt, args...) do { \
+ typeof(sc) __sc = (sc); \
+ printk(__sc_fmt fmt, __sc_args, args); \
+} while (0)
+#define scprintk0(sc, fmt) do { \
+ typeof(sc) __sc = (sc); \
+ printk(__sc_fmt fmt, __sc_args); \
+} while (0)
+
#endif
/* let's only pollute this unit with these ridiculous definitions */
@@ -143,10 +158,32 @@
#define sk_callback_lock callback_lock
#define sk_user_data user_data
#define sk_data_ready data_ready
-#define sk_error_report error_report
#define sk_state_change state_change
#endif
+struct net_sock_container {
+ /* sockets themselves don't seem to have a nice way to refcount them
+ * above sock_release. one could use iget/iput, but that seems
+ * to interact poorly with sock_release() itself calling iput. */
+
+ struct kref sc_kref;
+ spinlock_t sc_lock;
+ struct socket *sc_sock;
+ struct nm_node *sc_node;
+ unsigned sc_from_connect:1,
+ sc_pending_connect:1;
+
+ struct list_head sc_item;
+
+ struct list_head sc_handlers;
+ struct page *sc_page;
+ size_t sc_page_off;
+
+ /* original handlers for the sockets */
+ void (*sc_state_change)(struct sock *sk);
+ void (*sc_data_ready)(struct sock *sk, int bytes);
+};
+
struct net_msg_handler {
struct rb_node nh_node;
u32 nh_max_len;
@@ -158,59 +195,51 @@
struct list_head nh_unregister_item;
};
-typedef struct _net_status_ctxt {
- u8 target_node;
- struct list_head list;
- enum net_system_error sys_status;
- s32 status;
- u64 msg_num;
- wait_queue_head_t wq;
- atomic_t woken;
-} net_status_ctxt;
+struct net_status_wait {
+ enum net_system_error ns_sys_status;
+ s32 ns_status;
+ int ns_id;
+ wait_queue_head_t ns_wq;
+ struct list_head ns_node_item;
+};
-
-/* all this state should eventually be brought up by object activation
- * and tied to that object rather than being globally valid at insmod */
-static spinlock_t net_status_lock = SPIN_LOCK_UNLOCKED;
-static LIST_HEAD(net_status_list);
-
static rwlock_t net_handler_lock = RW_LOCK_UNLOCKED;
static struct rb_root net_handler_tree = RB_ROOT;
/* this lock is also grabbed from bh context, non-bh use _bh() locking */
static spinlock_t net_active_lock = SPIN_LOCK_UNLOCKED;
static LIST_HEAD(net_active_list);
+static LIST_HEAD(net_detach_list);
+static LIST_HEAD(net_attach_list);
/* XXX someday we'll need better accounting */
static struct task_struct *net_recv_task = NULL;
-static inline void net_abort_status_return(net_status_ctxt *nsc)
-{
- spin_lock(&net_status_lock);
- if (!list_empty(&nsc->list))
- list_del_init(&nsc->list);
- spin_unlock(&net_status_lock);
-}
-
/////////////////////
static struct socket *net_init_tcp_recv_sock(u16 port);
static int net_receive_thread(void *data);
static int net_receive(void);
static void net_try_accept(struct socket *sock);
-static int net_process_message(struct socket *sock, net_msg *hdr);
+static int net_process_message(struct nm_node *node, struct socket *sock,
+ net_msg *hdr);
-static int net_sock_addref_or_connect(u8 node_num, struct socket **sock_ret);
-static void net_sock_decref(struct socket *sock, int error);
+static void net_data_ready(struct sock *sk, int bytes);
+static int net_sc_from_node(struct nm_node *node,
+ struct net_sock_container **sc_ret);
+static int net_attach_sc(struct net_sock_container *sc);
+static void net_finish_connect(struct nm_node *node);
+static struct net_sock_container *sc_alloc(struct nm_node *node, int from_conn);
+static void net_state_change(struct sock *sk);
+static void net_complete_nodes_nsw(struct nm_node *node);
//////////////////////
int net_start_rx_thread(struct nm_node *node)
{
struct socket *sock;
- net_inode_private *net = &node->nd_net_inode_private;
int ret = 0;
- BUG_ON(net->rx_thread != NULL);
+ BUG_ON(node->nd_rx_thread != NULL);
BUG_ON(net_recv_task != NULL);
/* if the thread was setting up the rx socket we'd like to have it
@@ -219,16 +248,17 @@
sock = net_init_tcp_recv_sock(node->nd_ipv4_port);
if (IS_ERR(sock)) {
ret = PTR_ERR(sock);
+ sock = NULL;
goto out;
}
netprintk0("starting net receive thread...\n");
- net->rx_thread = kthread_run(net_receive_thread, sock,
+ node->nd_rx_thread = kthread_run(net_receive_thread, sock,
"netrecv-%s", node->nd_name);
- if (IS_ERR(net->rx_thread)) {
- ret = PTR_ERR(net->rx_thread);
- net->rx_thread = NULL;
+ if (IS_ERR(node->nd_rx_thread)) {
+ ret = PTR_ERR(node->nd_rx_thread);
+ node->nd_rx_thread = NULL;
netprintk("unable to launch net receive thread, error=%ld\n",
(long)ret);
goto out;
@@ -236,7 +266,7 @@
/* once the thread is running it has ownership of the sock */
sock = NULL;
- net_recv_task = net->rx_thread;
+ net_recv_task = node->nd_rx_thread;
out:
if (sock)
@@ -246,11 +276,10 @@
void net_stop_rx_thread(struct nm_node *node)
{
- net_inode_private *net = &node->nd_net_inode_private;
- if (net->rx_thread) {
+ if (node->nd_rx_thread) {
netprintk("waiting for net thread to exit....\n");
- kthread_stop(net->rx_thread);
- net->rx_thread = NULL;
+ kthread_stop(node->nd_rx_thread);
+ node->nd_rx_thread = NULL;
net_recv_task = NULL;
}
@@ -261,36 +290,196 @@
* references.. ugh. */
}
-void net_stop_node_sock(struct nm_node *node)
+/*
+ * we must be detached at this point because the state detach tears down
+ * holds a ref.. this could well be called from detach.
+ */
+static void sc_kref_release(struct kref *kref)
{
- net_inode_private *net = &node->nd_net_inode_private;
- struct socket *sock = NULL;
+ struct net_sock_container *sc = container_of(kref,
+ struct net_sock_container, sc_kref);
+ scprintk0(sc, "releasing\n");
+ if (sc->sc_sock) {
+ sock_release(sc->sc_sock);
+ sc->sc_sock = NULL;
+ }
+}
- /* make sure this deals with all aspects of the net->sock
- * life time */
- spin_lock_bh(&net->sock_lock);
- if (net->sock) {
- sock = net->sock;
- net->sock_refs++;
+static void sc_put(struct net_sock_container *sc)
+{
+ scprintk0(sc, "put\n");
+ kref_put(&sc->sc_kref, sc_kref_release);
+}
+
+static void sk_register_callbacks(struct sock *sk,
+ struct net_sock_container *sc)
+{
+ write_lock_bh(&sk->sk_callback_lock);
+ if (sk->sk_user_data != sc) {
+ sk->sk_user_data = sc;
+ kref_get(&sc->sc_kref);
+ sc->sc_data_ready = sk->sk_data_ready;
+ sc->sc_state_change = sk->sk_state_change;
+ sk->sk_data_ready = net_data_ready;
+ sk->sk_state_change = net_state_change;
}
- spin_unlock_bh(&net->sock_lock);
- if (sock) {
- printk("shutting down sock %p\n", sock);
- net_sock_decref(sock, -ESHUTDOWN);
+ write_unlock_bh(&sk->sk_callback_lock);
+}
+
+static int sk_unregister_callbacks(struct sock *sk,
+ struct net_sock_container *sc)
+{
+ int ret = 0;
+
+ write_lock_bh(&sk->sk_callback_lock);
+ if (sk->sk_user_data == sc) {
+ ret = 1;
+ sk->sk_user_data = NULL;
+ sk->sk_data_ready = sc->sc_data_ready;
+ sk->sk_state_change = sc->sc_state_change;
}
- if (net->page) {
- __free_page(net->page);
- net->page = NULL;
+ write_unlock_bh(&sk->sk_callback_lock);
+
+ return ret;
+}
+
+/*
+ * When a sock_container is detached it will no longer see traffic once
+ * its current users are done. The sc is removed from the node so
+ * transmitting tasks can't find it. It is removed from the active lists
+ * and its callbacks are unregistered so that the receive thread can't
+ * find it. After this point the last person to drop their kref will
+ * free the container.
+ *
+ * If an sc is specified the caller must hold a reference. A node can be
+ * specified instead of an sc such that any sc currently attached to the
+ * node will be detached.
+ */
+void net_detach_sc(struct net_sock_container *sc, struct nm_node *node)
+{
+ int nr_puts = 0;
+ struct sock *sk;
+
+ /* node is only used to get the sc if it isn't specified */
+ BUG_ON(sc && node);
+ BUG_ON(sc == NULL && node == NULL);
+
+ /* no sc, get it from the node */
+ if (sc == NULL) {
+ spin_lock_bh(&node->nd_lock);
+ if (node->nd_sc) {
+ sc = node->nd_sc;
+ node->nd_sc = NULL;
+ net_complete_nodes_nsw(node);
+ /* we have this ref now */
+ nr_puts++;
+ }
+ spin_unlock_bh(&node->nd_lock);
+ node = NULL;
}
+
+ if (sc == NULL)
+ goto out;
+
+ scprintk(sc, "detaching with node %p\n", node);
+
+ spin_lock_bh(&sc->sc_lock);
+ if (sc->sc_node) {
+ /* drop our ref to the node below */
+ node = sc->sc_node;
+ sc->sc_node = NULL;
+ /* resolve our pending connect under sc_lock and while
+ * we have a node ref. this is done in the error case
+ * where state_change puts us on the detach list. */
+ if (sc->sc_pending_connect) {
+ sc->sc_pending_connect = 0;
+ net_finish_connect(node);
+ }
+ }
+ if (!list_empty(&sc->sc_item)) {
+ list_del_init(&sc->sc_item);
+ nr_puts++;
+ }
+ spin_unlock_bh(&sc->sc_lock);
+
+ if (node) {
+ spin_lock_bh(&node->nd_lock);
+ /* if the node is still pointing to us, drop that */
+ if (node->nd_sc == sc) {
+ printk(KERN_NOTICE "ocfs2:tcp: no longer connected to "
+ "node %s at %u.%u.%u.%u:%d\n", node->nd_name,
+ NIPQUAD(node->nd_ipv4_address),
+ ntohs(node->nd_ipv4_port));
+ node->nd_sc = NULL;
+ net_complete_nodes_nsw(node);
+ nr_puts++;
+ }
+ spin_unlock_bh(&node->nd_lock);
+ nm_node_put(node);
+ node = NULL;
+ }
+
+ /* drop user_data's ref and tear down the callbacks. the
+ * callbacks themselves execute under the callback_lock so
+ * this serializes with them. */
+ sk = sc->sc_sock->sk;
+ if (sk_unregister_callbacks(sk, sc))
+ nr_puts++;
+
+out:
+ if (sc) {
+ scprintk(sc, "detach droping %d refs\n", nr_puts);
+ while(nr_puts--)
+ sc_put(sc);
+ }
}
+EXPORT_SYMBOL(net_detach_sc);
+static void net_check_cb_lists(void)
+{
+ struct net_sock_container *sc;
+ /* when we get the sc off the list we inherit the ref that was
+ * created when the sc was put on the list */
+ spin_lock_bh(&net_active_lock);
+ while (!list_empty(&net_attach_list)) {
+ sc = list_entry(net_attach_list.next,
+ struct net_sock_container, sc_item);
+ list_del_init(&sc->sc_item);
+ spin_unlock_bh(&net_active_lock);
+
+ scprintk0(sc, "found on connect list\n");
+
+ net_attach_sc(sc);
+ sc_put(sc);
+
+ spin_lock_bh(&net_active_lock);
+ }
+
+ while (!list_empty(&net_detach_list)) {
+ sc = list_entry(net_detach_list.next,
+ struct net_sock_container, sc_item);
+
+ list_del_init(&sc->sc_item);
+ spin_unlock_bh(&net_active_lock);
+
+ scprintk0(sc, "found on detach list\n");
+
+ net_detach_sc(sc, NULL);
+ sc_put(sc);
+
+ spin_lock_bh(&net_active_lock);
+ }
+ spin_unlock_bh(&net_active_lock);
+}
+
static int net_rx_should_wake(struct socket *sock)
{
int empty;
spin_lock_bh(&net_active_lock);
- empty = list_empty(&net_active_list);
+ empty = list_empty(&net_active_list) && list_empty(&net_detach_list) &&
+ list_empty(&net_attach_list);
spin_unlock_bh(&net_active_lock);
return !empty || tcp_sk(sock->sk)->accept_queue;
@@ -304,6 +493,7 @@
while(!kthread_should_stop()) {
net_try_accept(sock);
+ net_check_cb_lists();
net_receive();
wait_event_interruptible(*sock->sk->sk_sleep,
@@ -521,35 +711,119 @@
return ret;
}
-static u64 net_next_msg_num(void)
+static int net_prep_nsw(struct nm_node *node, struct net_status_wait *nsw)
{
- static spinlock_t net_msg_num_lock = SPIN_LOCK_UNLOCKED;
- static u64 net_msg_num = 1;
- u64 ret;
+ int ret = 0;
- spin_lock(&net_msg_num_lock);
- ret = net_msg_num++;
- spin_unlock(&net_msg_num_lock);
+ do {
+ if (!idr_pre_get(&node->nd_status_idr, GFP_NOFS)) {
+ ret = -EAGAIN;
+ break;
+ }
+ spin_lock_bh(&node->nd_lock);
+ ret = idr_get_new(&node->nd_status_idr, nsw, &nsw->ns_id);
+ if (ret == 0)
+ list_add_tail(&nsw->ns_node_item,
+ &node->nd_status_list);
+ spin_unlock_bh(&node->nd_lock);
+ } while (ret == -EAGAIN);
+ if (ret == 0) {
+ init_waitqueue_head(&nsw->ns_wq);
+ nsw->ns_sys_status = NET_ERR_NONE;
+ nsw->ns_status = 0;
+ }
+
return ret;
}
+static void net_complete_nsw_locked(struct nm_node *node,
+ struct net_status_wait *nsw,
+ enum net_system_error sys_status,
+ s32 status)
+{
+ assert_spin_locked(&node->nd_lock);
+
+ if (!list_empty(&nsw->ns_node_item)) {
+ list_del_init(&nsw->ns_node_item);
+ nsw->ns_sys_status = sys_status;
+ nsw->ns_status = status;
+ idr_remove(&node->nd_status_idr, nsw->ns_id);
+ wake_up(&nsw->ns_wq);
+ }
+}
+
+static void net_complete_nsw(struct nm_node *node, struct net_status_wait *nsw,
+ u64 id, enum net_system_error sys_status,
+ s32 status)
+{
+ spin_lock_bh(&node->nd_lock);
+ if (nsw == NULL) {
+ if (id > INT_MAX)
+ goto out;
+
+ nsw = idr_find(&node->nd_status_idr, id);
+ if (nsw == NULL)
+ goto out;
+ }
+
+ net_complete_nsw_locked(node, nsw, sys_status, status);
+
+out:
+ spin_unlock_bh(&node->nd_lock);
+ return;
+}
+
+static void net_complete_nodes_nsw(struct nm_node *node)
+{
+ struct list_head *iter, *tmp;
+ unsigned int num_kills = 0;
+ struct net_status_wait *nsw;
+
+ assert_spin_locked(&node->nd_lock);
+
+ list_for_each_safe(iter, tmp, &node->nd_status_list) {
+ nsw = list_entry(iter, struct net_status_wait, ns_node_item);
+ net_complete_nsw_locked(node, nsw, NET_ERR_DIED, 0);
+ num_kills++;
+ }
+
+ netprintk("node %s (%u) died, killed %d messages\n", node->nd_name,
+ node->nd_num, num_kills);
+}
+
+static int net_nsw_completed(struct nm_node * node,
+ struct net_status_wait *nsw)
+{
+ int completed;
+ spin_lock_bh(&node->nd_lock);
+ completed = list_empty(&nsw->ns_node_item);
+ spin_unlock_bh(&node->nd_lock);
+ return completed;
+}
+
int net_send_message_iov(u32 msg_type, u32 key, struct iovec *caller_iov,
size_t caller_iovlen, u8 target_node,
int *status)
{
int ret;
- int cleanup_wq = 0;
- int cleanup_sock = 1;
net_msg *msg = NULL;
- net_status_ctxt nsc;
- wait_queue_t sleep;
size_t i, iovlen, caller_bytes = 0;
struct iovec *iov = NULL;
- struct socket *sock = NULL;
+ struct net_sock_container *sc = NULL;
+ struct nm_node *node = NULL;
+ struct net_status_wait nsw = {
+ .ns_node_item = LIST_HEAD_INIT(nsw.ns_node_item),
+ };
BUG_ON(net_recv_task && (current == net_recv_task));
+ if (net_recv_task == NULL) {
+ netprintk0("attempt to tx without a setup rx thread\n");
+ ret = -ESRCH;
+ goto out;
+ }
+
if (caller_iovlen == 0) {
netprintk0("bad iovec array length\n");
ret = -EINVAL;
@@ -565,12 +839,22 @@
goto out;
}
- ret = net_sock_addref_or_connect(target_node, &sock);
+ if (target_node == nm_this_node()) {
+ ret = -ELOOP;
+ goto out;
+ }
+
+ node = nm_get_node_by_num(target_node);
+ if (node == NULL) {
+ netprintk("node %u unknown\n", target_node);
+ ret = -EINVAL;
+ goto out;
+ }
+
+ ret = net_sc_from_node(node, &sc);
if (ret)
goto out;
- netprintk0("returned from net_sock_addref_or_connect, building msg\n");
-
/* build up our iovec */
iovlen = caller_iovlen + 1;
iov = kmalloc(sizeof(struct iovec) * iovlen, GFP_KERNEL);
@@ -593,66 +877,52 @@
msg->sys_status = NET_ERR_NONE;
msg->status = 0;
msg->key = key;
- msg->msg_num = net_next_msg_num();
iov[0].iov_len = sizeof(net_msg);
iov[0].iov_base = msg;
memcpy(&iov[1], caller_iov, caller_iovlen * sizeof(struct iovec));
- /* Setup for status return wait */
- init_waitqueue_head(&nsc.wq);
- atomic_set(&nsc.woken, 0);
- nsc.msg_num = msg->msg_num;
- nsc.sys_status = NET_ERR_NONE;
- nsc.status = 0;
- nsc.target_node = target_node;
+ ret = net_prep_nsw(node, &nsw);
+ if (ret)
+ goto out;
- init_waitqueue_entry(&sleep, current);
- add_wait_queue(&nsc.wq, &sleep);
- cleanup_wq = 1;
+ msg->msg_num = nsw.ns_id;
- spin_lock(&net_status_lock);
- list_add_tail(&nsc.list, &net_status_list);
- spin_unlock(&net_status_lock);
-
/* finally, convert the message header to network byte-order
* and send */
net_msg_to_net(msg);
- ret = net_send_tcp_msg(sock, iov, iovlen,
+ ret = net_send_tcp_msg(sc->sc_sock, iov, iovlen,
sizeof(net_msg) + caller_bytes);
net_msg_to_host(msg); /* just swapping for printk, its unused now */
msgprintk(msg, "sending returned %d\n", ret);
if (ret < 0) {
- net_abort_status_return(&nsc);
netprintk("error returned from net_send_tcp_msg=%d\n", ret);
goto out;
}
/* wait on other node's handler */
- wait_event(nsc.wq, (atomic_read(&nsc.woken) == 1));
+ wait_event(nsw.ns_wq, net_nsw_completed(node, &nsw));
/* Note that we avoid overwriting the callers status return
* variable if a system error was reported on the other
* side. Callers beware. */
- ret = net_sys_err_to_errno(nsc.sys_status);
+ ret = net_sys_err_to_errno(nsw.ns_sys_status);
if (status && !ret)
- *status = nsc.status;
+ *status = nsw.ns_status;
netprintk("woken, returning system status %d, user status %d",
- ret, nsc.status);
-
- /* At this point only destroy the socket if the other node died. */
- if (ret != -EHOSTDOWN)
- cleanup_sock = 0;
+ ret, nsw.ns_status);
out:
- if (cleanup_wq)
- remove_wait_queue(&nsc.wq, &sleep);
- if (sock)
- net_sock_decref(sock, cleanup_sock);
+ if (sc)
+ sc_put(sc);
if (iov)
kfree(iov);
if (msg)
kfree(msg);
+ if (node) {
+ net_complete_nsw(node, &nsw, 0, 0, 0);
+ nm_node_put(node);
+ }
return ret;
}
EXPORT_SYMBOL(net_send_message_iov);
@@ -708,45 +978,21 @@
return net_send_tcp_msg(sock, &iov, 1, sizeof(net_msg));
}
-static inline int net_is_valid_error_type(u32 err_type)
+/* a callback would like the rx thread to do some work on its behalf
+ * in process context.. the callback has a ref from sk->user_data,
+ * we grab a ref as we put the sc on the list that the rx thread will
+ * inherit when it removes from the list */
+static void net_sc_list_add(struct net_sock_container *sc,
+ struct list_head *list)
{
- if (err_type == NET_ALREADY_CONNECTED ||
- err_type == NET_UNKNOWN_HOST)
- return 1;
- return 0;
-}
+ assert_spin_locked(&net_active_lock);
-static void net_send_error(struct socket *sock, u16 err_type)
-{
- net_msg hdr = {
- .magic = NET_MSG_MAGIC,
- .msg_type = err_type,
- .data_len = 0,
- };
- struct iovec iov = {
- .iov_base = &hdr,
- .iov_len = sizeof(hdr),
- };
-
- if (!net_is_valid_error_type(err_type)) {
- netprintk("bug! bad error type! %u\n", err_type);
- goto out;
+ if (list_empty(&sc->sc_item)) {
+ scprintk(sc, "adding to list %p\n", list);
+ kref_get(&sc->sc_kref);
+ list_add_tail(&sc->sc_item, list);
}
- msgprintk(&hdr, "about to send error %u\n", err_type);
- net_msg_to_net(&hdr);
- net_send_tcp_msg(sock, &iov, 1, sizeof(net_msg));
-out:
- return;
-}
-
-static void net_make_active(net_inode_private *net)
-{
- assert_spin_locked(&net_active_lock);
-
- if (list_empty(&net->active_item))
- list_add_tail(&net->active_item, &net_active_list);
-
if (net_recv_task)
wake_up_process(net_recv_task);
}
@@ -757,114 +1003,83 @@
static void net_data_ready(struct sock *sk, int bytes)
{
void (*ready)(struct sock *sk, int bytes);
- net_inode_private *net;
+ struct net_sock_container *sc;
read_lock(&sk->sk_callback_lock);
- net = sk->sk_user_data;
- if (net == NULL) {
+ sc = sk->sk_user_data;
+ if (sc == NULL) {
ready = sk->sk_data_ready;
goto out;
}
- netprintk("data_ready hit for net %p\n", net);
+ scprintk0(sc, "data_ready hit\n");
spin_lock(&net_active_lock);
- net_make_active(net);
+ net_sc_list_add(sc, &net_active_list);
spin_unlock(&net_active_lock);
- ready = net->orig_data_ready;
+ ready = sc->sc_data_ready;
out:
read_unlock(&sk->sk_callback_lock);
ready(sk, bytes);
}
-static void net_error_report(struct sock *sk)
-{
- void (*report)(struct sock *sk);
- net_inode_private *net;
- read_lock(&sk->sk_callback_lock);
- net = sk->sk_user_data;
- if (net == NULL) {
- report = sk->sk_error_report;
- goto out;
- }
-
- netprintk("error_report hit for net %p\n", net);
-
- spin_lock(&net_active_lock);
- net_make_active(net);
- spin_unlock(&net_active_lock);
-
- report = net->orig_error_report;
-out:
- read_unlock(&sk->sk_callback_lock);
- report(sk);
-}
-
+/*
+ * sk callbacks put sockets with activity on a list and we pluck off the
+ * them and call into the stack. we get the ref from their entry on
+ * the list.
+ */
static int net_receive(void)
{
- LIST_HEAD(snapshot_list);
- net_inode_private *net;
- struct socket *sock;
+ struct net_sock_container *sc;
net_msg *hdr;
int err = 0, read_eagain, read_some;
void *data;
size_t datalen;
+ struct nm_node *node = NULL;
- /* process in batches so that the receive thread gets
- * a chance to accept new sockets now and again */
spin_lock_bh(&net_active_lock);
- list_splice_init(&net_active_list, &snapshot_list);
- spin_unlock_bh(&net_active_lock);
+ while (!list_empty(&net_active_list)) {
+ sc = list_entry(net_active_list.next,
+ struct net_sock_container, sc_item);
- /* we don't need locks to test our list because we're the
- * only people who remove active_items from lists */
- while (!list_empty(&snapshot_list)) {
- net = list_entry(snapshot_list.next, net_inode_private,
- active_item);
-
- /* remove the net from the active list so that data_ready
- * can put it back on if it hits just after we read */
- spin_lock_bh(&net_active_lock);
- list_del_init(&net->active_item);
+ /* we now have the ref that adding created */
+ list_del_init(&sc->sc_item);
spin_unlock_bh(&net_active_lock);
- sock = NULL;
+ scprintk0(sc, "found on active list\n");
err = 0;
read_eagain = 0;
read_some = 0;
- /* basically a manual addref that doesn't connect :/ */
- spin_lock_bh(&net->sock_lock);
- if (net->sock && !net->sock_pending) {
- sock = net->sock;
- net->sock_refs++;
- if (net->defer_release)
- err = -ENOTCONN;
+ /* catch a race with detach or get a ref on the node
+ * so we can process status messages */
+ spin_lock_bh(&sc->sc_lock);
+ if (sc->sc_node == NULL)
+ err = -ENOTCONN;
+ else {
+ nm_node_get(sc->sc_node);
+ node = sc->sc_node;
}
- spin_unlock_bh(&net->sock_lock);
-
- if (sock == NULL)
- continue;
-
+ spin_unlock_bh(&sc->sc_lock);
if (err)
goto done;
/* do we need more header? */
- if (net->page_off < sizeof(net_msg)) {
- data = page_address(net->page) + net->page_off;
- datalen = sizeof(net_msg) - net->page_off;
- err = net_recv_tcp_msg(sock, data, datalen);
+ if (sc->sc_page_off < sizeof(net_msg)) {
+ data = page_address(sc->sc_page) + sc->sc_page_off;
+ datalen = sizeof(net_msg) - sc->sc_page_off;
+ err = net_recv_tcp_msg(sc->sc_sock, data, datalen);
if (err > 0) {
- net->page_off += err;
+ sc->sc_page_off += err;
read_some = 1;
/* only swab incoming here.. we can
* only get here once as we cross from
* being under to over */
- if (net->page_off == sizeof(net_msg)) {
- hdr = page_address(net->page);
+ if (sc->sc_page_off == sizeof(net_msg)) {
+ hdr = page_address(sc->sc_page);
net_msg_to_host(hdr);
if (hdr->data_len > NET_MAX_PAYLOAD_BYTES)
err = -EOVERFLOW;
@@ -877,26 +1092,26 @@
}
}
- if (net->page_off < sizeof(net_msg)) {
+ if (sc->sc_page_off < sizeof(net_msg)) {
/* oof, still don't have a header */
goto done;
}
/* this was swabbed above when we first read it */
- hdr = page_address(net->page);
+ hdr = page_address(sc->sc_page);
- msgprintk(hdr, "at page_off %zu\n", net->page_off);
+ msgprintk(hdr, "at page_off %zu\n", sc->sc_page_off);
/* do we need more payload? */
- if (net->page_off - sizeof(net_msg) < hdr->data_len) {
+ if (sc->sc_page_off - sizeof(net_msg) < hdr->data_len) {
/* need more payload */
- data = page_address(net->page) + net->page_off;
+ data = page_address(sc->sc_page) + sc->sc_page_off;
datalen = (sizeof(net_msg) + hdr->data_len) -
- net->page_off;
- err = net_recv_tcp_msg(sock, data, datalen);
+ sc->sc_page_off;
+ err = net_recv_tcp_msg(sc->sc_sock, data, datalen);
if (err > 0) {
read_some = 1;
- net->page_off += err;
+ sc->sc_page_off += err;
}
if (err < 0) {
if (err == -EAGAIN)
@@ -905,88 +1120,56 @@
}
}
- if (net->page_off - sizeof(net_msg) == hdr->data_len) {
+ if (sc->sc_page_off - sizeof(net_msg) == hdr->data_len) {
/* whooo peee, we have a full message */
/* after calling this the message is toast */
- err = net_process_message(sock, hdr);
- net->page_off = 0;
+ err = net_process_message(node, sc->sc_sock, hdr);
+ sc->sc_page_off = 0;
}
done:
+
+ if (node) {
+ nm_node_put(node);
+ node = NULL;
+ }
+
+ scprintk(sc, "finished reading with %d\n", err);
+ if (err == -EAGAIN)
+ err = 0;
+
/* we might not have consumed all the data that has been
* announced to us through data_ready.. keep the net active
* as long as there may still be remaining data.
* data_ready might have been called after we saw eagain */
- spin_lock_bh(&net_active_lock);
- if (read_some && !read_eagain)
- net_make_active(net);
- spin_unlock_bh(&net_active_lock);
-
- netprintk("net %p finished reading with %d\n", net, err);
- if (sock && err < 0 && err != -EAGAIN) {
- netprintk("socket saw err %d, closing\n", err);
- net_sock_decref(sock, err);
+ if (!err && read_some && !read_eagain) {
+ spin_lock_bh(&net_active_lock);
+ net_sc_list_add(sc, &net_active_list);
+ spin_unlock_bh(&net_active_lock);
}
- }
- return 0;
-}
-
-static void net_do_status_return(net_msg *hdr)
-{
- net_status_ctxt *nsc = NULL;
- struct list_head *iter;
-
- spin_lock(&net_status_lock);
- list_for_each(iter, &net_status_list) {
- nsc = list_entry(iter, net_status_ctxt, list);
- if (nsc->msg_num == hdr->msg_num) {
- nsc->sys_status = hdr->sys_status;
- nsc->status = hdr->status;
- atomic_set(&nsc->woken, 1);
- list_del_init(&nsc->list);
- wake_up(&nsc->wq);
- break;
+ /* this exists to catch the framing errors, all other
+ * errors should come through state_change, really. */
+ if (err) {
+ scprintk(sc, "saw err %d, closing\n", err);
+ net_detach_sc(sc, NULL);
}
- nsc = NULL;
- }
- spin_unlock(&net_status_lock);
- msgprintk(hdr, "sent to nsc %p\n", nsc);
-}
-
-static void net_kill_node_messages(u8 node)
-{
- unsigned int num_kills = 0;
- net_status_ctxt *nsc = NULL;
- struct list_head *iter, *tmp;
-
- spin_lock(&net_status_lock);
- list_for_each_safe(iter, tmp, &net_status_list) {
- nsc = list_entry(iter, net_status_ctxt, list);
- if (nsc->target_node == node) {
- nsc->sys_status = NET_ERR_DIED;
- atomic_set(&nsc->woken, 1);
- list_del_init(&nsc->list);
- wake_up(&nsc->wq);
-
- num_kills++;
- }
+ sc_put(sc);
+ spin_lock_bh(&net_active_lock);
}
- spin_unlock(&net_status_lock);
-
- netprintk("node %u died, killed %d messages\n", node, num_kills);
+ spin_unlock_bh(&net_active_lock);
+ return 0;
}
-/* this callback is registered on insmod and torn down on rmmod.
- * the list and locks that it uses to kill messages are statically
- * defined so it should be ok.. it just has to carefully be called
- * after hb is ready and before hb is torn down */
+/* if we get a message that the node has gone down then we detach its
+ * active socket. This will send error codes to any transmitters that
+ * were waiting for status replies on that node. */
static void net_hb_node_down_cb(struct nm_node *node,
int node_num,
void *data)
{
- net_kill_node_messages(node_num);
+ net_detach_sc(NULL, node);
}
static struct hb_callback_func *net_hb_down = NULL;
@@ -1020,7 +1203,8 @@
/* this returns -errno if the header was unknown or too large, etc.
* after this is called the buffer us reused for the next message */
-static int net_process_message(struct socket *sock, net_msg *hdr)
+static int net_process_message(struct nm_node *node, struct socket *sock,
+ net_msg *hdr)
{
int ret, handler_status;
enum net_system_error syserr;
@@ -1031,7 +1215,8 @@
if (hdr->magic == NET_MSG_STATUS_MAGIC) {
/* special type for returning message status */
- net_do_status_return(hdr);
+ net_complete_nsw(node, NULL, hdr->msg_num, hdr->sys_status,
+ hdr->status);
ret = 0;
goto out;
} else if (hdr->magic != NET_MSG_MAGIC) {
@@ -1040,17 +1225,6 @@
goto out;
}
- if (net_is_valid_error_type(hdr->msg_type)) {
- if (hdr->msg_type == NET_ALREADY_CONNECTED) {
- msgprintk0(hdr, "error: there is already a socket "
- "for this connection\n");
- } else if (hdr->msg_type == NET_UNKNOWN_HOST) {
- msgprintk0(hdr, "error: unknown host\n");
- }
- ret = 0;
- goto out;
- }
-
/* find a handler for it */
ret = 0;
handler_status = 0;
@@ -1084,145 +1258,187 @@
return ret;
}
-/*
- * The rest of the file is all about managing sockets.
+/*
+ * This is called once a socket has come through accept() or connect() and
+ * is ready for business. Once it is attached to the node
+ * then transmitting paths can get ahold of it and send messages.
*/
-
-static int net_attach_sock(net_inode_private *net, struct socket *sock)
+static int net_attach_sc(struct net_sock_container *sc)
{
struct sock *sk;
int ret = 0;
+ u8 this_node = nm_this_node(); /* :( */
+ struct net_sock_container *detach = NULL;
+ struct nm_node *node = sc->sc_node;
- netprintk("attaching sock %p to net %p\n", sock, net);
+ scprintk(sc, "attaching with node %p\n", node);
- /* this could be racing with an active connect, it needs to
- * compare the socks consistently so both sides agree to close
- * the same socket */
- spin_lock_bh(&net->sock_lock);
- if (net->sock != NULL && net->sock != sock)
- ret = -EEXIST;
- else
- net->sock = sock;
- spin_unlock_bh(&net->sock_lock);
+ BUG_ON(node == NULL);
+
+ spin_lock_bh(&node->nd_lock);
+ if (node->nd_sc) {
+ /* this is a little exciting. If we have racing sockets we
+ * need to agree which to prefer with the remote node. We only
+ * get here if both sockets are successfully connected. The
+ * node with the lower number loses the socket it initiated
+ * with connect(). */
+ if (this_node < node->nd_num) {
+ /* lose the one we started with connect() */
+ if (sc->sc_from_connect)
+ ret = -EEXIST;
+ } else {
+ /* lose the one we accepted */
+ if (!sc->sc_from_connect)
+ ret = -EEXIST;
+ }
+ /* we're keeping the new sc and the old one has to go */
+ if (ret == 0) {
+ detach = node->nd_sc;
+ node->nd_sc = NULL;
+ /* we get its ref and drop it below */
+ }
+ }
+ if (node->nd_sc == NULL) {
+ node->nd_sc = sc;
+ kref_get(&sc->sc_kref);
+ /* resolve our pending connect before we drop the node lock */
+ if (sc->sc_pending_connect) {
+ sc->sc_pending_connect = 0;
+ net_finish_connect(node);
+ } else {
+ /* accepts arriving should wake sleepers, too */
+ atomic_inc(&node->nd_sc_generation);
+ wake_up_all(&node->nd_sc_wq);
+ }
+ printk(KERN_NOTICE "ocfs2:tcp: %s connection to node %s at "
+ "%u.%u.%u.%u:%d\n",
+ sc->sc_from_connect ? "initiated" : "accepted",
+ node->nd_name, NIPQUAD(node->nd_ipv4_address),
+ ntohs(node->nd_ipv4_port));
+ }
+ spin_unlock_bh(&node->nd_lock);
if (ret)
goto out;
+ if (detach) {
+ net_detach_sc(detach, NULL);
+ sc_put(detach);
+ detach = NULL;
+ }
- sk = net->sock->sk;
+ sk = sc->sc_sock->sk;
tcp_sk(sk)->nonagle = 1;
+ sk_register_callbacks(sk, sc);
- write_lock_bh(&sk->sk_callback_lock);
- net->orig_data_ready = sk->sk_data_ready;
- net->orig_error_report = sk->sk_error_report;
-
- sk->sk_user_data = net;
- sk->sk_data_ready = net_data_ready;
- sk->sk_error_report = net_error_report;
- write_unlock_bh(&sk->sk_callback_lock);
-
/* record it as active initially to make sure we didn't miss
* any incoming data while we were setting it up */
spin_lock_bh(&net_active_lock);
- net_make_active(net);
+ net_sc_list_add(sc, &net_active_list);
spin_unlock_bh(&net_active_lock);
out:
- netprintk("attaching sock %p to net %p returned: %d\n", sock, net,
- ret);
+ scprintk(sc, "attaching now node %p returned %d\n", node, ret);
return ret;
}
-struct waiting_for_sock {
- int rc;
- struct list_head waiting_item;
- wait_queue_t entry;
-};
-
-static void net_wake_sock_waiters(net_inode_private *net, int rc)
-{
- struct list_head *pos, *tmp;
- struct waiting_for_sock *wfs;
-
- assert_spin_locked(&net->sock_lock);
-
- netprintk("net %p waking waiters with rc %d\n", net, rc);
-
- list_for_each_safe(pos, tmp, &net->pending_waiters) {
- wfs = list_entry(pos, struct waiting_for_sock, waiting_item);
- list_del_init(&wfs->waiting_item);
-
- wfs->rc = rc;
- }
-
- wake_up(&net->waitq);
-}
-
-/* we register this callback when we start a connect. once we're done
- * with the connect state machine we unregister ourselves. teardown of
- * active connected sockets only happens in the rx thread. this socket
- * can only make it to the rx thread through here and we reset _state_change.
- * this can't race with teardown */
static void net_state_change(struct sock *sk)
{
- net_inode_private *net;
void (*state_change)(struct sock *sk);
- int rc = 0, should_wake = 1;
+ struct net_sock_container *sc;
+ struct list_head *list = NULL;
write_lock(&sk->sk_callback_lock);
- net = sk->sk_user_data;
- BUG_ON(net == NULL);
- BUG_ON(net->sock == NULL);
+ /* we might have raced with the node being torn down and this
+ * sc being detached from the node */
+ sc = sk->sk_user_data;
+ if (sc == NULL) {
+ state_change = sk->sk_state_change;
+ goto out;
+ }
- state_change = net->orig_state_change;
+ scprintk(sc, "state_change to %d\n", sk->sk_state);
+ state_change = sc->sc_state_change;
+
switch(sk->sk_state) {
case TCP_SYN_SENT:
case TCP_SYN_RECV:
- should_wake = 0;
break;
case TCP_ESTABLISHED:
- rc = 0;
+ list = &net_attach_list;
break;
default:
- rc = -ENOTCONN;
+ list = &net_detach_list;
break;
}
- netprintk("net %p sock %p went to state %d; should_wake %d rc %d\n",
- net, net->sock, sk->sk_state, should_wake, rc);
+ /* if we've finished a connect or have seen an error we let
+ * the rx thread know so it can act accordingly on our behalf */
+ if (list) {
+ spin_lock(&net_active_lock);
+ net_sc_list_add(sc, list);
+ spin_unlock(&net_active_lock);
+ }
+out:
+ write_unlock(&sk->sk_callback_lock);
+ state_change(sk);
+}
- if (should_wake) {
- spin_lock(&net->sock_lock);
- if (net->sock_pending) {
- net->sock_pending = 0;
- /* let the rx thread do the final _release when
- * they go to grab this guy */
- if (rc)
- net->defer_release = 1;
+static struct net_sock_container *sc_alloc(struct nm_node *node, int from_conn)
+{
+ struct net_sock_container *sc, *ret = NULL;
+ struct page *page = NULL;
- sk->sk_state_change = net->orig_state_change;
- net_wake_sock_waiters(net, rc);
- } else
- should_wake = 0;
- spin_unlock(&net->sock_lock);
- }
+ page = alloc_page(GFP_NOFS);
+ sc = kcalloc(1, sizeof(*sc), GFP_NOFS);
+ if (sc == NULL || page == NULL)
+ goto out;
- write_unlock(&sk->sk_callback_lock);
+ kref_init(&sc->sc_kref, sc_kref_release);
+ spin_lock_init(&sc->sc_lock);
+ nm_node_get(node);
+ sc->sc_node = node;
+ sc->sc_from_connect = from_conn;
+ sc->sc_pending_connect = from_conn;
- /* net_attach grabs every lock in the known universe so we do it
- * out here */
- if (should_wake)
- net_attach_sock(net, net->sock);
+ INIT_LIST_HEAD(&sc->sc_item);
+ INIT_LIST_HEAD(&sc->sc_handlers);
- state_change(sk);
-}
+ scprintk0(sc, "alloced\n");
-static int net_start_connect(net_inode_private *net, u32 addr, u16 port)
+ ret = sc;
+ sc->sc_page = page;
+ sc = NULL;
+ page = NULL;
+
+out:
+ if (page)
+ __free_page(page);
+ kfree(sc);
+
+ return ret;
+}
+
+static void net_finish_connect(struct nm_node *node)
{
+ atomic_dec(&node->nd_pending_connects);
+ atomic_inc(&node->nd_sc_generation);
+ /* don't care about this rare thundering herd */
+ wake_up_all(&node->nd_sc_wq);
+}
+
+static int net_start_connect(struct nm_node *node, u32 addr, u16 port)
+{
struct socket *sock = NULL;
- struct sock *sk;
struct sockaddr_in myaddr, remoteaddr;
+ struct net_sock_container *sc = NULL;
int ret;
+ sc = sc_alloc(node, 1);
+ if (sc == NULL) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
if (ret < 0) {
netprintk("can't create socket: %d\n", ret);
@@ -1239,36 +1455,41 @@
goto out;
}
- memset (&remoteaddr, 0, sizeof (remoteaddr));
+ memset(&remoteaddr, 0, sizeof(remoteaddr));
remoteaddr.sin_family = AF_INET;
remoteaddr.sin_addr.s_addr = addr;
remoteaddr.sin_port = port;
- net->sock = sock;
+ /* from this point on sc_kref_release has the responsibility
+ * of dropping the socket */
+ sc->sc_sock = sock;
+ sock = NULL;
- sk = sock->sk;
- write_lock_bh(&sk->sk_callback_lock);
- sk->sk_user_data = net;
- net->orig_state_change = sk->sk_state_change;
- sk->sk_state_change = net_state_change;
- write_unlock_bh(&sk->sk_callback_lock);
+ sk_register_callbacks(sc->sc_sock->sk, sc);
- ret = sock->ops->connect(sock, (struct sockaddr *)&remoteaddr,
- sizeof(remoteaddr), O_NONBLOCK);
+ ret = sc->sc_sock->ops->connect(sc->sc_sock,
+ (struct sockaddr *)&remoteaddr,
+ sizeof(remoteaddr),
+ O_NONBLOCK);
if (ret == -EINPROGRESS)
ret = 0;
- netprintk("starting connect for net %p sock %p gave %d\n", net, sock,
- ret);
+
+ scprintk(sc, "->connect gave %d\n", ret);
out:
+ /* if ret == 0 then the callback has the responsibility of calling
+ * net_finish_connect */
if (ret) {
- spin_lock_bh(&net->sock_lock);
- net->sock_pending = 0;
- net->sock = NULL;
- net_wake_sock_waiters(net, ret);
- spin_unlock_bh(&net->sock_lock);
+ net_finish_connect(node);
+ if (sc) {
+ /* stop detach from trying to finish again */
+ sc->sc_pending_connect = 0;
+ net_detach_sc(sc, NULL);
+ }
if (sock)
sock_release(sock);
}
+ if (sc)
+ sc_put(sc);
return ret;
}
@@ -1295,197 +1516,130 @@
set_fs(oldfs);
}
-static void net_sock_decref(struct socket *sock, int error)
+
+/*
+ * some tx code path would like to send a message. For them to do so
+ * we need to follow an nm_node struct to find an active sock container.
+ * If we find a node without a sock container then we try and issue a
+ * connect and wait for its outcome. We only really block waiting for
+ * the node to have an active socket. The rx thread might race with
+ * the connect() to accept() a socket that we'll happily use.
+ */
+static int net_sc_from_node(struct nm_node *node,
+ struct net_sock_container **sc_ret)
{
- net_inode_private *net = NULL;
- int release = 0;
+ struct net_sock_container *sc = NULL;
+ int ret = 0, gen = 0, issue_connect = 0;
- /* we hold a ref, this should be stable */
- net = sock->sk->sk_user_data;
- BUG_ON(net == NULL);
-
- spin_lock_bh(&net->sock_lock);
-
- BUG_ON(net->sock_pending);
- BUG_ON(net->sock_refs == 0);
- BUG_ON(net->sock == NULL);
-
- netprintk("decref for net %p ->sock %p err %d: refs %lu defer %u\n",
- net, net->sock, error, net->sock_refs, net->defer_release);
-
- if (error)
- net->defer_release = 1;
- if (--net->sock_refs == 0 && net->defer_release) {
- sock = net->sock;
- net->sock = NULL;
- net->defer_release = 0;
- release = 1;
- }
- spin_unlock_bh(&net->sock_lock);
-
- if (release) {
- /* stop any callbacks from hitting before we tear down
- * this sock */
- if (sock->sk && sock->sk->sk_user_data) {
- struct sock *sk = sock->sk;
-
- write_lock_bh(&sk->sk_callback_lock);
- sk->sk_data_ready = net->orig_data_ready;
- sk->sk_error_report = net->orig_error_report;
- sk->sk_user_data = NULL;
- write_unlock_bh(&sk->sk_callback_lock);
- /* XXX can we sync with bottom halves here? */
+ spin_lock_bh(&node->nd_lock);
+ if (node->nd_sc) {
+ sc = node->nd_sc;
+ kref_get(&sc->sc_kref);
+ } else {
+ if (atomic_read(&node->nd_pending_connects) == 0) {
+ atomic_inc(&node->nd_pending_connects);
+ issue_connect = 1;
}
- sock_release(sock);
+ gen = atomic_read(&node->nd_sc_generation);
}
-}
+ spin_unlock_bh(&node->nd_lock);
-static int wfs_complete(net_inode_private *net, struct waiting_for_sock *wfs)
-{
- int empty;
-
- spin_lock_bh(&net->sock_lock);
- empty = list_empty(&wfs->waiting_item);
- spin_unlock_bh(&net->sock_lock);
-
- return empty;
-}
-
-static int net_sock_addref_or_connect(u8 target_node, struct socket **sock_ret)
-{
- struct nm_node *node = NULL;
- net_inode_private *net = NULL;
- struct socket *sock = NULL;
- int ret = 0, wait = 0, set_pending = 0;
- struct waiting_for_sock wfs;
-
- /* XXX think about passing refs around.. */
- node = nm_get_node_by_num(target_node);
- if (node == NULL) {
- netprintk("node %u unknown\n", target_node);
- ret = -EINVAL;
+ if (sc)
goto out;
- }
- /* XXX verify that node is fully configured, rx thread is going */
- net = &node->nd_net_inode_private;
- spin_lock_bh(&net->sock_lock);
- if (net->sock && !net->sock_pending) {
- /* just get a ref. this could be a defer_release socket */
- sock = net->sock;
- net->sock_refs++;
- } else {
- netprintk("Initiating the connect!\n");
-
- if (!net->sock_pending) {
- /* ok, we'll be initiating the connect */
- net->sock_pending = 1;
- set_pending = 1;
- }
- list_add_tail(&wfs.waiting_item, &net->pending_waiters);
- init_waitqueue_entry(&wfs.entry, current);
- add_wait_queue(&net->waitq, &wfs.entry);
- wait = 1;
- }
- spin_unlock_bh(&net->sock_lock);
-
- if (set_pending) {
- ret = net_start_connect(net, node->nd_ipv4_address,
+ if (issue_connect) {
+ /* either net_start_connect or the callbacks it registers
+ * have the responsibility of calling _finish_connect() */
+ ret = net_start_connect(node, node->nd_ipv4_address,
node->nd_ipv4_port);
if (ret)
goto out;
}
- if (wait) {
- ret = wait_event_interruptible(net->waitq,
- wfs_complete(net, &wfs));
- if (ret == 0)
- ret = wfs.rc;
- netprintk("sleeping for net %p gave %d\n", net, ret);
- if (ret)
- goto out;
+ /* wait for nd_sc to change, either our connect finishing or
+ * an accept arriving */
+ ret = wait_event_interruptible(node->nd_sc_wq,
+ atomic_read(&node->nd_sc_generation) !=
+ gen);
+ if (ret)
+ goto out;
- /* try again to get a good socket. if we can't, just
- * forget about it. */
- spin_lock_bh(&net->sock_lock);
- if (net->sock && !net->sock_pending) {
- sock = net->sock;
- net->sock_refs++;
- } else
- ret = -ENOTCONN;
- spin_unlock_bh(&net->sock_lock);
- if (ret)
- goto out;
+ /* we only wait for one iteration right now */
+ spin_lock_bh(&node->nd_lock);
+ if (node->nd_sc) {
+ sc = node->nd_sc;
+ kref_get(&sc->sc_kref);
+ } else {
+ ret = -ENOTCONN;
}
-
+ spin_unlock_bh(&node->nd_lock);
+
out:
- if (wait) {
- spin_lock_bh(&net->sock_lock);
- if (!list_empty(&wfs.waiting_item))
- list_del_init(&wfs.waiting_item);
- remove_wait_queue(&net->waitq, &wfs.entry);
- spin_unlock_bh(&net->sock_lock);
+ if (sc) {
+ scprintk(sc, "returning for node %s (%u)\n", node->nd_name,
+ node->nd_num);
+ *sc_ret = sc;
}
- if (sock)
- *sock_ret = sock;
- if (node)
- nm_node_put(node);
- BUG_ON(ret == 0 && sock == NULL);
- netprintk("addref for net %p gave %d\n", net, ret);
+ BUG_ON(ret == 0 && sc == NULL);
+ BUG_ON(ret && sc);
+ netprintk("sc_get for node %s (%u) gave %d, %p\n", node->nd_name,
+ node->nd_num, ret, sc);
return ret;
}
static void net_try_accept(struct socket *sock)
{
- int error, slen;
+ int ret, slen;
struct sockaddr_in sin;
struct socket *new_sock = NULL;
struct nm_node *node = NULL;
+ struct net_sock_container *sc = NULL;
BUG_ON(sock == NULL);
- error = sock_create_lite(sock->sk->sk_family,
- sock->sk->sk_type,
- sock->sk->sk_protocol,
- &new_sock);
- if (error)
+ ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
+ sock->sk->sk_protocol, &new_sock);
+ if (ret)
goto out;
new_sock->type = sock->type;
new_sock->ops = sock->ops;
- error = sock->ops->accept(sock, new_sock, O_NONBLOCK);
- if (error < 0)
+ ret = sock->ops->accept(sock, new_sock, O_NONBLOCK);
+ if (ret < 0)
goto out;
slen = sizeof(sin);
- error = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
+ ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
&slen, 1);
- if (error < 0)
+ if (ret < 0)
goto out;
- netprintk("attempt to connect from %u.%u.%u.%u:%04x\n",
- NIPQUAD(sin.sin_addr.s_addr), ntohs(sin.sin_port));
-
node = nm_get_node_by_ip(sin.sin_addr.s_addr);
if (node == NULL) {
- netprintk0("connect from unknown host...\n");
- net_send_error(new_sock, NET_UNKNOWN_HOST);
+ printk(KERN_WARNING "attempt to connect from unknown node at "
+ "%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr),
+ ntohs(sin.sin_port));
goto out;
}
- netprintk("connect from known host: %s\n", node->nd_name);
-
if (ntohs(sin.sin_port) >= 1024)
netprintk("warning: connect from unprivileged port: "
"%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr),
ntohs(sin.sin_port));
- error = net_attach_sock(&node->nd_net_inode_private, new_sock);
- if (error == -EEXIST)
- net_send_error(new_sock, NET_ALREADY_CONNECTED);
+ sc = sc_alloc(node, 0);
+ if (sc == NULL) {
+ ret = -ENOMEM;
+ goto out;
+ }
+ sc->sc_sock = new_sock;
+ new_sock = NULL;
+
+ ret = net_attach_sc(sc);
+
out:
- if (error) {
+ if (ret) {
if (new_sock) {
net_sock_drain(new_sock);
sock_release(new_sock);
@@ -1493,6 +1647,8 @@
}
if (node)
nm_node_put(node);
+ if (sc)
+ sc_put(sc);
return;
}
Modified: trunk/fs/ocfs2/cluster/tcp.h
===================================================================
--- trunk/fs/ocfs2/cluster/tcp.h 2005-04-12 20:45:35 UTC (rev 2137)
+++ trunk/fs/ocfs2/cluster/tcp.h 2005-04-12 21:08:05 UTC (rev 2138)
@@ -167,6 +167,7 @@
void net_unregister_hb_callbacks(void);
int net_start_rx_thread(struct nm_node *node);
void net_stop_rx_thread(struct nm_node *node);
-void net_stop_node_sock(struct nm_node *node);
+struct net_sock_container;
+void net_detach_sc(struct net_sock_container *sc, struct nm_node *node);
#endif /* CLUSTER_TCP_H */
More information about the Ocfs2-commits
mailing list