[Ocfs2-commits] mfasheh commits r1977 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Tue Mar 15 16:14:57 CST 2005


Author: mfasheh
Signed-off-by: khackel
Date: 2005-03-15 16:14:56 -0600 (Tue, 15 Mar 2005)
New Revision: 1977

Modified:
   trunk/fs/ocfs2/dlm/dlmmaster.c
   trunk/fs/ocfs2/dlm/dlmmod.c
   trunk/fs/ocfs2/dlm/dlmmod.h
   trunk/fs/ocfs2/dlm/dlmrecovery.c
Log:
* teach the dlm how to register domains safely within a
  cluster. Parallel mounts should be mostly working now.

Signed-off-by: khackel



Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c	2005-03-15 22:03:06 UTC (rev 1976)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c	2005-03-15 22:14:56 UTC (rev 1977)
@@ -241,8 +241,8 @@
 	}
 
 	/* copy off the node_map and register hb callbacks on our copy */
-	memcpy(mle->node_map, dlm->node_map, sizeof(mle->node_map));
-	memcpy(mle->vote_map, dlm->node_map, sizeof(mle->vote_map));
+	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
+	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
 	clear_bit(dlm->group_index, mle->vote_map);
 	clear_bit(dlm->group_index, mle->node_map);
 
@@ -916,7 +916,7 @@
 		 * save off the node map and clear out 
 		 * all nodes from this node forward, and
 		 * the node that called us */
-		memcpy(nodemap, dlm->node_map, sizeof(nodemap));
+		memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
 		clear_bit(request->node_idx, nodemap);
 		clear_bit(dlm->group_index, nodemap);
 		while ((bit = find_next_bit(nodemap, NM_MAX_NODES,

Modified: trunk/fs/ocfs2/dlm/dlmmod.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.c	2005-03-15 22:03:06 UTC (rev 1976)
+++ trunk/fs/ocfs2/dlm/dlmmod.c	2005-03-15 22:14:56 UTC (rev 1977)
@@ -72,8 +72,10 @@
 static void dlm_dump_purge_list(dlm_ctxt *dlm);
 static void dlm_dump_all_purge_lists(void);
 
+static int dlm_query_join_handler(net_msg *msg, u32 len, void *data);
+static int dlm_assert_joined_handler(net_msg *msg, u32 len, void *data);
+static int dlm_cancel_join_handler(net_msg *msg, u32 len, void *data);
 
-
 LIST_HEAD(dlm_domains);
 spinlock_t dlm_domain_lock = SPIN_LOCK_UNLOCKED;
 DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
@@ -143,8 +145,40 @@
 		entry->proc_fops = &dlm_debug_operations;
 }
 
+static int dlm_register_net_handlers(void)
+{
+	int status;
 
+	status = net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, 0,
+				      sizeof(dlm_query_join_request),
+				      dlm_query_join_handler,
+				      NULL);
+	if (status)
+		goto bail;
 
+	status = net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY, 0,
+				      sizeof(dlm_assert_joined),
+				      dlm_assert_joined_handler,
+				      NULL);
+	if (status) {
+		/* TODO: unregister the DLM_QUERY_JOIN_MSG handler here */
+		goto bail;
+	}
+
+	status = net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY, 0,
+				      sizeof(dlm_cancel_join),
+				      dlm_cancel_join_handler,
+				      NULL);
+	if (status) {
+		/* TODO: unregister the DLM_QUERY_JOIN_MSG handler here */
+		/* TODO: unregister the DLM_ASSERT_JOINED_MSG handler here */
+		goto bail;
+	}
+
+bail:
+	return status;
+}
+
 /*
  * dlm_driver_entry()
  *
@@ -154,7 +188,6 @@
 {
 	int status;
 
-
 	dlmprintk0("Loaded dlm Driver module\n");
 	status = dlm_read_params();
 	if (status < 0)
@@ -164,7 +197,12 @@
 	if (dlm_global_index == NM_MAX_NODES)
 		return -1;
 
+	status = dlm_register_net_handlers();
+	if (status)
+		return -1;
+
 	dlm_create_dlm_debug_proc_entry();
+
 	return 0;
 }				/* dlm_driver_entry */
 
@@ -501,14 +539,16 @@
 	return res;
 }
 
-static dlm_ctxt * __dlm_lookup_domain(const char *domain)
+static dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
 {
 	dlm_ctxt *tmp = NULL;
 	struct list_head *iter;
 
+	assert_spin_locked(&dlm_domain_lock);
+
 	list_for_each(iter, &dlm_domains) {
 		tmp = list_entry (iter, dlm_ctxt, list);
-		if (strncmp(tmp->name, domain, NM_MAX_NAME_LEN)==0)
+		if (strncmp(tmp->name, domain, len)==0)
 			break;
 		tmp = NULL;
 	}
@@ -516,6 +556,14 @@
 	return tmp;
 }
 
