[Ocfs2-commits] khackel commits r1751 - trunk/cluster

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Mon Jan 10 16:08:59 CST 2005


Author: khackel
Date: 2005-01-10 16:08:58 -0600 (Mon, 10 Jan 2005)
New Revision: 1751

Modified:
   trunk/cluster/dlmmaster.c
   trunk/cluster/dlmmod.c
   trunk/cluster/dlmmod.h
   trunk/cluster/dlmrecovery.c
   trunk/cluster/tcp.c
   trunk/cluster/tcp.h
Log:
* removed useless net_package_message function
* changed net_dispatch_message to take a length instead of net_msg*
* added proper handling of network byte order in dlm and tcp messages



Modified: trunk/cluster/dlmmaster.c
===================================================================
--- trunk/cluster/dlmmaster.c	2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/dlmmaster.c	2005-01-10 22:08:58 UTC (rev 1751)
@@ -453,10 +453,13 @@
 	dlm_lock_resource *res;
 	dlm_master_request *request = (dlm_master_request *) msg->buf;
 	dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
-	struct qstr lockname = { .name=request->name, .len=request->namelen };
+	struct qstr lockname;
 	int found;
 	struct list_head *iter;
 
+	dlm_master_request_to_host(request);
+	lockname.name = request->name;
+	lockname.len = request->namelen;
 	lockname.hash = full_name_hash(lockname.name, lockname.len);
 
 way_up_top:	
@@ -606,9 +609,11 @@
 	dlm_master_request_resp *resp = (dlm_master_request_resp *) msg->buf;
 	int found = 0, wake = 0;
 	struct list_head *iter;
-	struct qstr lockname = { .name=resp->name, .len=resp->namelen };
-	
+	struct qstr lockname;
 
+	dlm_master_request_resp_to_host(resp);
+	lockname.name = resp->name;
+	lockname.len = resp->namelen;
 	lockname.hash = full_name_hash(lockname.name, lockname.len);
 
 	spin_lock(&dlm_master_lock);
@@ -689,8 +694,11 @@
 	dlm_lock_resource *res;
 	int bit;
 	struct list_head *iter;
-	struct qstr lockname = { .name=assert->name, .len=assert->namelen };
+	struct qstr lockname;
 
+	dlm_assert_master_to_host(assert);	
+	lockname.name = assert->name;
+	lockname.len = assert->namelen;
 	lockname.hash = full_name_hash(lockname.name, lockname.len);
 
 	spin_lock(&dlm->spinlock);
@@ -787,6 +795,7 @@
 	ret = -EINVAL;
 	inode = nm_get_group_node_by_index(dlm->group, to);
 	if (inode) {
+		dlm_master_request_to_net(&request);
 		ret = net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request, sizeof(request), inode, &response);
 		iput(inode);
 		if (ret >= 0) {
@@ -840,6 +849,8 @@
 	inode = nm_get_group_node_by_index(dlm->group, to);
 	if (!inode)
 		return -EINVAL;
+
+	dlm_master_request_resp_to_net(&resp);
 	ret = net_send_message(DLM_MASTER_REQUEST_RESP_MSG, dlm->key, &resp, sizeof(resp), inode, NULL);
 	iput(inode);
 	return ret;
@@ -882,6 +893,8 @@
 			ret = tmpret;
 			break;
 		}
+
+		dlm_assert_master_to_net(&assert);
 		tmpret = net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &assert, sizeof(assert), inode, NULL);
 		iput(inode);
 

Modified: trunk/cluster/dlmmod.c
===================================================================
--- trunk/cluster/dlmmod.c	2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/dlmmod.c	2005-01-10 22:08:58 UTC (rev 1751)
@@ -95,62 +95,6 @@
 
 extern spinlock_t dlm_master_lock;
 extern struct list_head dlm_master_list;
-
-typedef struct _dlm_create_lock
-{
-	u32 flags;   // TODO: reduce the size of this
-	u16 node_idx;
-	s8 requested_type;
-	u8 namelen;
-	u8 name[NM_MAX_NAME_LEN];
-	u64 cookie;
-} dlm_create_lock;
-
-typedef struct _dlm_convert_lock
-{
-	u32 flags;   // TODO: reduce the size of this
-	u16 node_idx;
-	s8 requested_type;
-	u8 namelen;
-	u8 name[NM_MAX_NAME_LEN];
-	u64 cookie;
-	s8 lvb[0];
-} dlm_convert_lock;
-#define DLM_CONVERT_LOCK_MAX_LEN  (sizeof(dlm_convert_lock) + DLM_LVB_LEN)
-
-typedef struct _dlm_unlock_lock
-{
-	u32 flags;   // TODO: reduce the size of this
-	u16 node_idx;
-	u8 namelen;
-	u8 name[NM_MAX_NAME_LEN];
-	u64 cookie;
-	s8 lvb[0];
-} dlm_unlock_lock;
-#define DLM_UNLOCK_LOCK_MAX_LEN  (sizeof(dlm_unlock_lock) + DLM_LVB_LEN)
-
-
-typedef struct _dlm_proxy_ast
-{
-	u32 flags;   // TODO: reduce the size of this
-	u16 node_idx;
-	u8 type;
-	u8 blocked_type;
-	u8 namelen;
-	u8 name[NM_MAX_NAME_LEN];
-	u64 cookie;
-	s8 lvb[0];
-} dlm_proxy_ast;
-#define DLM_PROXY_AST_MAX_LEN  (sizeof(dlm_proxy_ast) + DLM_LVB_LEN)
-
-
-int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
-int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
-int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
-
-int dlm_unlock_lock_handler(net_msg *msg, u32 len, void *data);
-dlm_status dlm_send_remote_unlock_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, dlm_lockstatus *lksb, int flags);
-
 /* ----------------------------------------------------------------- */
 
 
