[Ocfs2-commits] zab commits r2422 - in trunk/fs/ocfs2: cluster dlm
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Tue Jun 28 16:07:32 CDT 2005
Author: zab
Signed-off-by: mfasheh
Date: 2005-06-23 20:43:58 -0500 (Thu, 23 Jun 2005)
New Revision: 2422
Modified:
trunk/fs/ocfs2/cluster/heartbeat.c
trunk/fs/ocfs2/cluster/heartbeat.h
trunk/fs/ocfs2/cluster/masklog.c
trunk/fs/ocfs2/cluster/masklog.h
trunk/fs/ocfs2/cluster/nodemanager.c
trunk/fs/ocfs2/cluster/tcp.c
trunk/fs/ocfs2/cluster/tcp.h
trunk/fs/ocfs2/cluster/tcp_internal.h
trunk/fs/ocfs2/dlm/dlmdomain.c
Log:
Keep track of node connectivity and fence nodes who find out that they
can't talk to a majority of the heartbeating nodes in the cluster. Also
add the net protocol version handshake.
o turn on some tcp options to send keepalives and timeout after 10s
o count connected and heartbeating nodes. on disconnect, after hb timeout,
if we aren't connected to a majority of the cluster we panic.
o only reconnect until we have a valid connection.. if it fails hb must
go down and back up to connect again.
o get rid of the complicated reconnecting backoff; just reconnect every 2s
o add a timeout that triggers if we don't connect/accept in time after hb up
o enotconn is now fatal in the dlm join path
o trade version numbers on connect and fail if they don't match
o move to static hb timeouts so we can derive net timeouts from them
o turn down hb timeout to 14 seconds to make our 45 sec deadline
o fix up bad eof/handshake handling in rx_until_empty that could livelock
o add hb helpers which don't use the sem for calling while holding it
o give net the ability to stop all hb regions
o avoid infinite recursion by not flushing the workqueue from within it
Signed-off-by: mfasheh
Modified: trunk/fs/ocfs2/cluster/heartbeat.c
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.c 2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/heartbeat.c 2005-06-24 01:43:58 UTC (rev 2422)
@@ -57,18 +57,14 @@
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
static u64 o2hb_generation;
+static LIST_HEAD(o2hb_all_regions);
+
static struct o2hb_callback {
struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];
static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
-#define O2HB_REGION_MIN_TIMEOUT_MS 500
-#define O2HB_REGION_MAX_TIMEOUT_MS jiffies_to_msecs(MAX_JIFFY_OFFSET)
-
-/* number of changes to be seen as live */
-#define O2HB_LIVE_THRESHOLD 2
-
#define O2HB_DEFAULT_BLOCK_BITS 9
struct o2hb_node_event {
@@ -93,6 +89,9 @@
struct o2hb_region {
struct config_item hr_item;
+ struct list_head hr_all_item;
+ unsigned hr_unclean_stop:1;
+
/* protected by the hr_callback_sem */
struct task_struct *hr_task;
@@ -791,7 +790,7 @@
mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
- while (!kthread_should_stop()) {
+ while (!kthread_should_stop() && !reg->hr_unclean_stop) {
o2hb_do_disk_heartbeat(reg);
/* the kthread api has blocked signals for us so no
@@ -799,7 +798,8 @@
msleep_interruptible(reg->hr_timeout_ms);
}
- for(i = 0; i < reg->hr_blocks; i++)
+ /* unclean stop is only used in very bad situation */
+ for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
o2hb_shutdown_slot(&reg->hr_slots[i]);
mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");
@@ -827,18 +827,24 @@
get_random_bytes(&o2hb_generation, sizeof(o2hb_generation));
}
+/* if we're already in a callback then we're already serialized by the sem */
+void o2hb_fill_node_map_from_callback(unsigned long *map, unsigned bytes)
+{
+ BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
+
+ memcpy(map, &o2hb_live_node_bitmap, bytes);
+}
+
/*
* get a map of all nodes that are heartbeating in any regions
*/
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
{
- BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
-
/* callers want to serialize this map and callbacks so that they
* can trust that they don't miss nodes coming to the party */
down_read(&o2hb_callback_sem);
spin_lock(&o2hb_live_lock);
- memcpy(map, &o2hb_live_node_bitmap, bytes);
+ o2hb_fill_node_map_from_callback(map, bytes);
spin_unlock(&o2hb_live_lock);
up_read(&o2hb_callback_sem);
}
@@ -877,6 +883,11 @@
if (reg->hr_slots)
kfree(reg->hr_slots);
+
+ spin_lock(&o2hb_live_lock);
+ list_del(&reg->hr_all_item);
+ spin_unlock(&o2hb_live_lock);
+
kfree(reg);
}
@@ -903,7 +914,8 @@
if (!tmp)
return -ERANGE;
- reg->hr_dead_iter = (unsigned int)tmp;
+ /* XXX */
+ reg->hr_dead_iter = O2HB_DEAD_THRESHOLD;
return count;
}
@@ -928,12 +940,9 @@
if (!p || (*p && (*p != '\n')))
return -EINVAL;
- if (tmp > O2HB_REGION_MAX_TIMEOUT_MS ||
- tmp < O2HB_REGION_MIN_TIMEOUT_MS)
- return -ERANGE;
+ /* XXX */
+ reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
- reg->hr_timeout_ms = (unsigned int)tmp;
-
return count;
}
@@ -1412,6 +1421,10 @@
config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
ret = &reg->hr_item;
+
+ spin_lock(&o2hb_live_lock);
+ list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
+ spin_unlock(&o2hb_live_lock);
out:
if (ret == NULL)
kfree(reg);
@@ -1568,6 +1581,22 @@
}
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
+int o2hb_check_node_heartbeating_from_callback(u8 node_num)
+{
+ unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
+
+ o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
+ if (!test_bit(node_num, testing_map)) {
+ mlog(ML_HEARTBEAT,
+ "node (%u) does not have heartbeating enabled.\n",
+ node_num);
+ return 0;
+ }
+
+ return 1;
+}
+EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
+
/* Makes sure our local node is configured with a node number, and is
* heartbeating. */
int o2hb_check_local_node_heartbeating(void)
@@ -1584,3 +1613,39 @@
return o2hb_check_node_heartbeating(node_num);
}
EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
+
+/* Makes sure our local node is configured with a node number, and is
+ * heartbeating. */
+int o2hb_check_local_node_heartbeating_from_callback(void)
+{
+ u8 node_num;
+
+ /* if this node was set then we have networking */
+ node_num = o2nm_this_node();
+ if (node_num == O2NM_MAX_NODES) {
+ mlog(ML_HEARTBEAT, "this node has not been configured.\n");
+ return 0;
+ }
+
+ return o2hb_check_node_heartbeating_from_callback(node_num);
+}
+EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating_from_callback);
+
+/*
+ * this is just a hack until we get the plumbing which flips file systems
+ * read only and drops the hb ref instead of killing the node dead.
+ */
+void o2hb_stop_all_regions(void)
+{
+ struct o2hb_region *reg;
+
+ mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
+
+ spin_lock(&o2hb_live_lock);
+
+ list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
+ reg->hr_unclean_stop = 1;
+
+ spin_unlock(&o2hb_live_lock);
+}
+EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
Modified: trunk/fs/ocfs2/cluster/heartbeat.h
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.h 2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/heartbeat.h 2005-06-24 01:43:58 UTC (rev 2422)
@@ -29,6 +29,13 @@
#include "ocfs2_heartbeat.h"
+#define O2HB_REGION_TIMEOUT_MS 2000
+
+/* number of changes to be seen as live */
+#define O2HB_LIVE_THRESHOLD 2
+/* number of equal samples to be seen as dead */
+#define O2HB_DEAD_THRESHOLD 7
+
#define O2HB_CB_MAGIC 0x51d1e4ec
/* callback stuff */
@@ -62,8 +69,12 @@
int o2hb_unregister_callback(struct o2hb_callback_func *hc);
void o2hb_fill_node_map(unsigned long *map,
unsigned bytes);
+void o2hb_fill_node_map_from_callback(unsigned long *map, unsigned bytes);
void o2hb_init(void);
int o2hb_check_node_heartbeating(u8 node_num);
+int o2hb_check_node_heartbeating_from_callback(u8 node_num);
int o2hb_check_local_node_heartbeating(void);
+int o2hb_check_local_node_heartbeating_from_callback(void);
+void o2hb_stop_all_regions(void);
#endif /* O2CLUSTER_HEARTBEAT_H */
Modified: trunk/fs/ocfs2/cluster/masklog.c
===================================================================
--- trunk/fs/ocfs2/cluster/masklog.c 2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/masklog.c 2005-06-24 01:43:58 UTC (rev 2422)
@@ -210,6 +210,7 @@
set_a_string(VOTE);
set_a_string(DCACHE);
set_a_string(CONN);
+ set_a_string(QUORUM);
set_a_string(ERROR);
set_a_string(NOTICE);
set_a_string(KTHREAD);
Modified: trunk/fs/ocfs2/cluster/masklog.h
===================================================================
--- trunk/fs/ocfs2/cluster/masklog.h 2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/masklog.h 2005-06-24 01:43:58 UTC (rev 2422)
@@ -107,6 +107,7 @@
#define ML_VOTE 0x0000000001000000ULL /* ocfs2 node messaging */
#define ML_DCACHE 0x0000000002000000ULL /* ocfs2 dcache operations */
#define ML_CONN 0x0000000004000000ULL /* net connection management */
+#define ML_QUORUM 0x0000000008000000ULL /* net connection quorum */
/* bits that are infrequently given and frequently matched in the high word */
#define ML_ERROR 0x0000000100000000ULL /* sent to KERN_ERR */
#define ML_NOTICE 0x0000000200000000ULL /* setn to KERN_NOTICE */
Modified: trunk/fs/ocfs2/cluster/nodemanager.c
===================================================================
--- trunk/fs/ocfs2/cluster/nodemanager.c 2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/nodemanager.c 2005-06-24 01:43:58 UTC (rev 2422)
@@ -393,14 +393,14 @@
/* bring up the rx thread if we're setting the new local node. */
if (tmp && !cluster->cl_has_local) {
- ret = o2net_start_listening(node->nd_ipv4_port);
+ ret = o2net_start_listening(node);
if (ret)
return ret;
}
if (!tmp && cluster->cl_has_local &&
cluster->cl_local_node == node->nd_num) {
- o2net_stop_listening();
+ o2net_stop_listening(node);
cluster->cl_local_node = 0;
}
@@ -577,7 +577,7 @@
(cluster->cl_local_node == node->nd_num)) {
cluster->cl_has_local = 0;
cluster->cl_local_node = O2NM_MAX_NODES;
- o2net_stop_listening();
+ o2net_stop_listening(node);
}
/* XXX call into net to stop this node from trading messages */
@@ -750,6 +750,8 @@
mlog_remove_proc(o2nm_proc);
o2net_proc_exit(o2nm_proc);
remove_proc_entry(O2NM_PROC_PATH, NULL);
+
+ o2net_exit();
}
static int o2nm_proc_version(char *page, char **start, off_t off,
Modified: trunk/fs/ocfs2/cluster/tcp.c
===================================================================
--- trunk/fs/ocfs2/cluster/tcp.c 2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/tcp.c 2005-06-24 01:43:58 UTC (rev 2422)
@@ -1,4 +1,5 @@
/* -*- mode: c; c-basic-offset: 8; -*-
+ *
* vim: noexpandtab sw=8 ts=8 sts=0:
*
* Copyright (C) 2004 Oracle. All rights reserved.
@@ -50,15 +51,6 @@
* higher node number gets a heartbeat callback which indicates that the lower
* numbered node has started heartbeating. The lower numbered node is passive
* and only accepts the connection if the higher numbered node is heartbeating.
- * The connection attempts continue as long as heartbeat is active until a
- * build-time max number of failed attempts.
- *
- * XXX
- * - don't allow node rmdir if it has socket and rx thread is running
- * - disable preemt before calling rx handler when debugging
- * - find explicit stack call to drain rx queue
- * - add trivial version trading message at the start of a conn
- * - add kerneldoc for the external interface
*/
#include <linux/kernel.h>
@@ -102,16 +94,32 @@
static rwlock_t o2net_handler_lock = RW_LOCK_UNLOCKED;
static struct rb_root o2net_handler_tree = RB_ROOT;
-static struct workqueue_struct *o2net_wq;
static struct o2net_node o2net_nodes[O2NM_MAX_NODES];
/* XXX someday we'll need better accounting */
static struct socket *o2net_listen_sock = NULL;
-static struct work_struct o2net_listen_work;
+/*
+ * listen work is only queued by the listening socket callbacks on the
+ * o2net_wq. teardown detaches the callbacks before destroying the workqueue.
+ * quorum work is queued as sock containers are shutdown.. stop_listening
+ * tears down all the node's sock containers, preventing future shutdowns
+ * and queued quroum work, before canceling delayed quorum work and
+ * destroying the work queue.
+ */
+static struct workqueue_struct *o2net_wq;
+static struct work_struct o2net_listen_work, o2net_quorum_work;
+
static struct o2hb_callback_func o2net_hb_up, o2net_hb_down;
#define O2NET_HB_PRI 0x1
+static struct o2net_handshake *o2net_hand;
+
+/* these node totals include our node. I think the hb and net threads
+ * sufficiently serialize things so that these don't need locking */
+static int o2net_heartbeating_nodes = 0, o2net_connected_nodes = 0;
+static unsigned long o2net_connected_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
+
static int o2net_sys_err_translations[O2NET_ERR_MAX] =
{[O2NET_ERR_NONE] = 0,
[O2NET_ERR_NO_HNDLR] = -ENOPROTOOPT,
@@ -119,7 +127,6 @@
[O2NET_ERR_DIED] = -EHOSTDOWN,};
/* can't quite avoid *all* internal declarations :/ */
-static void o2net_start_connect(void *arg);
static void o2net_sc_connect_completed(void *arg);
static void o2net_rx_until_empty(void *arg);
static void o2net_shutdown_sc(void *arg);
@@ -148,6 +155,28 @@
return nn - o2net_nodes;
}
+static void o2net_msg_to_net(o2net_msg *m)
+{
+ m->magic = htons(m->magic);
+ m->data_len = htons(m->data_len);
+ m->msg_type = htons(m->msg_type);
+ m->sys_status = htonl(m->sys_status);
+ m->status = htonl(m->status);
+ m->key = htonl(m->key);
+ m->msg_num = htonl(m->msg_num);
+}
+
+static void o2net_msg_to_host(o2net_msg *m)
+{
+ m->magic = ntohs(m->magic);
+ m->data_len = ntohs(m->data_len);
+ m->msg_type = ntohs(m->msg_type);
+ m->sys_status = ntohl(m->sys_status);
+ m->status = ntohl(m->status);
+ m->key = ntohl(m->key);
+ m->msg_num = ntohl(m->msg_num);
+}
+
/* ------------------------------------------------------------ */
static int o2net_prep_nsw(struct o2net_node *nn, struct o2net_status_wait *nsw)
@@ -276,6 +305,11 @@
sclog(sc, "put\n");
kref_put(&sc->sc_kref, sc_kref_release);
}
+static void sc_get(struct o2net_sock_container *sc)
+{
+ sclog(sc, "get\n");
+ kref_get(&sc->sc_kref);
+}
static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
{
struct o2net_sock_container *sc, *ret = NULL;
@@ -312,15 +346,33 @@
/* ------------------------------------------------------------ */
-/* make sure the work gets a ref to drop */
static void o2net_sc_queue_work(struct o2net_sock_container *sc,
struct work_struct *work)
{
- kref_get(&sc->sc_kref);
+ sc_get(sc);
if (!queue_work(o2net_wq, work))
sc_put(sc);
}
+static void o2net_mod_connected_nodes(u8 node, int delta)
+{
+ BUG_ON(delta == 0);
+ if (delta < 0) {
+ mlog_bug_on_msg(o2net_connected_nodes == 0,
+ "node %u delta %d\n", node, delta);
+ o2net_connected_nodes--;
+ clear_bit(node, o2net_connected_bitmap);
+ } else {
+ mlog_bug_on_msg(o2net_connected_nodes == O2NM_MAX_NODES,
+ "node %u delta %d\n", node, delta);
+ o2net_connected_nodes++;
+ set_bit(node, o2net_connected_bitmap);
+ }
+
+ mlog(ML_QUORUM, "node %u %sconnected, now %u connected\n", node,
+ delta < 0 ? "dis" : "", o2net_connected_nodes);
+}
+
static void o2net_set_nn_state(struct o2net_node *nn,
struct o2net_sock_container *sc,
unsigned valid, int err)
@@ -336,6 +388,11 @@
mlog_bug_on_msg(err && valid, "err %d valid %u\n", err, valid);
mlog_bug_on_msg(valid && !sc, "valid %u sc %p\n", valid, sc);
+ /* we won't reconnect after our valid conn goes away for
+ * this hb iteration.. here so it shows up in the logs */
+ if (was_valid && !valid && err == 0)
+ err = -ENOTCONN;
+
mlog(ML_CONN, "node %u sc: %p -> %p, valid %u -> %u, err %d -> %d\n",
o2net_num_from_nn(nn), nn->nn_sc, sc, nn->nn_sc_valid, valid,
nn->nn_persistent_error, err);
@@ -349,7 +406,12 @@
wake_up(&nn->nn_sc_wq);
if (was_valid && !valid) {
- mlog(ML_NOTICE, "ocfs2:tcp: no longer connected to "
+ o2net_mod_connected_nodes(old_sc->sc_node->nd_num, -1);
+ if (o2net_wq)
+ queue_delayed_work(o2net_wq, &o2net_quorum_work,
+ msecs_to_jiffies(O2NET_QUORUM_DELAY_MS));
+
+ mlog(ML_NOTICE, "no longer connected to "
"node %s at %u.%u.%u.%u:%d\n",
old_sc->sc_node->nd_name,
NIPQUAD(old_sc->sc_node->nd_ipv4_address),
@@ -357,26 +419,42 @@
o2net_complete_nodes_nsw(nn);
}
- if (!was_valid && valid)
- mlog(ML_NOTICE, "ocfs2:tcp: connected to node %s at "
- "%u.%u.%u.%u:%d\n", sc->sc_node->nd_name,
+ if (!was_valid && valid) {
+ /* this is a bit of a hack. we only try reconnecting
+ * when heartbeating starts until we get a connection.
+ * if that connection then dies we don't try reconnecting.
+ * the only way to start connecting again is to down
+ * heartbeat and bring it back up. */
+ cancel_delayed_work(&nn->nn_connect_expired);
+ o2net_mod_connected_nodes(sc->sc_node->nd_num, 1);
+ mlog(ML_NOTICE, "%s node %s num %u at %u.%u.%u.%u:%d\n",
+ o2nm_this_node() > sc->sc_node->nd_num ?
+ "connected to" : "accepted connection from",
+ sc->sc_node->nd_name, sc->sc_node->nd_num,
NIPQUAD(sc->sc_node->nd_ipv4_address),
ntohs(sc->sc_node->nd_ipv4_port));
+ }
/* trigger the connecting worker func as long as we're not valid,
* it will back off if it shouldn't connect. This can be called
* from node config teardown and so needs to be careful about
* the work queue actually being up. */
if (!valid && o2net_wq) {
- mlog(ML_CONN, "queueing conn with %u attempts\n",
- nn->nn_connect_attempts);
- queue_delayed_work(o2net_wq, &nn->nn_connect_work,
- nn->nn_connect_attempts * 2 * HZ);
+ unsigned long delay;
+ /* delay if we're withing a RECONNECT_DELAY of the
+ * last attempt */
+ delay = (nn->nn_last_connect_attempt +
+ msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
+ - jiffies;
+ if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
+ delay = 0;
+ mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
+ queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay);
}
/* keep track of the nn's sc ref for the caller */
if ((old_sc == NULL) && sc)
- kref_get(&sc->sc_kref);
+ sc_get(sc);
if (old_sc && (old_sc != sc)) {
o2net_sc_queue_work(old_sc, &old_sc->sc_shutdown_work);
sc_put(old_sc);
@@ -420,6 +498,7 @@
state_change = sc->sc_state_change;
switch(sk->sk_state) {
+ /* ignore connecting sockets as they make progress */
case TCP_SYN_SENT:
case TCP_SYN_RECV:
break;
@@ -427,22 +506,29 @@
o2net_sc_queue_work(sc, &sc->sc_connect_work);
break;
default:
+ if (sk->sk_err == ETIMEDOUT)
+ mlog(ML_NOTICE, "connection to node %s num %u "
+ "at %u.%u.%u.%u:%d has been idle for 10 "
+ "seconds, shutting it down.\n",
+ sc->sc_node->nd_name,
+ sc->sc_node->nd_num,
+ NIPQUAD(sc->sc_node->nd_ipv4_address),
+ ntohs(sc->sc_node->nd_ipv4_port));
o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
break;
}
out:
read_unlock(&sk->sk_callback_lock);
state_change(sk);
-}
+}
-
/*
* we register callbacks so we can queue work on events before calling
* the original callbacks. our callbacks our careful to test user_data
- * to discover when they've reaced with sk_unregister_callbacks().
+ * to discover when they've reaced with o2net_unregister_callbacks().
*/
-static void sk_register_callbacks(struct sock *sk,
- struct o2net_sock_container *sc)
+static void o2net_register_callbacks(struct sock *sk,
+ struct o2net_sock_container *sc)
{
write_lock_bh(&sk->sk_callback_lock);
@@ -454,7 +540,7 @@
BUG_ON(sk->sk_user_data != NULL);
sk->sk_user_data = sc;
- kref_get(&sc->sc_kref);
+ sc_get(sc);
sc->sc_data_ready = sk->sk_data_ready;
sc->sc_state_change = sk->sk_state_change;
@@ -464,7 +550,7 @@
write_unlock_bh(&sk->sk_callback_lock);
}
-static int sk_unregister_callbacks(struct sock *sk,
+static int o2net_unregister_callbacks(struct sock *sk,
struct o2net_sock_container *sc)
{
int ret = 0;
@@ -484,14 +570,16 @@
/*
* this is a little helper that is called by callers who have seen a problem
* with an sc and want to detach it from the nn if someone already hasn't beat
- * them to it..
+ * them to it. if an error is given then the shutdown will be persistent
+ * and pending transmits will be canceled.
*/
static void o2net_ensure_shutdown(struct o2net_node *nn,
- struct o2net_sock_container *sc)
+ struct o2net_sock_container *sc,
+ int err)
{
spin_lock(&nn->nn_lock);
if (nn->nn_sc == sc)
- o2net_set_nn_state(nn, NULL, 0, 0);
+ o2net_set_nn_state(nn, NULL, 0, err);
spin_unlock(&nn->nn_lock);
}
@@ -511,14 +599,17 @@
sclog(sc, "shutting down\n");
/* drop the callbacks ref and call shutdown only once */
- if (sk_unregister_callbacks(sc->sc_sock->sk, sc)) {
- flush_workqueue(o2net_wq);
+ if (o2net_unregister_callbacks(sc->sc_sock->sk, sc)) {
+ /* we shouldn't flush as we're in the thread, the
+ * races with pending sc work structs are harmless */
sc_put(sc);
sc->sc_sock->ops->shutdown(sc->sc_sock,
RCV_SHUTDOWN|SEND_SHUTDOWN);
}
- o2net_ensure_shutdown(nn, sc);
+ /* not fatal so failed connects before the other guy has our
+ * heartbeat can be retried */
+ o2net_ensure_shutdown(nn, sc, 0);
sc_put(sc);
}
@@ -996,12 +1087,51 @@
return ret;
}
+static int o2net_check_handshake(struct o2net_sock_container *sc)
+{
+ struct o2net_handshake *hand = page_address(sc->sc_page);
+ struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
+
+ hand->protocol_version = be64_to_cpu(hand->protocol_version);
+ hand->connector_id = be64_to_cpu(hand->connector_id);
+
+ if (hand->protocol_version != O2NET_PROTOCOL_VERSION) {
+ mlog(ML_NOTICE, "node %s at %u.%u.%u.%u:%u advertised "
+ "net protocol version %llu but %llu is required,"
+ "disconnecting\n", sc->sc_node->nd_name,
+ NIPQUAD(sc->sc_node->nd_ipv4_address),
+ ntohs(sc->sc_node->nd_ipv4_port),
+ (unsigned long long)hand->protocol_version,
+ O2NET_PROTOCOL_VERSION);
+
+ /* don't bother reconnecting if its the wrong version. */
+ o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+ return -1;
+ }
+
+ sc->sc_handshake_ok = 1;
+
+ spin_lock(&nn->nn_lock);
+ /* set valid if it hasn't been shutdown already.. */
+ if (nn->nn_sc == sc)
+ o2net_set_nn_state(nn, sc, 1, 0);
+ spin_unlock(&nn->nn_lock);
+
+ /* shift everything up as though it wasn't there */
+ sc->sc_page_off -= sizeof(struct o2net_handshake);
+ if (sc->sc_page_off)
+ memmove(hand, hand + 1, sc->sc_page_off);
+
+ return 0;
+}
+
/* this demuxes the queued rx bytes into header or payload bits and calls
- * handlers as each full message is read off the socket */
+ * handlers as each full message is read off the socket. it returns -error,
+ * == 0 eof, or > 0 for progress made.*/
static int o2net_advance_rx(struct o2net_sock_container *sc)
{
o2net_msg *hdr;
- int ret = 0, made_progress = 0;
+ int ret = 0;
void *data;
size_t datalen;
@@ -1013,8 +1143,16 @@
datalen = sizeof(o2net_msg) - sc->sc_page_off;
ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
if (ret > 0) {
- made_progress = 1;
sc->sc_page_off += ret;
+
+ /* this working relies on the handshake being
+ * smaller than the normal message header */
+ if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
+ !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
+ ret = -EPROTO;
+ goto out;
+ }
+
/* only swab incoming here.. we can
* only get here once as we cross from
* being under to over */
@@ -1025,7 +1163,7 @@
ret = -EOVERFLOW;
}
}
- if (ret < 0)
+ if (ret <= 0)
goto out;
}
@@ -1046,32 +1184,30 @@
datalen = (sizeof(o2net_msg) + hdr->data_len) -
sc->sc_page_off;
ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
- if (ret > 0) {
- made_progress = 1;
+ if (ret > 0)
sc->sc_page_off += ret;
- }
- if (ret < 0)
+ if (ret <= 0)
goto out;
}
if (sc->sc_page_off - sizeof(o2net_msg) == hdr->data_len) {
- /* whooo peee, we have a full message */
- /* after calling this the message is toast */
+ /* we can only get here once, the first time we read
+ * the payload.. so set ret to progress if the handler
+ * works out. after calling this the message is toast */
ret = o2net_process_message(sc, hdr);
+ if (ret == 0)
+ ret = 1;
sc->sc_page_off = 0;
}
out:
- sclog(sc, "finished receiving: progress %d ret %d\n", made_progress,
- ret);
- if (made_progress)
- ret = made_progress;
+ sclog(sc, "ret = %d\n", ret);
return ret;
}
-/* this work func is triggerd by data ready. it reads until it can
- * read no more. if data_ready hits while we're doing our work the work
- * struct will be marked and we'll be called again. */
+/* this work func is triggerd by data ready. it reads until it can read no
+ * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing
+ * our work the work struct will be marked and we'll be called again. */
static void o2net_rx_until_empty(void *arg)
{
struct o2net_sock_container *sc = arg;
@@ -1081,47 +1217,79 @@
ret = o2net_advance_rx(sc);
} while (ret > 0);
- if (ret && ret != -EAGAIN) {
+ if (ret <= 0 && ret != -EAGAIN) {
struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
sclog(sc, "saw error %d, closing\n", ret);
- o2net_ensure_shutdown(nn, sc);
+ /* not permanent so read failed handshake can retry */
+ o2net_ensure_shutdown(nn, sc, 0);
}
sc_put(sc);
}
-int o2net_set_nodelay(struct socket *sock)
+int o2net_set_options(struct socket *sock)
{
- int ret, opt = 1;
+ int ret, i;
mm_segment_t oldfs;
+ static struct optpairs {
+ int opt, val;
+ } pairs[] = {
+ {TCP_NODELAY, 1},
+ {TCP_KEEPCNT, O2NET_KEEPCNT},
+ {TCP_KEEPIDLE, O2NET_KEEPIDLE},
+ {TCP_KEEPINTVL, O2NET_KEEPINTVL},
+ };
oldfs = get_fs();
set_fs(KERNEL_DS);
- ret = sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY,
- (char __user *)&opt, sizeof(opt));
+
+ i = 1;
+ /* SOL_SOCKET is magical */
+ ret = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+ (char __user *)&i, sizeof(i));
+
+ for (i = 0; ret == 0 && i < ARRAY_SIZE(pairs); i++) {
+ ret = sock->ops->setsockopt(sock, SOL_TCP, pairs[i].opt,
+ (char __user *)&pairs[i].val,
+ sizeof(pairs[i].val));
+ if (ret)
+ break;
+ }
+
set_fs(oldfs);
-
return ret;
}
/* ------------------------------------------------------------ */
-/* state_change saw this sc go into _ESTABLISHED. */
-static void o2net_sc_connect_completed(void *arg)
+/* called when a connect completes and after a sock is accepted. the
+ * rx path will see the response and mark the sc valid */
+static void o2net_sc_send_handshake(struct o2net_sock_container *sc)
{
- struct o2net_sock_container *sc = arg;
struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
+ ssize_t ret;
- mlog(ML_CONN, "sc %p for node %s connected\n", sc,
- sc->sc_node->nd_name);
+ mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n",
+ (unsigned long long)O2NET_PROTOCOL_VERSION,
+ (unsigned long long)o2net_hand->connector_id);
- spin_lock(&nn->nn_lock);
- /* set valid if it hasn't been shutdown already.. */
- if (nn->nn_sc == sc) {
- nn->nn_connect_attempts = 0;
- o2net_set_nn_state(nn, sc, 1, 0);
+ ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
+ virt_to_page(o2net_hand),
+ (long)o2net_hand & ~PAGE_MASK,
+ sizeof(*o2net_hand), MSG_DONTWAIT);
+ if (ret != sizeof(*o2net_hand)) {
+ if (ret >= 0)
+ ret = -EBADE;
+ mlog(ML_CONN, "sendpage failed with %zu\n", ret);
+ o2net_ensure_shutdown(nn, sc, 0);
}
- spin_unlock(&nn->nn_lock);
+}
+
+static void o2net_sc_connect_completed(void *arg)
+{
+ struct o2net_sock_container *sc = arg;
+
+ o2net_sc_send_handshake(sc);
sc_put(sc);
}
@@ -1154,21 +1322,12 @@
/* see if we already have one pending or have given up */
if (nn->nn_sc || nn->nn_persistent_error)
arg = NULL;
- else if (nn->nn_connect_attempts == O2NET_MAX_CONNECT_ATTEMPTS) {
- mlog(ML_NOTICE, "%u connections to node %u [%s] have failed "
- "since it started heartbeating. No more connections "
- "will be initiated. Network communication attempts will "
- "return errors.\n", nn->nn_connect_attempts,
- node->nd_num, node->nd_name);
-
- o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
- arg = NULL;
- } else
- nn->nn_connect_attempts++;
spin_unlock(&nn->nn_lock);
if (arg == NULL) /* *shrug*, needed some indicator */
goto out;
+ nn->nn_last_connect_attempt = jiffies;
+
sc = sc_alloc(node);
if (sc == NULL) {
mlog(0, "couldn't allocate sc\n");
@@ -1193,18 +1352,18 @@
goto out;
}
- o2net_set_nodelay(sc->sc_sock);
- sk_register_callbacks(sc->sc_sock->sk, sc);
+ o2net_set_options(sc->sc_sock);
+ o2net_register_callbacks(sc->sc_sock->sk, sc);
spin_lock(&nn->nn_lock);
- /* connect completion will set nn->nn_sc_valid */
+ /* handshake completion will set nn->nn_sc_valid */
o2net_set_nn_state(nn, sc, 0, 0);
spin_unlock(&nn->nn_lock);
remoteaddr.sin_family = AF_INET;
remoteaddr.sin_addr.s_addr = node->nd_ipv4_address;
remoteaddr.sin_port = node->nd_ipv4_port;
-
+
ret = sc->sc_sock->ops->connect(sc->sc_sock,
(struct sockaddr *)&remoteaddr,
sizeof(remoteaddr),
@@ -1213,11 +1372,16 @@
ret = 0;
out:
- mlog(ML_CONN, "finished with %d\n", ret);
if (ret) {
- /* XXX log? */
+ mlog(ML_NOTICE, "connect attempt to node %s num %u at "
+ "%u.%u.%u.%u:%d failed with errno %d\n",
+ sc->sc_node->nd_name, sc->sc_node->nd_num,
+ NIPQUAD(sc->sc_node->nd_ipv4_address),
+ ntohs(sc->sc_node->nd_ipv4_port), ret);
+ /* 0 err so that another will be queued and attempted
+ * from set_nn_state */
if (sc)
- o2net_ensure_shutdown(nn, sc);
+ o2net_ensure_shutdown(nn, sc, 0);
}
if (sc)
sc_put(sc);
@@ -1227,27 +1391,130 @@
return;
}
+static void o2net_connect_expired(void *arg)
+{
+ struct o2net_node *nn = arg;
+
+ spin_lock(&nn->nn_lock);
+ if (!nn->nn_sc_valid) {
+ mlog(ML_ERROR, "no connection established with node %u after "
+ "%u seconds, giving up and returning errors.\n",
+ o2net_num_from_nn(nn), O2NET_CONN_IDLE_SECS);
+
+ o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
+ }
+ spin_unlock(&nn->nn_lock);
+}
+
/* ------------------------------------------------------------ */
+/* this work func is queued whenever a disconnection makes it such that
+ * we have half or fewer nodes connected than we do heartbeating. if by
+ * the time we get here we still have that imbalance we decide that we
+ * cannot reach the majority of the cluster and cut ourselves off. this
+ * work is delayed so that heartbeat has a chance to notice dead nodes
+ * and account for them in the heartbeating count before getting here */
+static void o2net_check_quorum(void *arg)
+{
+ int quorum;
+ unsigned long heartbeating[BITS_TO_LONGS(O2NM_MAX_NODES)];
+ int lowest_hb, lowest_reachable = 0, fence = 0;
+
+ /* usually _callback would be used from a hb callback to avoid
+ * deadlocking on the hb_callback_sem. we might as well be
+ * in a callback sem. our hb callback func will flush the work
+ * queue until we're finished and it holds the callback sem.. */
+ o2hb_fill_node_map_from_callback(heartbeating, sizeof(heartbeating));
+ lowest_hb = find_first_bit(heartbeating, O2NM_MAX_NODES);
+ if (lowest_hb != O2NM_MAX_NODES)
+ lowest_reachable = test_bit(lowest_hb, o2net_connected_bitmap);
+
+ mlog(ML_QUORUM, "heartbeating: %u, connected: %u, "
+ "lowest: %u (%sreachable)\n", o2net_heartbeating_nodes,
+ o2net_connected_nodes, lowest_hb,
+ lowest_reachable ? "" : "un");
+
+ if (!o2hb_check_local_node_heartbeating_from_callback() ||
+ o2net_heartbeating_nodes == 1)
+ goto out;
+
+ if (o2net_heartbeating_nodes & 1) {
+ /* the odd numbered cluster case is straight forward --
+ * if we can't talk to the majority we're hosed */
+ quorum = (o2net_heartbeating_nodes + 1)/2;
+ if (o2net_connected_nodes < quorum) {
+ mlog(ML_ERROR, "fencing this node because it is "
+ "only connected to %u nodes and %u is needed "
+ "to make a quorum out of %u heartbeating nodes\n",
+ o2net_connected_nodes, quorum,
+ o2net_heartbeating_nodes);
+ fence = 1;
+ }
+ } else {
+ /* the even numbered cluster adds the possibility of each half
+ * of the cluster being able to talk amongst themselves.. in
+ * that case we're hosed if we can't talk to the group that has
+ * the lowest numbered node */
+ quorum = o2net_heartbeating_nodes / 2;
+ if (o2net_connected_nodes < quorum) {
+ mlog(ML_ERROR, "fencing this node because it is "
+ "only connected to %u nodes and %u is needed "
+ "to make a quorum out of %u heartbeating nodes\n",
+ o2net_connected_nodes, quorum,
+ o2net_heartbeating_nodes);
+ }
+ else if ((o2net_connected_nodes == quorum) &&
+ !lowest_reachable) {
+ mlog(ML_ERROR, "fencing this node because it is "
+ "connected to a half-quorum of %u out of %u "
+ "nodes which doesn't include the lowest active "
+ "node %u\n", quorum, o2net_heartbeating_nodes,
+ lowest_hb);
+ fence = 1;
+ }
+ }
+
+out:
+ /* this is horribly heavy-handed. It should instead flip the file
+ * system RO and call some userspace script */
+ if (fence) {
+ /* panic spins with interrupts enabled. with preempt
+ * threads can still schedule, etc, etc */
+ o2hb_stop_all_regions();
+ panic("ocfs2 is very sorry to be fencing this system by "
+ "panicing\n");
+ }
+}
+
void o2net_disconnect_node(struct o2nm_node *node)
{
struct o2net_node *nn = o2net_nn_from_num(node->nd_num);
- /* don't bother trying to connect until it's heartbeating again */
+ /* don't reconnect until it's heartbeating again */
spin_lock(&nn->nn_lock);
- nn->nn_connect_attempts = O2NET_MAX_CONNECT_ATTEMPTS;
- o2net_set_nn_state(nn, NULL, 0, -EINVAL);
- cancel_delayed_work(&nn->nn_connect_work);
+ o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
spin_unlock(&nn->nn_lock);
- if (o2net_wq)
+ if (o2net_wq) {
+ cancel_delayed_work(&nn->nn_connect_work);
flush_workqueue(o2net_wq);
+ }
}
static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
void *data)
{
- o2net_disconnect_node(node);
+ struct o2net_node *nn = o2net_nn_from_num(node_num);
+
+ o2net_heartbeating_nodes--;
+ mlog(ML_QUORUM, "node %u, now %u heartbeating\n", node_num,
+ o2net_heartbeating_nodes);
+
+ if (node_num != o2nm_this_node()) {
+ cancel_delayed_work(&nn->nn_connect_expired);
+ o2net_disconnect_node(node);
+ } else
+ cancel_delayed_work(&o2net_quorum_work);
}
static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
@@ -1255,11 +1522,30 @@
{
struct o2net_node *nn = o2net_nn_from_num(node_num);
- /* ok, we can try to connect again */
- spin_lock(&nn->nn_lock);
- nn->nn_connect_attempts = 0;
- o2net_set_nn_state(nn, NULL, 0, 0);
- spin_unlock(&nn->nn_lock);
+ o2net_heartbeating_nodes++;
+ mlog(ML_QUORUM, "node %u, now %u heartbeating\n", node_num,
+ o2net_heartbeating_nodes);
+
+ /* ensure an immediate connect attempt */
+ nn->nn_last_connect_attempt = jiffies -
+ (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1);
+
+ if (node_num != o2nm_this_node()) {
+ /* heartbeat doesn't work unless a local node number is
+ * configured and doing so brings up the o2net_wq, so we can
+ * use it.. */
+ queue_delayed_work(o2net_wq, &nn->nn_connect_expired,
+ O2NET_CONN_IDLE_SECS * HZ);
+
+ /* believe it or not, accept and node heartbeating testing
+ * can succeed for this node before we got here.. so
+ * only use set_nn_state to clear the persistent error
+ * if that hasn't already happened */
+ spin_lock(&nn->nn_lock);
+ if (nn->nn_persistent_error)
+ o2net_set_nn_state(nn, NULL, 0, 0);
+ spin_unlock(&nn->nn_lock);
+ }
}
void o2net_unregister_hb_callbacks(void)
@@ -1335,7 +1621,7 @@
if (ret < 0)
goto out;
- o2net_set_nodelay(new_sock);
+ o2net_set_options(new_sock);
slen = sizeof(sin);
ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,
@@ -1361,8 +1647,10 @@
goto out;
}
- if (!o2hb_check_node_heartbeating(node->nd_num)) {
- mlog(ML_NOTICE, "attempt to connect from node '%s' at "
+ /* this happens all the time when the other node sees our heartbeat
+ * and tries to connect before we see their heartbeat */
+ if (!o2hb_check_node_heartbeating_from_callback(node->nd_num)) {
+ mlog(ML_CONN, "attempt to connect from node '%s' at "
"%u.%u.%u.%u:%d but it isn't heartbeating\n",
node->nd_name, NIPQUAD(sin.sin_addr.s_addr),
ntohs(sin.sin_port));
@@ -1396,12 +1684,14 @@
new_sock = NULL;
spin_lock(&nn->nn_lock);
- o2net_set_nn_state(nn, sc, 1, 0);
+ o2net_set_nn_state(nn, sc, 0, 0);
spin_unlock(&nn->nn_lock);
- sk_register_callbacks(sc->sc_sock->sk, sc);
+ o2net_register_callbacks(sc->sc_sock->sk, sc);
o2net_sc_queue_work(sc, &sc->sc_rx_work);
+ o2net_sc_send_handshake(sc);
+
out:
if (new_sock)
sock_release(new_sock);
@@ -1436,7 +1726,7 @@
* socket */
if (sk->sk_state == TCP_LISTEN) {
mlog(ML_TCP, "bytes: %d\n", bytes);
- schedule_work(&o2net_listen_work);
+ queue_work(o2net_wq, &o2net_listen_work);
}
out:
@@ -1494,9 +1784,11 @@
/*
* called from node manager when we should bring up our network listening
* socket. node manager handles all the serialization to only call this
- * once and to match it with o2net_stop_listening().
+ * once and to match it with o2net_stop_listening(). note,
+ * o2nm_this_node() doesn't work yet as we're being called while it
+ * is being set up.
*/
-int o2net_start_listening(u16 port)
+int o2net_start_listening(struct o2nm_node *node)
{
int ret = 0;
@@ -1510,15 +1802,19 @@
return -ENOMEM; /* ? */
}
- ret = o2net_open_listening_sock(port);
+ ret = o2net_open_listening_sock(node->nd_ipv4_port);
if (ret) {
destroy_workqueue(o2net_wq);
o2net_wq = NULL;
- }
+ } else
+ o2net_mod_connected_nodes(node->nd_num, 1);
+
return ret;
}
-void o2net_stop_listening(void)
+/* again, o2nm_this_node() doesn't work here as we're involved in
+ * tearing it down */
+void o2net_stop_listening(struct o2nm_node *node)
{
struct socket *sock = o2net_listen_sock;
size_t i;
@@ -1542,26 +1838,39 @@
/* finish all work and tear down the work queue */
mlog(ML_KTHREAD, "waiting for o2net thread to exit....\n");
+ cancel_delayed_work(&o2net_quorum_work);
destroy_workqueue(o2net_wq);
o2net_wq = NULL;
sock_release(o2net_listen_sock);
o2net_listen_sock = NULL;
+
+ o2net_mod_connected_nodes(node->nd_num, -1);
}
/* ------------------------------------------------------------ */
-int __init o2net_init(void)
+int o2net_init(void)
{
unsigned long i;
+ o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL);
+ if (o2net_hand == NULL)
+ return -ENOMEM;
+
+ o2net_hand->protocol_version = cpu_to_be64(O2NET_PROTOCOL_VERSION);
+ o2net_hand->connector_id = cpu_to_be64(1);
+
+ INIT_WORK(&o2net_quorum_work, o2net_check_quorum, NULL);
+
for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {
struct o2net_node *nn = o2net_nn_from_num(i);
spin_lock_init(&nn->nn_lock);
INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn);
+ INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn);
/* until we see hb from a node we'll return einval */
- nn->nn_persistent_error = -EINVAL;
+ nn->nn_persistent_error = -ENOTCONN;
init_waitqueue_head(&nn->nn_sc_wq);
idr_init(&nn->nn_status_idr);
INIT_LIST_HEAD(&nn->nn_status_list);
@@ -1569,3 +1878,8 @@
return 0;
}
+
+void o2net_exit(void)
+{
+ kfree(o2net_hand);
+}
Modified: trunk/fs/ocfs2/cluster/tcp.h
===================================================================
--- trunk/fs/ocfs2/cluster/tcp.h 2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/tcp.h 2005-06-24 01:43:58 UTC (rev 2422)
@@ -37,14 +37,6 @@
#include <linux/inet.h>
#include <linux/in.h>
-/*
- * FIXME: no need for nodemanager.h except for
- * NM_MAX_NAME_LEN...shouldn't that be something or somewhere else?
- */
-#include "nodemanager.h"
-
-#define O2NET_MSG_MAGIC ((u16)0xfa55)
-#define O2NET_MSG_STATUS_MAGIC ((u16)0xfa56)
typedef struct _o2net_msg
{
__u16 magic;
@@ -58,36 +50,10 @@
__u8 buf[0];
} o2net_msg;
-static inline void o2net_msg_to_net(o2net_msg *m)
-{
- m->magic = htons(m->magic);
- m->data_len = htons(m->data_len);
- m->msg_type = htons(m->msg_type);
- m->sys_status = htonl(m->sys_status);
- m->status = htonl(m->status);
- m->key = htonl(m->key);
- m->msg_num = htonl(m->msg_num);
-}
-
-static inline void o2net_msg_to_host(o2net_msg *m)
-{
- m->magic = ntohs(m->magic);
- m->data_len = ntohs(m->data_len);
- m->msg_type = ntohs(m->msg_type);
- m->sys_status = ntohl(m->sys_status);
- m->status = ntohl(m->status);
- m->key = ntohl(m->key);
- m->msg_num = ntohl(m->msg_num);
-}
-
typedef int (o2net_msg_handler_func)(o2net_msg *msg, u32 len, void *data);
#define O2NET_MAX_PAYLOAD_BYTES (4096 - sizeof(o2net_msg))
-/* RESERVED */
-#define O2NET_ALREADY_CONNECTED (0xfff0)
-#define O2NET_UNKNOWN_HOST (0xfff1)
-
/* TODO: figure this out.... */
static inline int o2net_link_down(int err, struct socket *sock)
{
@@ -132,13 +98,15 @@
struct list_head *unreg_list);
void o2net_unregister_handler_list(struct list_head *list);
+struct o2nm_node;
int o2net_register_hb_callbacks(void);
void o2net_unregister_hb_callbacks(void);
-int o2net_start_listening(u16 port);
-void o2net_stop_listening(void);
+int o2net_start_listening(struct o2nm_node *node);
+void o2net_stop_listening(struct o2nm_node *node);
void o2net_disconnect_node(struct o2nm_node *node);
-int __init o2net_init(void);
+int o2net_init(void);
+void o2net_exit(void);
int o2net_proc_init(struct proc_dir_entry *parent);
void o2net_proc_exit(struct proc_dir_entry *parent);
Modified: trunk/fs/ocfs2/cluster/tcp_internal.h
===================================================================
--- trunk/fs/ocfs2/cluster/tcp_internal.h 2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/cluster/tcp_internal.h 2005-06-24 01:43:58 UTC (rev 2422)
@@ -24,6 +24,31 @@
#define O2NET_MAX_CONNECT_ATTEMPTS 5
+#define O2NET_MSG_MAGIC ((u16)0xfa55)
+#define O2NET_MSG_STATUS_MAGIC ((u16)0xfa56)
+
+/* same as hb delay, we're waiting for another node to recognize our hb */
+#define O2NET_RECONNECT_DELAY_MS O2HB_REGION_TIMEOUT_MS
+
+/* we're delaying our quorum decision so that heartbeat will have timed
+ * out truly dead nodes by the time we come around to making decisions
+ * on their number */
+#define O2NET_QUORUM_DELAY_MS ((O2HB_DEAD_THRESHOLD + 2) * O2HB_REGION_TIMEOUT_MS)
+/* send 5 keepalives every 1 second after 5 seconds of idle. this
+ * is *so short* because we have to wait for quorum then wait for hb
+ * timeouts again and have recovery complete all within 45 seconds */
+#define O2NET_KEEPCNT (5)
+#define O2NET_KEEPIDLE (5)
+#define O2NET_KEEPINTVL (1)
+#define O2NET_CONN_IDLE_SECS (O2NET_KEEPIDLE + \
+ (O2NET_KEEPCNT * O2NET_KEEPINTVL))
+
+#define O2NET_PROTOCOL_VERSION 1ULL
+struct o2net_handshake {
+ u64 protocol_version;
+ u64 connector_id;
+};
+
struct o2net_node {
/* this is never called from int/bh */
spinlock_t nn_lock;
@@ -35,7 +60,6 @@
/* if this is set tx just returns it */
int nn_persistent_error;
-
/* threads waiting for an sc to arrive wait on the wq for generation
* to increase. it is increased when a connecting socket succeeds
* or fails or when an accepted socket is attached. */
@@ -44,8 +68,20 @@
struct idr nn_status_idr;
struct list_head nn_status_list;
+ /* connects are attempted from when heartbeat comes up until either hb
+ * goes down, the node is unconfigured, no connect attempts succeed
+ * before O2NET_CONN_IDLE_SECS, or a connect succeeds. connect_work
+ * is queued from set_nn_state both from hb up and from itself if a
+ * connect attempt fails and so can be self-arming. shutdown is
+ * careful to first mark the nn such that no connects will be attempted
+ * before canceling delayed connect work and flushing the queue. */
struct work_struct nn_connect_work;
- unsigned nn_connect_attempts;
+ unsigned long nn_last_connect_attempt;
+
+ /* this is queued as nodes come up and is canceled when a connection is
+ * established. this expiring gives up on the node and errors out
+ * transmits */
+ struct work_struct nn_connect_expired;
};
struct o2net_sock_container {
@@ -54,10 +90,30 @@
struct socket *sc_sock;
struct o2nm_node *sc_node;
+ /* all of these sc work structs hold refs on the sc while they are
+ * queued. they should not be able to ref a freed sc. the teardown
+ * race is with o2net_wq destruction in o2net_stop_listening() */
+
+ /* rx and connect work are generated from socket callbacks. sc
+ * shutdown removes the callbacks and then flushes the work queue */
struct work_struct sc_rx_work;
+ struct work_struct sc_connect_work;
+ /* shutdown work is triggered in two ways. the simple way is
+ * for a code path to call ensure_shutdown, which gets a lock, removes
+ * the sc from the nn, and queues the work. in this case the
+ * work is single-shot. the work is also queued from a sock
+ * callback, though, and in this case the work will find the sc
+ * still on the nn and will call ensure_shutdown itself.. this
+ * ends up triggering the shutdown work again, though nothing
+ * will be done in that second iteration. so work queue teardown
+ * has to be careful to remove the sc from the nn before waiting
+ * on the work queue so that the shutdown work doesn't remove the
+ * sc and rearm itself.
+ */
struct work_struct sc_shutdown_work;
- struct work_struct sc_connect_work;
+ unsigned sc_handshake_ok:1;
+
struct page *sc_page;
size_t sc_page_off;
Modified: trunk/fs/ocfs2/dlm/dlmdomain.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmdomain.c 2005-06-24 01:00:45 UTC (rev 2421)
+++ trunk/fs/ocfs2/dlm/dlmdomain.c 2005-06-24 01:43:58 UTC (rev 2422)
@@ -752,7 +752,7 @@
status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
sizeof(join_msg), node, &retval);
- if (status < 0 && status != -ENOPROTOOPT && status != -ENOTCONN) {
+ if (status < 0 && status != -ENOPROTOOPT) {
mlog_errno(status);
goto bail;
}
@@ -760,12 +760,8 @@
/* -ENOPROTOOPT from the net code means the other side isn't
listening for our message type -- that's fine, it means
his dlm isn't up, so we can consider him a 'yes' but not
- joined into the domain.
- -ENOTCONN is treated similarly -- it's returned from the
- core kernel net code however and indicates that they don't
- even have their cluster networking module loaded (bad
- user!) */
- if (status == -ENOPROTOOPT || status == -ENOTCONN) {
+ joined into the domain. */
+ if (status == -ENOPROTOOPT) {
status = 0;
*response = JOIN_OK_NO_MAP;
} else if (retval == JOIN_DISALLOW ||
More information about the Ocfs2-commits
mailing list