+/* For null terminated domain strings ONLY; the +1 makes the match exact */
+static dlm_ctxt * __dlm_lookup_domain(const char *domain)
+{
+	assert_spin_locked(&dlm_domain_lock);
+
+	return __dlm_lookup_domain_full(domain, strlen(domain) + 1);
+}
+
 /* returns true on one of two conditions:
  * 1) the domain does not exist
  * 2) the domain exists and it's state is "joined" */
@@ -677,95 +725,454 @@
 }
 EXPORT_SYMBOL(dlm_unregister_domain);
 
-static dlm_ctxt *dlm_alloc_ctxt(const char *domain,
-				struct inode *group,
-				u32 key)
+
+static void __dlm_print_nodes(dlm_ctxt *dlm)
 {
-	int i;
+	int node = -1;
+
+	assert_spin_locked(&dlm->spinlock);
+
+	dlmprintk("Nodes in my domain (\"%s\"):\n", dlm->name);
+
+	while ((node = find_next_bit(dlm->domain_map, NM_MAX_NODES, node + 1))
+	       != -1) {
+		if (node >= NM_MAX_NODES)
+			break;
+		dlmprintk(" node %d\n", node);
+	}
+}
+
+static int dlm_query_join_handler(net_msg *msg, u32 len, void *data)
+{
+	dlm_query_join_request *query;
+	enum dlm_query_join_response response;
 	dlm_ctxt *dlm = NULL;
 
-	/* if for some reason we can't get a reference on the group
-	 * inode (required) then don't even try the rest. */
-	if (!igrab(group))
-		goto leave;
+	query = (dlm_query_join_request *) msg->buf;
+	dlm_query_join_request_to_host(query);
 
-	dlm = kmalloc(sizeof(dlm_ctxt), GFP_KERNEL);
-	if (!dlm) {
-		dlmprintk0("could not allocate dlm_ctxt\n");
-		goto leave;
+	dlmprintk("node %u wants to join domain %s\n", query->node_idx,
+		  query->domain);
+
+	response = JOIN_OK_NO_MAP;
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
+	if (dlm) {
+		spin_lock(&dlm->spinlock);
+
+		if (dlm->dlm_state == DLM_CTXT_NEW &&
+		    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
+			/* If this is a brand new context and we
+			 * haven't started our join process yet, then
+			 * the other node won the race. */
+			response = JOIN_OK_NO_MAP;
+		} else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
+			/* Disallow parallel joins. */
+			response = JOIN_DISALLOW;
+		} else {
+			/* Alright we're fully a part of this domain
+			 * so we keep some state as to who's joining
+			 * and indicate to him that needs to be fixed
+			 * up. */
+			response = JOIN_OK;
+			dlm->joining_node = query->node_idx;
+		}
+
+		spin_unlock(&dlm->spinlock);
 	}
-	memset(dlm, 0, sizeof(dlm_ctxt));
+	spin_unlock(&dlm_domain_lock);
 
-	dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
-	if (dlm->name == NULL) {
-		dlmprintk0("could not allocate dlm domain name\n");
-		kfree(dlm);
-		dlm = NULL;
-		goto leave;
+	dlmprintk("We respond with %u\n", response);
+
+	return response;
+}
+
+static int dlm_assert_joined_handler(net_msg *msg, u32 len, void *data)
+{
+	dlm_assert_joined *assert;
+	dlm_ctxt *dlm = NULL;
+
+	assert = (dlm_assert_joined *) msg->buf;
+	dlm_assert_joined_to_host(assert);
+
+	dlmprintk("node %u asserts join on domain %s\n", assert->node_idx,
+		  assert->domain);
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
+	/* XXX should we consider no dlm ctxt an error? */
+	if (dlm) {
+		spin_lock(&dlm->spinlock);
+
+		/* Alright, this node has officially joined our
+		 * domain. Set him in the map and clean up our
+		 * leftover join state. */
+		BUG_ON(dlm->joining_node != assert->node_idx);
+		set_bit(assert->node_idx, dlm->domain_map);
+		dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+		__dlm_print_nodes(dlm);
+
+		spin_unlock(&dlm->spinlock);
 	}
+	spin_unlock(&dlm_domain_lock);
 
-	dlm->resources = (struct list_head *) __get_free_page(GFP_KERNEL);
-	if (!dlm->resources) {
-		dlmprintk0("could not allocate dlm hash\n");
-		kfree(dlm->name);
-		kfree(dlm);
-		dlm = NULL;
-		goto leave;
+	return 0;
+}
+
+static int dlm_cancel_join_handler(net_msg *msg, u32 len, void *data)
+{
+	dlm_cancel_join *cancel;
+	dlm_ctxt *dlm = NULL;
+
+	cancel = (dlm_cancel_join *) msg->buf;
+	dlm_cancel_join_to_host(cancel);
+
+	dlmprintk("node %u cancels join on domain %s\n", cancel->node_idx,
+		  cancel->domain);
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
+
+	if (dlm) {
+		spin_lock(&dlm->spinlock);
+
+		/* Yikes, this guy wants to cancel his join. No
+		 * problem, we simply cleanup our join state. */
+		BUG_ON(dlm->joining_node != cancel->node_idx);
+		dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+		spin_unlock(&dlm->spinlock);
 	}
-	memset(dlm->resources, 0, PAGE_SIZE);
+	spin_unlock(&dlm_domain_lock);
 
-	for (i=0; i<DLM_HASH_SIZE; i++)
-		INIT_LIST_HEAD(&dlm->resources[i]);
+	return 0;
+}
 
-	strcpy(dlm->name, domain);
-	dlm->key = key;
+static int dlm_send_one_join_cancel(dlm_ctxt *dlm,
+				    unsigned int node)
+{
+	int status;
+	struct inode *node_inode;
+	dlm_cancel_join cancel_msg;
 
-	spin_lock_init(&dlm->spinlock);
-	spin_lock_init(&dlm->master_lock);
-	INIT_LIST_HEAD(&dlm->list);
-	INIT_LIST_HEAD(&dlm->dirty_list);
-	INIT_LIST_HEAD(&dlm->pending_asts);
-	INIT_LIST_HEAD(&dlm->pending_basts);
-	INIT_LIST_HEAD(&dlm->reco.resources);
-	INIT_LIST_HEAD(&dlm->reco.received);
-	INIT_LIST_HEAD(&dlm->purge_list);
+	node_inode = nm_get_group_node_by_index(dlm->group, node);
+	if (!node_inode) {
+		status = -EINVAL;
+		dlmprintk("Could not get inode for node %u!\n", node);
+		goto bail;
+	}
 
-	dlm->dlm_thread_task = NULL;
-	init_waitqueue_head(&dlm->dlm_thread_wq);
-	INIT_LIST_HEAD(&dlm->master_list);
-	INIT_LIST_HEAD(&dlm->mle_hb_events);
-	init_rwsem(&dlm->recovery_sem);
+	memset(&cancel_msg, 0, sizeof(cancel_msg));
+	cancel_msg.node_idx = dlm->group_index;
+	cancel_msg.name_len = strlen(dlm->name);
+	strncpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
 
-	/* this eats the reference we got above. */
-	dlm->group = group;
-	dlm->group_index = nm_this_node(group);
+	status = net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
+				  &cancel_msg, sizeof(cancel_msg), node_inode,
+				  NULL);
+	iput(node_inode);
+	if (status < 0) {
+		dlmprintk("net_send_message returned %d!\n", status);
+		goto bail;
+	}
 
-	dlm->reco.new_master = NM_INVALID_SLOT_NUM;
-	dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
-	dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
-	dlm->reco.next_seq = 0;
-	atomic_set(&dlm->local_resources, 0);
-	atomic_set(&dlm->remote_resources, 0);
-	atomic_set(&dlm->unknown_resources, 0);
+bail:
+	return status;
+}
 
-	kref_init(&dlm->dlm_refs, dlm_ctxt_release);
-	dlm->dlm_state = DLM_CTXT_NEW;
+/* Send a join cancel to every other node in node_map; map_size is in bytes. */
+static int dlm_send_join_cancels(dlm_ctxt *dlm,
+				 unsigned long *node_map,
+				 unsigned int map_size)
+{
+	int status, tmpstat;
+	unsigned int node;
 
-	dlmprintk("context init: refcount %u\n",
-		  atomic_read(&dlm->dlm_refs.refcount));
+	if (map_size != BITS_TO_LONGS(NM_MAX_NODES) * sizeof(unsigned long))
+		return -EINVAL;	/* not a whole node bitmap */
 
-leave:
-	return dlm;
+	status = 0;
+	node = -1;
+	while ((node = find_next_bit(node_map, NM_MAX_NODES, node + 1))
+	       != -1) {
+		if (node >= NM_MAX_NODES)
+			break;
+
+		if (node == dlm->group_index)
+			continue;
+
+		tmpstat = dlm_send_one_join_cancel(dlm, node);
+		if (tmpstat) {
+			dlmprintk("Error return %d cancelling join on node "
+				  "%d\n", tmpstat, node);
+			if (!status)	/* keep only the first error */
+				status = tmpstat;
+		}
+	}
+
+	return status;
 }
 
-static int dlm_join_domain(dlm_ctxt *dlm)
+static int dlm_request_join(dlm_ctxt *dlm,
+			    int node,
+			    enum dlm_query_join_response *response)
 {
+	int status, retval;
+	dlm_query_join_request join_msg;
+	struct inode *node_inode;
+
+	dlmprintk("querying node %d\n", node);
+
+	node_inode = nm_get_group_node_by_index(dlm->group, node);
+	if (!node_inode) {
+		status = -EINVAL;
+		dlmprintk("Could not get inode for node %u!\n", node);
+		goto bail;
+	}
+
+	memset(&join_msg, 0, sizeof(join_msg));
+	join_msg.node_idx = dlm->group_index;
+	join_msg.name_len = strlen(dlm->name);
+	strncpy(join_msg.domain, dlm->name, join_msg.name_len);
+
+	dlm_query_join_request_to_net(&join_msg);
+
+	status = net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
+				  sizeof(join_msg), node_inode, &retval);
+	iput(node_inode);
+	if (status < 0 && status != -ENOPROTOOPT && status != -ENOTCONN) {
+		dlmprintk("net_send_message returned %d!\n", status);
+		goto bail;
+	}
+
+	/* -ENOPROTOOPT from the net code means the other side isn't
+            listening for our message type -- that's fine, it means
+            his dlm isn't up, so we can consider him a 'yes' but not
+            joined into the domain. 
+	   -ENOTCONN is treated similarly -- it's returned from the
+            core kernel net code however and indicates that they don't
+            even have their cluster networking module loaded (bad
+            user!) */
+	if (status == -ENOPROTOOPT || status == -ENOTCONN) {
+		status = 0;
+		*response = JOIN_OK_NO_MAP;
+	} else if (retval == JOIN_DISALLOW ||
+		   retval == JOIN_OK ||
+		   retval == JOIN_OK_NO_MAP) {
+		*response = retval;
+	} else {
+		status = -EINVAL;
+		dlmprintk("invalid response %d from node %u\n", retval, node);
+	}
+
+	dlmprintk("status %d, node %d response is %d\n", status, node,
+		  *response);
+
+bail:
+	return status;
+}
+
+static int dlm_send_one_join_assert(dlm_ctxt *dlm,
+				    unsigned int node)
+{
 	int status;
+	struct inode *node_inode;
+	dlm_assert_joined assert_msg;
 
-	BUG_ON(!dlm);
+	dlmprintk("Sending join assert to node %u\n", node);
 
-	dlmprintk("Join domain %s\n", dlm->name);
+	node_inode = nm_get_group_node_by_index(dlm->group, node);
+	if (!node_inode) {
+		status = -EINVAL;
+		dlmprintk("Could not get inode for node %u!\n", node);
+		goto bail;
+	}
 
+	memset(&assert_msg, 0, sizeof(assert_msg));
+	assert_msg.node_idx = dlm->group_index;
+	assert_msg.name_len = strlen(dlm->name);
+	strncpy(assert_msg.domain, dlm->name, assert_msg.name_len);
+
+	dlm_assert_joined_to_net(&assert_msg);
+
+	status = net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
+				  &assert_msg, sizeof(assert_msg), node_inode,
+				  NULL);
+	iput(node_inode);
+	if (status < 0)
+		dlmprintk("net_send_message returned %d!\n", status);
+
+bail:
+	return status;
+}
+
+static void dlm_send_join_asserts(dlm_ctxt *dlm,
+				  unsigned long *node_map)
+{
+	int status, node, live;
+
+	status = 0;
+	node = -1;
+	while ((node = find_next_bit(node_map, NM_MAX_NODES, node + 1))
+	       != -1) {
+		if (node >= NM_MAX_NODES)
+			break;
+
+		if (node == dlm->group_index)
+			continue;
+
+		do {
+			/* It is very important that this message be
+			 * received so we spin until either the node
+			 * has died or it gets the message. */
+			status = dlm_send_one_join_assert(dlm, node);
+
+			spin_lock(&dlm->spinlock);
+			live = test_bit(node, dlm->live_nodes_map);
+			spin_unlock(&dlm->spinlock);
+
+			if (status) {
+				dlmprintk("Error return %d asserting join on "
+					  "node %d\n", status, node);
+
+				/* give us some time between errors... */
+				if (live)
+					schedule();
+			}
+		} while (status && live);
+	}
+}
+
+struct domain_join_ctxt {
+	unsigned long live_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	unsigned long yes_resp_map[BITS_TO_LONGS(NM_MAX_NODES)];
+};
+
+static int dlm_should_restart_join(dlm_ctxt *dlm,
+				   struct domain_join_ctxt *ctxt,
+				   enum dlm_query_join_response response)
+{
+	int ret;
+
+	if (response == JOIN_DISALLOW) {
+		dlmprintk("Latest response of disallow -- should restart\n");
+		return 1;
+	}
+
+	spin_lock(&dlm->spinlock);
+	/* For now, we restart the process if the node maps have
+	 * changed at all */
+	ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
+		     sizeof(dlm->live_nodes_map));
+	spin_unlock(&dlm->spinlock);
+
+	if (ret)
+		dlmprintk("Node maps changed -- should restart\n");
+
+	return ret;
+}
+
+static int dlm_try_to_join_domain(dlm_ctxt *dlm)
+{
+	int status, tmpstat, node;
+	struct domain_join_ctxt *ctxt;
+	enum dlm_query_join_response response;
+
+	ctxt = kmalloc(sizeof(struct domain_join_ctxt), GFP_KERNEL);
+	if (!ctxt) {
+		dlmprintk("No memory for domain_join_ctxt\n");
+		status = -ENOMEM;
+		goto bail;
+	}
+	memset(ctxt, 0, sizeof(*ctxt));
+
+	/* group sem locking should work for us here -- we're already
+	 * registered for heartbeat events so filling this should be
+	 * atomic wrt getting those handlers called. */
+	status = hb_fill_node_map(dlm->group, dlm->live_nodes_map,
+				  sizeof(dlm->live_nodes_map));
+	if (status < 0) {
+		dlmprintk("I couldn't fill my node map!\n");
+		goto bail;
+	}
+
+	spin_lock(&dlm->spinlock);
+	memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
+
+	dlm->joining_node = dlm->group_index;
+
+	spin_unlock(&dlm->spinlock);
+
+	node = -1;
+	while ((node = find_next_bit(ctxt->live_map, NM_MAX_NODES, node + 1))
+	       != -1) {
+		if (node >= NM_MAX_NODES)
+			break;
+
+		if (node == dlm->group_index)
+			continue;
+
+		status = dlm_request_join(dlm, node, &response);
+		if (status < 0) {
+			dlmprintk("%d return from request_join!\n", status);
+			goto bail;
+		}
+
+		/* Ok, either we got a response or the node doesn't have a
+		 * dlm up. */
+		if (response == JOIN_OK)
+			set_bit(node, ctxt->yes_resp_map);
+
+		if (dlm_should_restart_join(dlm, ctxt, response)) {
+			status = -EAGAIN;
+			goto bail;
+		}
+	}
+
+	dlmprintk("Yay, done querying nodes!\n");
+
+	/* Yay, everyone agrees we can join the domain. My domain is
+	 * comprised of all nodes who were put in the
+	 * yes_resp_map. Copy that into our domain map and send a join
+	 * assert message to clean up everyone else's state. */
+	spin_lock(&dlm->spinlock);
+	memcpy(dlm->domain_map, ctxt->yes_resp_map,
+	       sizeof(ctxt->yes_resp_map));
+	set_bit(dlm->group_index, dlm->domain_map);
+	spin_unlock(&dlm->spinlock);
+
+	dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
+
+	spin_lock(&dlm->spinlock);
+	dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
+	__dlm_print_nodes(dlm);
+	spin_unlock(&dlm->spinlock);
+
+bail:
+	if (ctxt) {
+		/* Do we need to send a cancel message to any nodes? */
+		if (status < 0) {
+			tmpstat = dlm_send_join_cancels(dlm,
+							ctxt->yes_resp_map,
+							sizeof(ctxt->yes_resp_map));
+			if (tmpstat < 0)
+				dlmprintk("%d return cancelling join!\n",
+					  tmpstat);
+		}
+		kfree(ctxt);
+	}
+
+	return status;
+}
+
+static int dlm_register_domain_handlers(dlm_ctxt *dlm)
+{
+	int status;
+
+	dlmprintk("registering handlers.\n");
+
 	hb_setup_callback(&dlm->dlm_hb_down, HB_NODE_DOWN_CB,
 			  dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
 	status = hb_register_callback(&dlm->dlm_hb_down);
@@ -778,31 +1185,6 @@
 	if (status)
 		goto bail;
 
-	/* TODO: need to use hb_fill_node_map to fill a temporary
-	 * votemap then communicate with each of these nodes that I
-	 * want to come up FOR THIS DLM.  there may be many nodes in
-	 * this group heartbeating but they may not care about this
-	 * particular dlm instance.  once everyone has come back with
-	 * a response that i have been added or that they are not a
-	 * member I can put together the REAL node map for this dlm in
-	 * dlm->node_map */
-	/* TODO: I guess we can fill this here as a superset of
-	 * possible nodes so that the hb_callbacks above have
-	 * something to work on in the meantime, then trim out the
-	 * nodes that are not part of this dlm once we know */
-	/* TODO: I may need to register a special net handler on
-	 * insmod of dlm.o with a key of 0 so that I can respond to
-	 * requests even if I am not part of a dlm group.  this would
-	 * still leave a gap in time between the start of heartbeating
-	 * and the insmod dlm.o, unless I change the module loading
-	 * stuff in clusterbo to include dlm.o (which would work
-	 * fine) */
-#warning WRONG WRONG WRONG
-	status = hb_fill_node_map(dlm->group, dlm->node_map,
-				  sizeof(dlm->node_map));
-	if (status)
-		goto bail;
-
 	status = net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, 0, 
 				      sizeof(dlm_master_request), 
 				      dlm_master_request_handler,
@@ -845,8 +1227,23 @@
 				      DLM_PROXY_AST_MAX_LEN,
 				      dlm_proxy_ast_handler,
 				      dlm);
-	if (status)
+bail:
+	return status;
+}
+
+static int dlm_join_domain(dlm_ctxt *dlm)
+{
+	int status;
+
+	BUG_ON(!dlm);
+
+	dlmprintk("Join domain %s\n", dlm->name);
+
+	status = dlm_register_domain_handlers(dlm);
+	if (status) {
+		dlmprintk("Error %d registering handlers!\n", status);
 		goto bail;
+	}
 
 	status = dlm_launch_thread(dlm);
 	if (status < 0) {
@@ -854,6 +1251,27 @@
 		goto bail;
 	}
 
+	do {
+		status = dlm_try_to_join_domain(dlm);
+
+		/* If we're racing another node to the join, then we
+		 * need to back off temporarily and let them
+		 * complete. */
+		if (status == -EAGAIN) {
+			schedule();
+
+			if (signal_pending(current)) {
+				status = -EINTR;
+				goto bail;
+			}
+		}
+	} while (status == -EAGAIN);
+
+	if (status < 0) {
+		dlmprintk("Joining broke! %d\n", status);
+		goto bail;
+	}
+
 	spin_lock(&dlm_domain_lock);
 	dlm->num_joins++;
 	dlm->dlm_state = DLM_CTXT_JOINED;
@@ -866,6 +1284,100 @@
 	return status;
 }
 