@@ -927,6 +871,7 @@
 	lksb->status = DLM_NOLOCKMGR;
 	inode = nm_get_group_node_by_index(dlm->group, res->owner);
 	if (inode) {
+		dlm_unlock_lock_to_net(&unlock);
 		tmpret = net_send_message_arr(DLM_UNLOCK_LOCK_MSG, dlm->key, arrsz, nd, msgsz, inode, &status);
 		if (tmpret >= 0) {
 			// successfully sent and received
@@ -957,9 +902,14 @@
 	int found = 0;
 	dlm_lockstatus *lksb = NULL;
 	int ignore;
-	struct qstr lockname = { .name=unlock->name, .len=unlock->namelen };
-	u32 flags = unlock->flags;
+	struct qstr lockname;
+	u32 flags;
 
+	dlm_unlock_lock_to_host(unlock);
+	lockname.name = unlock->name;
+	lockname.len = unlock->namelen;
+	flags = unlock->flags;
+
 	if (flags & LKM_GET_LVB) {
 		dlmprintk0("bad args!  GET_LVB specified on unlock!\n");
 		return DLM_BADARGS;
@@ -1479,6 +1429,7 @@
 	ret = -EINVAL;
 	inode = nm_get_group_node_by_index(dlm->group, lock->node);
 	if (inode) {
+		dlm_proxy_ast_to_net(&past);
 		ret = net_send_message_arr(DLM_PROXY_AST_MSG, dlm->key, arrsz, nd, msgsz, inode, NULL);
 		iput(inode);
 	}
@@ -1495,11 +1446,17 @@
 	dlm_lock_resource *res;
 	dlm_lock *lock = NULL;
 	dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
-	struct qstr lockname = { .name=past->name, .len=past->namelen };
+	struct qstr lockname;
 	struct list_head *iter, *head=NULL;
-	u64 cookie = past->cookie;
-	u32 flags = past->flags;
+	u64 cookie;
+	u32 flags;
 
+	dlm_proxy_ast_to_host(past);
+	lockname.name = past->name;
+	lockname.len = past->namelen;
+	cookie = past->cookie;
+	flags = past->flags;
+
 	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
 	     (LKM_PUT_LVB|LKM_GET_LVB)) {
 		dlmprintk("both PUT and GET lvb specified\n");
@@ -1633,6 +1590,7 @@
 	ret = DLM_NOLOCKMGR;
 	inode = nm_get_group_node_by_index(dlm->group, res->owner);
 	if (inode) {
+		dlm_create_lock_to_net(&create);
 		tmpret = net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, sizeof(create), inode, &status);
 		if (tmpret >= 0) {
 			// successfully sent and received
@@ -1655,7 +1613,11 @@
 	dlm_lock *newlock;
 	dlm_lockstatus *lksb;
 	dlm_status status = DLM_NORMAL;
-	struct qstr lockname = { .name=create->name, .len=create->namelen };
+	struct qstr lockname;
+
+	dlm_create_lock_to_host(create);
+	lockname.name = create->name;
+	lockname.len = create->namelen;
 	
 	dlmprintk0("\n");
 
@@ -1744,6 +1706,7 @@
 	ret = DLM_NOLOCKMGR;
 	inode = nm_get_group_node_by_index(dlm->group, res->owner);
 	if (inode) {
+		dlm_convert_lock_to_net(&convert);
 		tmpret = net_send_message_arr(DLM_CONVERT_LOCK_MSG, dlm->key, arrsz, nd, msgsz, inode, &status);
 		if (tmpret >= 0) {
 			// successfully sent and received
@@ -1770,9 +1733,14 @@
 	dlm_lockstatus *lksb;
 	dlm_status status = DLM_NORMAL;
 	int found = 0;
-	struct qstr lockname = { .name=convert->name, .len=convert->namelen };
-	u32 flags = convert->flags;
+	struct qstr lockname;
+	u32 flags;
 
+	dlm_convert_lock_to_host(convert);
+	lockname.name = convert->name;
+	lockname.len = convert->namelen;
+	flags = convert->flags;
+
 	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
 	     (LKM_PUT_LVB|LKM_GET_LVB)) {
 		dlmprintk("both PUT and GET lvb specified\n");

Modified: trunk/cluster/dlmmod.h
===================================================================
--- trunk/cluster/dlmmod.h	2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/dlmmod.h	2005-01-10 22:08:58 UTC (rev 1751)
@@ -182,6 +182,7 @@
 	u16 sending_node;
 	u32 next_seq;
 	util_thread_info thread;
+	unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
 } dlm_recovery_ctxt;
 
 
@@ -317,18 +318,43 @@
 #define DLM_PROXY_AST_MSG		505
 #define DLM_UNLOCK_LOCK_MSG		506
 
+#define DLM_RECO_NODE_DATA_MSG          507
 
+
+typedef struct _dlm_reco_node_data
+{
+	int state;
+	u16 node_num;
+	struct list_head list;
+	struct list_head granted;
+	struct list_head converting;
+	struct list_head blocked;
+} dlm_reco_node_data;
+
 enum {
+	DLM_RECO_NODE_DATA_DEAD = -1,
+	DLM_RECO_NODE_DATA_INIT = 0,
+	DLM_RECO_NODE_DATA_REQUESTING,
+	DLM_RECO_NODE_DATA_REQUESTED,
+	DLM_RECO_NODE_DATA_RECEIVING,
+	DLM_RECO_NODE_DATA_DONE,
+	DLM_RECO_NODE_DATA_FINALIZE_SENT,
+};
+
+
+enum {
 	DLM_MASTER_RESP_NO,
 	DLM_MASTER_RESP_YES,
 	DLM_MASTER_RESP_MAYBE,
 	DLM_MASTER_RESP_ERROR
 };
 
+
 typedef struct _dlm_master_request
 {
 	u16 node_idx;
 	u8 namelen;
+	u8 pad1;
 	u8 name[NM_MAX_NAME_LEN];
 } dlm_master_request;
 
@@ -344,13 +370,151 @@
 {
 	u16 node_idx;
 	u8 namelen;
+	u8 pad1;
 	u8 name[NM_MAX_NAME_LEN];
 } dlm_assert_master;
 
+typedef struct _dlm_create_lock
+{
+	u64 cookie;
+	u32 flags;
+	u16 node_idx;
+	s8 requested_type;
+	u8 namelen;
+	u8 name[NM_MAX_NAME_LEN];
+} dlm_create_lock;
 
+typedef struct _dlm_convert_lock
+{
+	u64 cookie;
+	u32 flags;
+	u16 node_idx;
+	s8 requested_type;
+	u8 namelen;
+	u8 name[NM_MAX_NAME_LEN];
+	s8 lvb[0];
+} dlm_convert_lock;
+#define DLM_CONVERT_LOCK_MAX_LEN  (sizeof(dlm_convert_lock) + DLM_LVB_LEN)
 
+typedef struct _dlm_unlock_lock
+{
+	u64 cookie;
+	u32 flags;
+	u16 node_idx;
+	u8 namelen;
+	u8 pad1;
+	u8 name[NM_MAX_NAME_LEN];
+	s8 lvb[0];
+} dlm_unlock_lock;
+#define DLM_UNLOCK_LOCK_MAX_LEN  (sizeof(dlm_unlock_lock) + DLM_LVB_LEN)
 
+typedef struct _dlm_proxy_ast
+{
+	u64 cookie;
+	u32 flags;
+	u16 node_idx;
+	u16 pad1;
+	u8 type;
+	u8 blocked_type;
+	u8 namelen;
+	u8 pad2;
+	u8 name[NM_MAX_NAME_LEN];
+	s8 lvb[0];
+} dlm_proxy_ast;
+#define DLM_PROXY_AST_MAX_LEN  (sizeof(dlm_proxy_ast) + DLM_LVB_LEN)
 
+
+static inline void dlm_master_request_to_net(dlm_master_request *m)
+{
+	m->node_idx = htons(m->node_idx);
+}
+static inline void dlm_master_request_to_host(dlm_master_request *m)
+{
+	m->node_idx = ntohs(m->node_idx);
+}
+
+static inline void dlm_master_request_resp_to_net(dlm_master_request_resp *m)
+{
+	m->node_idx = htons(m->node_idx);
+}
+static inline void dlm_master_request_resp_to_host(dlm_master_request_resp *m)
+{
+	m->node_idx = ntohs(m->node_idx);
+}
+
+static inline void dlm_assert_master_to_net(dlm_assert_master *m)
+{
+	m->node_idx = htons(m->node_idx);
+}
+static inline void dlm_assert_master_to_host(dlm_assert_master *m)
+{
+	m->node_idx = ntohs(m->node_idx);
+}
+
+static inline void dlm_create_lock_to_net(dlm_create_lock *c)
+{
+	c->cookie = cpu_to_be64(c->cookie);
+	c->flags = htonl(c->flags);
+	c->node_idx = htons(c->node_idx);
+}
+static inline void dlm_create_lock_to_host(dlm_create_lock *c)
+{
+	c->cookie = be64_to_cpu(c->cookie);
+	c->flags = ntohl(c->flags);
+	c->node_idx = ntohs(c->node_idx);
+}
+
+static inline void dlm_convert_lock_to_net(dlm_convert_lock *c)
+{
+	c->cookie = cpu_to_be64(c->cookie);
+	c->flags = htonl(c->flags);
+	c->node_idx = htons(c->node_idx);
+}
+static inline void dlm_convert_lock_to_host(dlm_convert_lock *c)
+{
+	c->cookie = be64_to_cpu(c->cookie);
+	c->flags = ntohl(c->flags);
+	c->node_idx = ntohs(c->node_idx);
+}
+
+static inline void dlm_unlock_lock_to_net(dlm_unlock_lock *u)
+{
+	u->cookie = cpu_to_be64(u->cookie);
+	u->flags = htonl(u->flags);
+	u->node_idx = htons(u->node_idx);
+}
+static inline void dlm_unlock_lock_to_host(dlm_unlock_lock *u)
+{
+	u->cookie = be64_to_cpu(u->cookie);
+	u->flags = ntohl(u->flags);
+	u->node_idx = ntohs(u->node_idx);
+}
+
+static inline void dlm_proxy_ast_to_net(dlm_proxy_ast *a)
+{
+	a->cookie = cpu_to_be64(a->cookie);
+	a->flags = htonl(a->flags);
+	a->node_idx = htons(a->node_idx);
+}
+static inline void dlm_proxy_ast_to_host(dlm_proxy_ast *a)
+{
+	a->cookie = be64_to_cpu(a->cookie);
+	a->flags = ntohl(a->flags);
+	a->node_idx = ntohs(a->node_idx);
+}
+
+
+int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
+int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
+int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
+
+int dlm_unlock_lock_handler(net_msg *msg, u32 len, void *data);
+dlm_status dlm_send_remote_unlock_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, dlm_lockstatus *lksb, int flags);
+
+
+
+
+
 void dlm_shuffle_lists(dlm_ctxt *dlm, dlm_lock_resource *res);
 void dlm_thread_run_lock_resources(dlm_ctxt *dlm);
 int dlm_thread(void *data);

Modified: trunk/cluster/dlmrecovery.c
===================================================================
--- trunk/cluster/dlmrecovery.c	2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/dlmrecovery.c	2005-01-10 22:08:58 UTC (rev 1751)
@@ -63,20 +63,26 @@
 			
 u16 dlm_pick_recovery_master(dlm_ctxt *dlm, u16 *new_dead_node);
 static int dlm_remaster_locks_local(dlm_ctxt *dlm);
-int dlm_init_recovery_area(dlm_ctxt *dlm, u16 dead_node, u16 num_nodes);
+int dlm_init_recovery_area(dlm_ctxt *dlm);
 int dlm_request_all_locks(dlm_ctxt *dlm, u16 request_from, u16 dead_node);
 void dlm_destroy_recovery_area(dlm_ctxt *dlm, u16 dead_node);
 
 #define DLM_RECOVERY_THREAD_MS  2000
 
-#if 0
+
+#ifdef LOUSY_RECOVERY
+
 /*
  * RECOVERY THREAD
  */
 
 void dlm_kick_recovery_thread(dlm_ctxt *dlm)
 {
-	/* wake the recovery thread */
+	/* wake the recovery thread 
+	 * this will wake the reco thread in one of three places
+	 * 1) sleeping with no recovery happening
+	 * 2) sleeping with recovery mastered elsewhere 
+	 * 3) recovery mastered here, waiting on reco data */
 	atomic_set(&dlm->reco.thread.woken, 1);
 	wake_up(&dlm->reco.thread.thread_wq);
 }
@@ -168,12 +174,12 @@
 			dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
 		}
 
-		spin_unlock(&dlm->spinlock);
-
 		if (dlm->reco.dead_node == NM_INVALID_SLOT_NUM) {
 			dlmprintk0("nothing to recover!  sleeping now!\n");
+			spin_unlock(&dlm->spinlock);
 			goto sleep;
 		}
+		spin_unlock(&dlm->spinlock);
 
 		/* take write barrier */
 		/* (stops the list reshuffling thread, proxy ast handling) */
@@ -198,49 +204,53 @@
 				spin_unlock(&dlm->spinlock);
 			}
 		}
-		
+	
+		dlmprintk("RECOVERY!  new_master=%u, this node=%u, dead_node=%u\n",
+			  dlm->reco.new_master, dlm->group_index, dlm->reco.dead_node);
 
-		if (dlm->reco.new_master == dlm->group_index) {
-			status = dlm_remaster_locks_local(dlm);
-			if (status < 0) {
-				dlmprintk("error remastering locks for node %u!!!!  retrying!\n",
-				       dlm->reco.dead_node);
-			} else {
-				// success!  see if any other nodes need recovery
-				spin_lock(&dlm->spinlock);
-				clear_bit(dlm->reco.dead_node, dlm->recovery_map);
-				spin_unlock(&dlm->spinlock);
-				dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
-				dlm->reco.new_master = NM_INVALID_SLOT_NUM;
-				dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
-				dlm->reco.next_seq = 0;
-			}
+		if (dlm->reco.new_master != dlm->group_index) {
+			/* it is safe to start everything back up here
+			 * because all of the dead node's lock resources
+			 * have been marked as in-recovery */
 			up_write(&dlm->recovery_sem);
-			// pick another dead node
-			continue;
-		} else {
+			
 			// sit around until new_master is dead or done
 			// we will get signalled by the waitqueue either way
 			dlmprintk("new_master %u is recovering dead_node %u... waiting...\n",
-			       dlm->reco.new_master, dlm->reco.dead_node);
+				  dlm->reco.new_master, dlm->reco.dead_node);
+sleep:
+			atomic_set(&dlm->reco.thread.woken, 0);
+			status = util_wait_atomic_eq(&dlm->reco.thread.thread_wq,
+						     &dlm->reco.thread.woken,
+						     1, DLM_RECOVERY_THREAD_MS);
+			if (status == 0 || status == -ETIMEDOUT) {
+				if (atomic_read(&dlm->reco.thread.woken))
+					dlmprintk0("aha!!! recovery thread woken!\n");
+				else
+					dlmprintk0("timed out waiting, running again\n");
+				continue;
+			}
+			dlmprintk("recovery thread got %d while waiting\n", status);
+			break;
 		}
 
+		/* new_master == dlm->group_index */
+		status = dlm_remaster_locks_local(dlm);
+		if (status < 0) {
+			dlmprintk("error remastering locks for node %u!!!!  retrying!\n",
+			       dlm->reco.dead_node);
+		} else {
+			// success!  see if any other nodes need recovery
+			spin_lock(&dlm->spinlock);
+			clear_bit(dlm->reco.dead_node, dlm->recovery_map);
+			spin_unlock(&dlm->spinlock);
+			dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
+			dlm->reco.new_master = NM_INVALID_SLOT_NUM;
+			dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
+			dlm->reco.next_seq = 0;
+		}
 		up_write(&dlm->recovery_sem);
-
-sleep:
-		atomic_set(&dlm->reco.thread.woken, 0);
-		status = util_wait_atomic_eq(&dlm->reco.thread.thread_wq, 
-					     &dlm->reco.thread.woken, 
-					     1, DLM_RECOVERY_THREAD_MS);
-		if (status == 0 || status == -ETIMEDOUT) {
-			if (atomic_read(&dlm->reco.thread.woken))
-				dlmprintk0("aha!!! recovery thread woken!\n");
-			else 
-				dlmprintk0("timed out waiting, running again\n");
-			continue;
-		}
-		dlmprintk("recovery thread got %d while waiting\n", status);
-		break;
+		// continue and look for another dead node
 	}
 
 	flush_scheduled_work();