+/*
+ * dlm_alloc_ctxt()
+ *
+ * Allocate and initialize a dlm_ctxt named 'domain'; 'key' is stored in
+ * dlm->key.  A reference is taken on the 'group' inode and stored in the
+ * context.  Returns the new context in state DLM_CTXT_NEW, or NULL on
+ * failure; on failure the inode reference and any partial allocations
+ * are dropped.
+ */
+static dlm_ctxt *dlm_alloc_ctxt(const char *domain,
+				struct inode *group,
+				u32 key)
+{
+	int i;
+	dlm_ctxt *dlm = NULL;
+
+	/* if for some reason we can't get a reference on the group
+	 * inode (required) then don't even try the rest. */
+	if (!igrab(group))
+		goto leave;
+
+	dlm = kmalloc(sizeof(dlm_ctxt), GFP_KERNEL);
+	if (!dlm) {
+		dlmprintk0("could not allocate dlm_ctxt\n");
+		goto put_group;
+	}
+	memset(dlm, 0, sizeof(dlm_ctxt));
+
+	dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
+	if (dlm->name == NULL) {
+		dlmprintk0("could not allocate dlm domain name\n");
+		goto free_ctxt;
+	}
+
+	dlm->resources = (struct list_head *) __get_free_page(GFP_KERNEL);
+	if (!dlm->resources) {
+		dlmprintk0("could not allocate dlm hash\n");
+		goto free_name;
+	}
+	memset(dlm->resources, 0, PAGE_SIZE);
+
+	for (i=0; i<DLM_HASH_SIZE; i++)
+		INIT_LIST_HEAD(&dlm->resources[i]);
+
+	strcpy(dlm->name, domain);
+	dlm->key = key;
+
+	spin_lock_init(&dlm->spinlock);
+	spin_lock_init(&dlm->master_lock);
+	INIT_LIST_HEAD(&dlm->list);
+	INIT_LIST_HEAD(&dlm->dirty_list);
+	INIT_LIST_HEAD(&dlm->reco.resources);
+	INIT_LIST_HEAD(&dlm->reco.received);
+	INIT_LIST_HEAD(&dlm->purge_list);
+
+	dlm->dlm_thread_task = NULL;
+	init_waitqueue_head(&dlm->dlm_thread_wq);
+	INIT_LIST_HEAD(&dlm->master_list);
+	INIT_LIST_HEAD(&dlm->mle_hb_events);
+	init_rwsem(&dlm->recovery_sem);
+
+	/* this eats the reference we got above. */
+	dlm->group = group;
+	dlm->group_index = nm_this_node(group);
+
+	dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+	dlm->reco.new_master = NM_INVALID_SLOT_NUM;
+	dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
+	dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
+	dlm->reco.next_seq = 0;
+	atomic_set(&dlm->local_resources, 0);
+	atomic_set(&dlm->remote_resources, 0);
+	atomic_set(&dlm->unknown_resources, 0);
+
+	kref_init(&dlm->dlm_refs, dlm_ctxt_release);
+	dlm->dlm_state = DLM_CTXT_NEW;
+
+	dlmprintk("context init: refcount %u\n",
+		  atomic_read(&dlm->dlm_refs.refcount));
+
+	return dlm;
+
+	/* error unwind: release in reverse order of acquisition */
+free_name:
+	kfree(dlm->name);
+free_ctxt:
+	kfree(dlm);
+	dlm = NULL;
+put_group:
+	iput(group);
+leave:
+	return dlm;
+}
+
 /*
  * dlm_register_domain: one-time setup per "domain"
  */

Modified: trunk/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.h	2005-03-15 22:03:06 UTC (rev 1976)
+++ trunk/fs/ocfs2/dlm/dlmmod.h	2005-03-15 22:14:56 UTC (rev 1977)
@@ -215,7 +215,9 @@
 	struct inode *group;
 	u32 key;
 	u8  group_index;
-	unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	u8  joining_node;
+	unsigned long live_nodes_map[BITS_TO_LONGS(NM_MAX_NODES)];
+	unsigned long domain_map[BITS_TO_LONGS(NM_MAX_NODES)];
 	unsigned long recovery_map[BITS_TO_LONGS(NM_MAX_NODES)];
 	dlm_recovery_ctxt reco;
 	spinlock_t master_lock;
@@ -378,6 +380,9 @@
 
 #define DLM_RECO_NODE_DATA_MSG          507
 
+#define DLM_QUERY_JOIN_MSG		510
+#define DLM_ASSERT_JOINED_MSG		511
+#define DLM_CANCEL_JOIN_MSG		512
 
 typedef struct _dlm_reco_node_data
 {
@@ -499,7 +504,61 @@
 } dlm_proxy_ast;
 #define DLM_PROXY_AST_MAX_LEN  (sizeof(dlm_proxy_ast) + DLM_LVB_LEN)
 