@@ -262,11 +272,15 @@
 /* +--- remove dead_node from recovery map */
 /* +--- unset new_master and dead_node and start all over */
 
+static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED;
 
+
 static int dlm_remaster_locks_local(dlm_ctxt *dlm)
 {
-	int num_nodes = 255, i, status = 0;
-	u32 node_map[8];
+	int num_nodes = NM_MAX_NODES, next=0, status = 0;
+	dlm_reco_node_data *ndata;
+	struct list_head *iter;
+	int all_nodes_done;
 
 
 /* +- if this node is the new master, init the temp recovery area */
@@ -276,36 +290,173 @@
 /* |- apply all temp area changes to real lock */
 /* +- send ALL DONE message to each node */
 
-
-	status = dlm_init_recovery_area(dlm, dlm->reco.dead_node, num_nodes);
+	status = dlm_init_recovery_area(dlm);
 	if (status < 0)
 		return status;
 
-	spin_lock(&dlm->spinlock);
-	num_nodes = nm_get_group_max_slots(dlm->group);
-	memcpy(node_map, dlm->node_map, sizeof(node_map));
-	spin_unlock(&dlm->spinlock);
+	spin_lock(&dlm_reco_state_lock);
+	list_for_each(iter, &dlm->reco.node_data) {
+		ndata = list_entry (iter, dlm_reco_node_data, list);
+		DLM_ASSERT(ndata->state == DLM_RECO_NODE_DATA_INIT);
+		ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
 
-	for (i=0; i<num_nodes; i++) {
-		if (test_bit(i, node_map)) {
-			spin_lock(&dlm->spinlock);
-			dlm->reco.sending_node = i;
-			dlm->reco.next_seq = 0;
-			spin_unlock(&dlm->spinlock);
-			status = dlm_request_all_locks(dlm, i, dlm->reco.dead_node);
-			if (status < 0) {
-				spin_lock(&dlm->spinlock);
-				dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
-				dlm->reco.next_seq = 0;
-				spin_unlock(&dlm->spinlock);
+		status = dlm_request_all_locks(dlm, ndata->node_num, dlm->reco.dead_node);
+		if (status < 0) {
+			dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
+			return status;
+		}
+
+		switch (ndata->state) {
+			case DLM_RECO_NODE_DATA_INIT:
+			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+			case DLM_RECO_NODE_DATA_REQUESTED:
+				DLM_ASSERT(0);
+				break;
+			case DLM_RECO_NODE_DATA_DEAD:
+				dlmprintk("eek.  node %u died after requesting recovery info for node %u\n",
+					  ndata->node_num, dlm->reco.dead_node);
+				spin_unlock(&dlm_reco_state_lock);
+				// start all over
 				dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
-				return status;
+				return -EAGAIN;
+			case DLM_RECO_NODE_DATA_REQUESTING:
+				ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
+				dlmprintk("now receiving recovery data from node %u for dead node %u\n",
+					  ndata->node_num, dlm->reco.dead_node);
+				break;
+			case DLM_RECO_NODE_DATA_RECEIVING:
+				dlmprintk("already receiving recovery data from node %u for dead node %u\n",
+					  ndata->node_num, dlm->reco.dead_node);
+				break;
+			case DLM_RECO_NODE_DATA_DONE:
+				dlmprintk("already DONE receiving recovery data from node %u for dead node %u\n",
+					  ndata->node_num, dlm->reco.dead_node);
+				break;
+		}
+	}
+	spin_unlock(&dlm_reco_state_lock);
+
+	/* nodes should be sending reco data now
+	 * just need to wait */
+
+	while (1) {
+		/* wait to be signalled, with periodic timeout
+		 * to check for node death */
+		atomic_set(&dlm->reco.thread.woken, 0);
+		ret = util_wait_atomic_eq(&dlm->reco.thread.thread_wq,
+					     &dlm->reco.thread.woken, 1,
+					     DLM_RECOVERY_THREAD_MS);
+		if (ret == 0 || ret == -ETIMEDOUT) {
+			if (atomic_read(&dlm->reco.thread.woken))
+				dlmprintk0("waiting on reco data... aha!!! recovery thread woken!\n");
+			else
+				dlmprintk0("waiting on reco data... timed out waiting\n");
+		}
+
+		/* either way, recheck all the nodes now to see if we are
+		 * done, or if anyone died */
+		all_nodes_done = 1;
+		spin_lock(&dlm_reco_state_lock);
+		list_for_each(iter, &dlm->reco.node_data) {
+			ndata = list_entry (iter, dlm_reco_node_data, list);
+	
+			switch (ndata->state) {
+				case DLM_RECO_NODE_DATA_INIT:
+				case DLM_RECO_NODE_DATA_REQUESTING:
+					DLM_ASSERT(0);
+					break;
+				case DLM_RECO_NODE_DATA_DEAD:
+					dlmprintk("eek.  node %u died after requesting recovery info for node %u\n",
+						  ndata->node_num, dlm->reco.dead_node);
+					spin_unlock(&dlm_reco_state_lock);
+					// start all over
+					dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
+					return -EAGAIN;
+				case DLM_RECO_NODE_DATA_RECEIVING:
+				case DLM_RECO_NODE_DATA_REQUESTED:
+					all_nodes_done = 0;
+					break;
+				case DLM_RECO_NODE_DATA_DONE:
+					break;
+				case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+					break;
 			}
 		}
+		spin_unlock(&dlm_reco_state_lock);
+
+		if (all_nodes_done) {
+			/* all nodes are now in DLM_RECO_NODE_DATA_DONE state 
+	 		* just send a finalize message to everyone and 
+	 		* clean up */
+			ret = dlm_finalize_recovery(dlm);
+			if (ret < 0) {
+				dlmprintk("dlm_finalize_recovery returned %d\n", ret);
+			}
+			dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
+			status = ret;
+			break;
+		}
 	}
+
 	return status;
 }
 
+int dlm_init_recovery_area(dlm_ctxt *dlm)
+{
+	int num=0, ret;
+	dlm_reco_node_data *ndata;
+	LIST_HEAD(tmplist);
+
+	spin_lock(&dlm->spinlock);
+	memcpy(dlm->reco.node_map, dlm->node_map, sizeof(dlm->node_map));
+	/* nodes can only be removed (by dying) after dropping
+	 * this lock, and death will be trapped later, so this should do */
+	spin_unlock(&dlm->spinlock);
+	
+	while (1) {
+		num = find_next_bit (dlm->reco.node_map, NM_MAX_NODES, num);
+		if (num >= NM_MAX_NODES) {
+			break;
+		}
+		DLM_ASSERT(num != dead_node);
+
+		ndata = kmalloc(sizeof(dlm_reco_node_data), GFP_KERNEL);
+		if (!ndata) {
+			dlm_destroy_recovery_area(dlm, dead_node);
+			return -ENOMEM;
+		}
+		memset(ndata, 0, sizeof(dlm_reco_node_data));
+		ndata->node_num = num;
+		ndata->state = DLM_RECO_NODE_DATA_INIT;
+		LIST_HEAD_INIT(&ndata->granted);
+		LIST_HEAD_INIT(&ndata->converting);
+		LIST_HEAD_INIT(&ndata->blocked);
+		spin_lock(&dlm_reco_state_lock);
+		list_add_tail(&ndata->list, &dlm->reco.node_data);
+		spin_unlock(&dlm_reco_state_lock);
+		num++;
+	}
+
+	return 0;
+}
+
+void dlm_destroy_recovery_area(dlm_ctxt *dlm, u16 dead_node)
+{
+	struct list_head *iter, *iter2;
+	dlm_reco_node_data *ndata;
+	LIST_HEAD(tmplist);
+
+	spin_lock(&dlm_reco_state_lock);
+	list_splice_init(&dlm->reco.node_data, &tmplist);
+	spin_unlock(&dlm_reco_state_lock);
+
+#warning this probably needs to be smarter
+	list_for_each_safe(iter, iter2, &tmplist) {
+		ndata = list_entry (iter, dlm_reco_node_data, list);
+		kfree(ndata);
+	}
+}
+
 int dlm_request_all_locks(dlm_ctxt *dlm, u16 request_from, u16 dead_node)
 {
 	dlmprintk("dlm_request_all_locks: dead node is %u, sending request to %u\n",
@@ -315,10 +466,102 @@
 	return 0;
 }
 
+typedef struct _dlm_reco_request_locks
+{
+	u16 dead_node;
+} dlm_reco_request_locks;
+
+typedef struct _dlm_reco_node_data
+{
+} dlm_reco_node_data;
+
+int dlm_request_all_locks_handler(net_msg *msg, u32 len, void *data)
+{
+#if 0
+	int status;
+	dlm_ctxt *dlm = data;
+	dlm_lock_resource *res;
+	dlm_lock *lock = NULL;
+	dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
+	struct qstr lockname = { .name=past->name, .len=past->namelen };
+	struct list_head *iter, *head=NULL;
+	u64 cookie = past->cookie;
+	u32 flags = past->flags;
+
+	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
+	     (LKM_PUT_LVB|LKM_GET_LVB)) {
+		dlmprintk("both PUT and GET lvb specified\n");
+		return DLM_BADARGS;
+	}
+
+	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : 
+		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
+
+	lockname.hash = full_name_hash(lockname.name, lockname.len);
+	
+	dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
+
+	if (past->type != DLM_AST && 
+	    past->type != DLM_BAST) {
+		dlmprintk("Eeeek unknown ast type! %d, cookie=%llu, name=%*s\n", 
+		       past->type, cookie, lockname.len, lockname.name);
+		return 0;
+	}
+
+	res = dlm_lookup_lock(dlm, &lockname);
+	if (!res) {
+		dlmprintk("eek! got %sast for unknown lockres!  cookie=%llu, name=%*s, namelen=%d\n", 
+		       past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
+		return 0;
+	}
 #endif
 
+}
+
+int dlm_reco_node_data_handler(net_msg *msg, u32 len, void *data)
+{
 #if 0
+	int status;
+	dlm_ctxt *dlm = data;
+	dlm_lock_resource *res;
+	dlm_lock *lock = NULL;
+	dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
+	struct qstr lockname = { .name=past->name, .len=past->namelen };
+	struct list_head *iter, *head=NULL;
+	u64 cookie = past->cookie;
+	u32 flags = past->flags;
 
+	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
+	     (LKM_PUT_LVB|LKM_GET_LVB)) {
+		dlmprintk("both PUT and GET lvb specified\n");
+		return DLM_BADARGS;
+	}
+
+	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : 
+		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
+
+	lockname.hash = full_name_hash(lockname.name, lockname.len);
+	
+	dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
+
+	if (past->type != DLM_AST && 
+	    past->type != DLM_BAST) {
+		dlmprintk("Eeeek unknown ast type! %d, cookie=%llu, name=%*s\n", 
+		       past->type, cookie, lockname.len, lockname.name);
+		return 0;
+	}
+
+	res = dlm_lookup_lock(dlm, &lockname);
+	if (!res) {
+		dlmprintk("eek! got %sast for unknown lockres!  cookie=%llu, name=%*s, namelen=%d\n", 
+		       past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
+		return 0;
+	}
+#endif
+}
+
+
+
 int dlm_recovery_request_handler(net_msg *msg, u32 len, void *data);
 int dlm_recovery_response_handler(net_msg *msg, u32 len, void *data);
 int dlm_recovery_lock_arr_req_handler(net_msg *msg, u32 len, void *data);
@@ -421,9 +664,6 @@
 }
 
 
-/* 
- * gawd i hate udp
- */
 int dlm_recovery_lock_arr_req_handler(net_msg *msg, u32 len, void *data)
 {
 	dlm_ctxt *dlm = data;
@@ -434,8 +674,6 @@
 	dlm_lock *lock = NULL;
 	int ret, i, out_of_order = 0;
 	
-	// TODO: ntoh(req)
-
 	ret = 0;
 	if (req->num_locks == 0)
 		goto send_response;
@@ -541,7 +779,7 @@
 }
 
 
-#endif
+#endif  /* LOUSY_RECOVERY */
 
 #warning may need to change kfree to put_lock and refcounting here
 static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u16 dead_node, int locked)

Modified: trunk/cluster/tcp.c
===================================================================
--- trunk/cluster/tcp.c	2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/tcp.c	2005-01-10 22:08:58 UTC (rev 1751)
@@ -149,7 +149,7 @@
 static int net_receive(void);
 static int net_accept_tcp_connections(void);
 static void net_release_tcp_sock(void);
-static int net_dispatch_message(struct inode *inode, struct socket *sock, net_msg *hdr, net_msg_handler *hnd);
+static int net_dispatch_message(struct inode *inode, struct socket *sock, int len, net_msg_handler *hnd);
 static int net_ioctl (struct inode *inode, struct file *filp, unsigned int cmd, unsigned long arg);
 
 int gsd_message_action(gsd_message *g);
@@ -287,6 +287,7 @@
 			    ret = gsd_message_action(&g);
 		    } else { 
 			    /* create the group on remote node */
+			    gsd_message_to_net(&g);
 			    ret = net_send_message(GSD_MESSAGE, 0, &g, sizeof(g), to, &response); 
 			    if (ret == 0) 
 				    ret = response;
@@ -318,6 +319,7 @@
 			    ret = gsd_message_action(&g);
 		    } else { 
 			    /* create the group on remote node */
+			    gsd_message_to_net(&g);
 			    ret = net_send_message(GSD_MESSAGE, 0, &g, sizeof(g), to, &response); 
 			    if (ret == 0) 
 				    ret = response;
@@ -522,7 +524,9 @@
 
 int gsd_message_handler(net_msg *msg, u32 len, void *data)
 {
-	return gsd_message_action((gsd_message *)msg->buf);
+	gsd_message *g = (gsd_message *)msg->buf;
+	gsd_message_to_host(g);
+	return gsd_message_action(g);
 }
 
 int gsd_message_action(gsd_message *g)
@@ -680,45 +684,7 @@
 
 
 
-net_msg * net_package_message(u32 msg_type, u32 key, void *data, u32 len)
-{
-	net_msg *ret = NULL;
-	net_msg_handler *handler = NULL;
-	u32 packet_len;
 
-	spin_lock(&net_handler_lock);
-	handler = net_lookup_handler(msg_type, key);
-	spin_unlock(&net_handler_lock);
-	
-	if (!handler) {
-		netprintk("no such message type: %u/%u\n", msg_type, key);
-		return NULL;
-	}
-	if (!net_handler_msg_len_ok(handler, len)) {
-		netprintk("len for message type %u incorrect: %u, should be %u\n", 
-		       msg_type, len, handler->max_len);
-		goto done;
-	}
-	packet_len = len + sizeof(net_msg);
-	ret = kmalloc(packet_len, GFP_KERNEL);
-	if (!ret) {
-		netprintk("failed to allocate %u bytes for message!\n", packet_len);
-		goto done;
-	}
-	memset(ret, 0, packet_len);
-	ret->magic = NET_MSG_MAGIC;
-	ret->data_len = len;
-	ret->msg_type = msg_type;
-	ret->key = key;
-	if (len > 0)
-		memcpy(&(ret->buf[0]), data, len);
-
-done:
-	if (handler)
-		net_put_handler(handler);
-	return ret;
-}
-
 /* TODO Fix */
 static void net_remove_handlers(void)
 {
@@ -888,6 +854,8 @@
  *   - all you need prior to this call is to have inited the
  *       net stuff, to have a valid inode for the node to contact 
  *       in nm, and to have registered the message handler
+ *   - if status was requested, it will be returned to the caller
+ *       already converted to host byteorder
  */
 int net_send_message(u32 msg_type, u32 key, void *data, u32 len, struct inode *inode, int *status)
 {
@@ -980,6 +948,8 @@
 		spin_unlock(&net->sock_lock); 
 	}
 
+	/* finally, convert the message header to network byte-order and send */
+	net_msg_to_net(msg);
 	ret = net_send_tcp_msg(inode, NULL, msg, packet_len);
 
 	if (status) {
@@ -1124,6 +1094,8 @@
 		spin_unlock(&net->sock_lock); 
 	}
 
+	/* finally, convert the message header to network byte-order and send */
+	net_msg_to_net(msg);
 	ret = net_send_tcp_msg(inode, NULL, msg, packet_len);
 
 	if (status) {
@@ -1216,6 +1188,9 @@
 			netprintk0("failed to receive message!\n");
 			goto error;
 		}
+
+		/* convert the header to host byteorder */
+		net_msg_to_host(&hdr);
 		netprintk("received message header... magic=%u type=%u key=%u\n", 
 			  hdr.magic, hdr.msg_type, hdr.key);
 
@@ -1253,7 +1228,7 @@
 			netprintk0("no handler for message.\n");
 			goto error;
 		}
-		err = net_dispatch_message(inode, sock, &hdr, hnd);
+		err = net_dispatch_message(inode, sock, hdr.data_len, hnd);
 
 		/* if node has requested status return, do it now */
 		if (hdr.status) {
@@ -1266,6 +1241,9 @@
 			hdr.magic = NET_MSG_STATUS_MAGIC;  // twiddle the magic
 			hdr.data_len = 0;
 			netprintk("about to send status %d\n", err);
+
+			/* hdr has been in host byteorder this whole time */
+			net_msg_to_net(&hdr);
 			tmperr = net_send_tcp_msg(inode, sock, &hdr, sizeof(net_msg));
 			netprintk0("yay, sent!\n");
 		} else if (err < 0) {
@@ -1332,12 +1310,11 @@
 	spin_unlock(&net_status_lock);
 }
 
-static int net_dispatch_message(struct inode *inode, struct socket *sock, net_msg *hdr, net_msg_handler *hnd)
+static int net_dispatch_message(struct inode *inode, struct socket *sock, int len, net_msg_handler *hnd)
 {
 	int ret = -EINVAL;
-	int len, packet_len;
+	int packet_len;
 
-	len = hdr->data_len;
 	packet_len = len + sizeof(net_msg);
 
 	spin_lock(&hnd->lock);
@@ -1360,6 +1337,10 @@
 	if (ret < 0) {
 		netprintk("net_recv_tcp_msg returned: %d\n", ret);
 	} else {
+		/* convert just the header to host byteorder
+		 * handler func is responsible for the data */
+		net_msg_to_host((net_msg *)hnd->buf);
+
 		net_num_dispatched++;
 		ret = (hnd->func)((net_msg *)hnd->buf, packet_len, hnd->data);
 	}
@@ -1492,6 +1473,7 @@
 	err.magic        = NET_MSG_MAGIC;
 	err.msg_type     = err_type;
 	err.data_len     = 0;
+	net_msg_to_net(&err);
 
         msg.msg_name     = 0;
         msg.msg_namelen  = 0;

Modified: trunk/cluster/tcp.h
===================================================================
--- trunk/cluster/tcp.h	2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/tcp.h	2005-01-10 22:08:58 UTC (rev 1751)
@@ -61,6 +61,29 @@
 	__u64 msg_num;
 	__u8  buf[0];
 } net_msg;
+
+static inline void net_msg_to_net(net_msg *m)
+{
+	m->magic = htonl(m->magic);
+	m->data_len = htonl(m->data_len);
+	m->src_node = htons(m->src_node);
+	m->dst_node = htons(m->dst_node);
+	m->msg_type = htonl(m->msg_type);
+	m->key = htonl(m->key);
+	m->status = htonl(m->status);
+	m->msg_num = cpu_to_be64(m->msg_num);
+}
+static inline void net_msg_to_host(net_msg *m)
+{
+	m->magic = ntohl(m->magic);
+	m->data_len = ntohl(m->data_len);
+	m->src_node = ntohs(m->src_node);
+	m->dst_node = ntohs(m->dst_node);
+	m->msg_type = ntohl(m->msg_type);
+	m->key = ntohl(m->key);
+	m->status = ntohl(m->status);
+	m->msg_num = be64_to_cpu(m->msg_num);
+}
 #else
 
 #define NET_MSG_MAGIC           ((u16)0xfa55)
@@ -77,6 +100,24 @@
 	__u8  buf[0];
 } net_msg;
 
+static inline void net_msg_to_net(net_msg *m)
+{
+	m->magic = htons(m->magic);
+	m->data_len = htons(m->data_len);
+	m->msg_type = htons(m->msg_type);
+	m->status = htons(m->status);
+	m->key = htonl(m->key);
+	m->msg_num = htonl(m->msg_num);
+}
+static inline void net_msg_to_host(net_msg *m)
+{
+	m->magic = ntohs(m->magic);
+	m->data_len = ntohs(m->data_len);
+	m->msg_type = ntohs(m->msg_type);
+	m->status = ntohs(m->status);
+	m->key = ntohl(m->key);
+	m->msg_num = ntohl(m->msg_num);
+}
 #endif
 
 typedef int (net_msg_handler_func)(net_msg *msg, u32 len, void *data);
@@ -200,7 +241,6 @@
 
 int net_register_handler(u32 msg_type, u32 key, int flags, 
 			 u32 max_len, net_msg_handler_func *func, void *data, void *buf);
-net_msg * net_package_message(u32 msg_type, u32 key, void *data, u32 len);
 int net_recv_tcp_msg (struct inode *inode, struct socket *sock, void *data, u32 *packet_len);
 int net_send_tcp_msg (struct inode *inode, struct socket *sock, void *data, u32 packet_len);
 int net_send_error(struct socket *sock, u32 err_type);
@@ -222,4 +262,13 @@
 	u8 name[NM_MAX_NAME_LEN];
 } gsd_message;
 
+static inline void gsd_message_to_net(gsd_message *g)
+{
+	g->from = htons(g->from);
+}
+static inline void gsd_message_to_host(gsd_message *g)
+{
+	g->from = ntohs(g->from);
+}
+
 #endif /* CLUSTER_TCP_H */



More information about the Ocfs2-commits mailing list