+#define DLM_MOD_KEY (0x666c6172)
+enum dlm_query_join_response {
+	JOIN_DISALLOW = 0,
+	JOIN_OK,
+	JOIN_OK_NO_MAP,
+};
 
+typedef struct _dlm_query_join_request
+{
+	u8 node_idx;
+	u8 pad1[2];
+	u8 name_len;
+	u8 domain[NM_MAX_NAME_LEN];
+} dlm_query_join_request;
+
+typedef struct _dlm_assert_joined
+{
+	u8 node_idx;
+	u8 pad1[2];
+	u8 name_len;
+	u8 domain[NM_MAX_NAME_LEN];
+} dlm_assert_joined;
+
+typedef struct _dlm_cancel_join
+{
+	u8 node_idx;
+	u8 pad1[2];
+	u8 name_len;
+	u8 domain[NM_MAX_NAME_LEN];
+} dlm_cancel_join;
+
+static inline void dlm_query_join_request_to_net(dlm_query_join_request *m)
+{
+	/* do nothing */
+}
+static inline void dlm_query_join_request_to_host(dlm_query_join_request *m)
+{
+	/* do nothing */
+}
+static inline void dlm_assert_joined_to_net(dlm_assert_joined *m)
+{
+	/* do nothing */
+}
+static inline void dlm_assert_joined_to_host(dlm_assert_joined *m)
+{
+	/* do nothing */
+}
+static inline void dlm_cancel_join_to_net(dlm_cancel_join *m)
+{
+	/* do nothing */
+}
+static inline void dlm_cancel_join_to_host(dlm_cancel_join *m)
+{
+	/* do nothing */
+}
 static inline void dlm_master_request_to_net(dlm_master_request *m)
 {
 	/* do nothing */
@@ -699,9 +758,7 @@
 void dlm_hb_node_down_cb(struct inode *group, struct inode *node, int idx, void *data);
 void dlm_hb_node_up_cb(struct inode *group, struct inode *node, int idx, void *data);
 int dlm_hb_node_dead(dlm_ctxt *dlm, int node);
-int dlm_hb_node_up(dlm_ctxt *dlm, int node);
 int __dlm_hb_node_dead(dlm_ctxt *dlm, int node);
-int __dlm_hb_node_up(dlm_ctxt *dlm, int node);
 
 int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
 int dlm_master_request_handler(net_msg *msg, u32 len, void *data);

Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-03-15 22:03:06 UTC (rev 1976)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-03-15 22:14:56 UTC (rev 1977)
@@ -124,25 +124,39 @@
 		return;
 
 	spin_lock(&dlm->spinlock);
+
+	clear_bit(idx, dlm->live_nodes_map);
+
+	/* Clean up join state on node death. */
+	if (dlm->joining_node == idx) {
+		dlmprintk("Clearing join state for node %u\n", idx);
+		dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
+	}
+
 	/* notify any mles attached to the heartbeat events */
 	list_for_each(iter, &dlm->mle_hb_events) {
 		mle = list_entry(iter, dlm_master_list_entry, hb_events);
 		dlm_mle_node_down(dlm, mle, group, node, idx);
 	}
 
-	if (!test_bit(idx, dlm->node_map))
-		dlmprintk("node %u already removed from nodemap!\n", idx);
-	else {
-		dlmprintk("node %u being removed from nodemap!\n", idx);
-		clear_bit(idx, dlm->node_map);
+	if (!test_bit(idx, dlm->domain_map)) {
+		/* This also catches the case that we get a node down
+		 * but haven't joined the domain yet. */
+		dlmprintk("node %u already removed from domain!\n", idx);
+		goto bail;
 	}
 
+	dlmprintk("node %u being removed from domain map!\n", idx);
+	clear_bit(idx, dlm->domain_map);
+
 	if (test_bit(idx, dlm->recovery_map))
 		dlmprintk("node %u already added to recovery map!\n", idx);
 	else {
 		set_bit(idx, dlm->recovery_map);
 		dlm_do_local_recovery_cleanup(dlm, idx);
 	}
+
+bail:
 	spin_unlock(&dlm->spinlock);
 
 	dlm_put(dlm);
@@ -158,24 +172,15 @@
 		return;
 
 	spin_lock(&dlm->spinlock);
+
+	set_bit(idx, dlm->live_nodes_map);
+
 	/* notify any mles attached to the heartbeat events */
 	list_for_each(iter, &dlm->mle_hb_events) {
 		mle = list_entry(iter, dlm_master_list_entry, hb_events);
 		dlm_mle_node_up(dlm, mle, group, node, idx);
 	}
 
-
-	if (test_bit(idx, dlm->recovery_map)) {
-		dlmprintk("BUG!!! node up message on node in recovery (%u)!!!\n", idx);
-	} else {
-		if (test_bit(idx, dlm->node_map))
-			dlmprintk("node %u already in node map!!!\n", idx);
-		else {
-			dlmprintk("node %u being added to node map!!!\n", idx);
-			set_bit(idx, dlm->node_map);
-		}
-	}
-
 	spin_unlock(&dlm->spinlock);
 
 	dlm_put(dlm);
@@ -188,13 +193,6 @@
 	return 0;
 }
 
-int __dlm_hb_node_up(dlm_ctxt *dlm, int node)
-{
-	if (test_bit(node, dlm->node_map))
-		return 1;
-	return 0;
-}
-
 int dlm_hb_node_dead(dlm_ctxt *dlm, int node)
 {
 	int ret;
@@ -204,15 +202,6 @@
 	return ret;
 }
 
-int dlm_hb_node_up(dlm_ctxt *dlm, int node)
-{
-	int ret;
-	spin_lock(&dlm->spinlock);
-	ret = __dlm_hb_node_up(dlm, node);
-	spin_unlock(&dlm->spinlock);
-	return ret;
-}
-
 u8 dlm_pick_recovery_master(dlm_ctxt *dlm, u8 *new_dead_node)
 {
 	u8 master = 0;



More information about the Ocfs2-commits mailing list