[Ocfs2-commits] mfasheh commits r1661 - branches/dlm-glue/src

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Mon Nov 22 21:28:45 CST 2004


Author: mfasheh
Date: 2004-11-22 21:28:43 -0600 (Mon, 22 Nov 2004)
New Revision: 1661

Modified:
   branches/dlm-glue/src/dlmglue.c
   branches/dlm-glue/src/dlmglue.h
   branches/dlm-glue/src/file.c
   branches/dlm-glue/src/heartbeat.c
   branches/dlm-glue/src/heartbeat.h
   branches/dlm-glue/src/journal.c
   branches/dlm-glue/src/namei.c
   branches/dlm-glue/src/ocfs.h
   branches/dlm-glue/src/proc.c
   branches/dlm-glue/src/slot_map.c
   branches/dlm-glue/src/slot_map.h
   branches/dlm-glue/src/super.c
Log:
* set us up to do the network bits now

* fix up recovery, get our mount / umount maps in there

* clean up more of our structs and flags that are no longer needed.



Modified: branches/dlm-glue/src/dlmglue.c
===================================================================
--- branches/dlm-glue/src/dlmglue.c	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/dlmglue.c	2004-11-23 03:28:43 UTC (rev 1661)
@@ -34,7 +34,8 @@
 #include <dlmcommon.h>
 #include <dlmhb.h>
 #include <dlmnm.h>
-#include <dlmnet.h>
+#include <dlmtcp.h>
+//#include <dlmnet.h>
 #include <dlmmod.h>
 
 #include "ocfs_log.h"
@@ -670,12 +671,6 @@
 
 	LOG_ENTRY();
 
-#warning "this is ignored for now!"
-	/* Still waiting for this to be implemented in dlmmod, for now
-	 * we fake a response */
-	if (lkm_flags & LKM_NOQUEUE)
-		return -EAGAIN;
-
 again:
 	if (signal_pending(current)) {
 		ret = -EINTR;
@@ -726,7 +721,7 @@
 		status = dlmlock(osb->dlm,
 				 level,
 				 &lockres->l_lksb,
-				 LKM_CONVERT|LKM_VALBLK,
+				 lkm_flags|LKM_CONVERT|LKM_VALBLK,
 				 lockres->l_name,
 				 ocfs2_lock_type_asts[type],
 				 lockres,
@@ -1761,12 +1756,14 @@
 	 * on the superblock, so our recovery threads (if having been
 	 * launched) are waiting on it.*/
 	ocfs_recovery_map_clear(osb, node_num);
+	ocfs_node_map_set_bit(osb, &osb->mounted_map, node_num);
 }
 
 static void ocfs2_process_umount_request(ocfs_super *osb,
 					 unsigned int node_num)
 {
 	printk("UMOUNT vote from node %u\n", node_num);
+	ocfs_node_map_clear_bit(osb, &osb->mounted_map, node_num);
 	ocfs_node_map_set_bit(osb, &osb->umount_map, node_num);
 }
 
@@ -1839,47 +1836,95 @@
 static void ocfs2_process_vote(ocfs_super *osb,
 			       ocfs2_vote_msg *msg)
 {
-	int vote_response = 0;
+	int net_status, vote_response;
 	int rename = 0;
+	unsigned int node_num, generation;
+	u64 blkno;
+	enum ocfs2_vote_request request;
 	struct inode *inode = NULL;
+	struct inode *remote_node;
+	ocfs2_msg_hdr *hdr = &msg->v_hdr;
+	ocfs2_response_msg response;
 
-	OCFS_ASSERT(!memcmp(msg->m_hdr.h_uuid, osb->uuid, MAX_VOL_ID_LENGTH));
+	/* decode the network mumbo jumbo into local variables. */
+	request = ntohl(hdr->h_request);
+	blkno = be64_to_cpu(hdr->h_blkno);
+	generation = ntohl(hdr->h_generation);
+	node_num = ntohl(hdr->h_node_num);
 
-	switch (msg->m_request) {
+	printk("ocfs2: processing vote: request = %u, blkno = %llu, "
+	       "generation = %u, node_num = %u\n", request, blkno, generation,
+	       node_num);
+
+	vote_response = 0;
+	switch (request) {
 	case OCFS2_VOTE_REQ_UMOUNT:
-		ocfs2_process_umount_request(osb, msg->m_req_node);
+		ocfs2_process_umount_request(osb, node_num);
 		goto respond;
-		break;
 	case OCFS2_VOTE_REQ_MOUNT:
-		ocfs2_process_mount_request(osb, msg->m_req_node);
+		ocfs2_process_mount_request(osb, node_num);
 		goto respond;
+	default:
+		/* avoids a gcc warning */
 		break;
 	}
 
+	/* We cannot process the remaining message types before we're
+	 * fully mounted. It's perfectly safe however to send a 'yes'
+	 * response as we can't possibly have any of the state they're
+	 * asking us to modify yet. */
+	if (atomic_read(&osb->vol_state) == VOLUME_INIT)
+		goto respond;
+
+	vote_response = -EINVAL;
 	/* If we get here, then the request is against an inode. */
-	inode = ocfs_ilookup(osb, msg->m_blkno);
+	inode = ocfs_ilookup(osb, blkno);
 	if (!inode)
 		goto respond;
 
-	OCFS_ASSERT(inode->i_generation == msg->m_generation);
+	OCFS_ASSERT(inode->i_generation == generation);
 
-	switch (msg->m_request) {
+	switch (request) {
 	case OCFS2_VOTE_REQ_DELETE:
 		vote_response = ocfs2_process_delete_request(inode);
 		break;
 	case OCFS2_VOTE_REQ_RENAME:
 		rename = 1;
+		/* fall through */
 	case OCFS2_VOTE_REQ_UNLINK:
 		ocfs2_process_dentry_request(inode, rename);
 		break;
 	default:
 		printk("ocfs2_process_vote: node %u, invalid request: %u\n",
-		       msg->m_req_node, msg->m_request);
-		vote_response = -EINVAL;
+		       node_num, request);
 	}
 
 respond:
-//vote response here...
+	/* Response structure is small so we just put it on the stack
+	 * and stuff it inline. */
+	memset(&response, 0, sizeof(ocfs2_response_msg));
+	response.r_hdr.h_response_id = hdr->h_response_id;
+	response.r_hdr.h_blkno = hdr->h_blkno;
+	response.r_hdr.h_generation = hdr->h_generation;
+	response.r_hdr.h_node_num = htonl(osb->node_num);
+	response.r_response = htonl(vote_response);
+
+	remote_node = nm_get_node_by_num(node_num);
+	if (!remote_node) {
+		LOG_ERROR_ARGS("Couldn't get inode for node %u!\n", node_num);
+	} else {
+		net_status = net_send_message(OCFS2_MESSAGE_TYPE_RESPONSE,
+					      osb->net_key,
+					      &response,
+					      sizeof(ocfs2_response_msg),
+					      remote_node,
+					      NULL);
+		if (net_status < 0)
+			LOG_ERROR_ARGS("message to node %u fails with error "
+				       "%d!\n", node_num, net_status);
+		iput(remote_node);
+	}
+
 	if (inode)
 		iput(inode);
 }
@@ -1991,14 +2036,172 @@
 	return status;
 }
 
+static ocfs2_net_wait_ctxt *ocfs2_new_net_wait_ctxt(ocfs_super *osb,
+						    unsigned int response_id)
+{
+	ocfs2_net_wait_ctxt *w;
+
+	w = kmalloc(sizeof(*w), GFP_KERNEL);
+	if (!w) {
+		LOG_ERROR_STATUS(-ENOMEM);
+		goto bail;
+	}
+	memset(w, 0, sizeof(*w));
+
+	INIT_LIST_HEAD(&w->n_list);
+	init_waitqueue_head(&w->n_event);
+	ocfs_node_map_init(osb, &w->n_node_map);
+	w->n_response_id = response_id;
+bail:
+	return w;
+}
+
+static unsigned int ocfs2_new_response_id(ocfs_super *osb)
+{
+	unsigned int ret;
+
+	spin_lock(&osb->net_response_lock);
+	ret = ++osb->net_response_ids;
+	spin_unlock(&osb->net_response_lock);
+
+	return ret;
+}
+
+static void ocfs2_dequeue_net_wait_ctxt(ocfs_super *osb,
+					ocfs2_net_wait_ctxt *w)
+{
+	spin_lock(&osb->net_response_lock);
+	list_del(&w->n_list);
+	spin_unlock(&osb->net_response_lock);
+}
+
+static void ocfs2_queue_net_wait_ctxt(ocfs_super *osb,
+				      ocfs2_net_wait_ctxt *w)
+{
+	spin_lock(&osb->net_response_lock);
+	list_add_tail(&osb->net_response_list,
+		      &w->n_list);
+	spin_unlock(&osb->net_response_lock);
+}
+
+#define OCFS2_RESPONSE_WAIT_JIFFIES (60 * (HZ >> 1))
+static int ocfs2_wait_on_vote_responses(ocfs_super *osb,
+					ocfs2_net_wait_ctxt *w)
+{
+	int status = 0;
+	signed long timeout = OCFS2_RESPONSE_WAIT_JIFFIES;
+	wait_queue_t wait;
+	init_waitqueue_entry(&wait, current);
+
+	add_wait_queue(&w->n_event, &wait);
+	while (1) {
+		set_current_state(TASK_INTERRUPTIBLE);
+
+		if (ocfs_node_map_is_empty(osb, &w->n_node_map))
+			break;
+
+		if (!signal_pending(current)) {
+			timeout = schedule_timeout(timeout);
+			if (!timeout) {
+				status = -ETIMEDOUT;
+				break;
+			}
+			continue;
+		}
+		status = -ERESTARTSYS;
+		break;
+	}
+	current->state = TASK_RUNNING;
+	remove_wait_queue(&w->n_event, &wait);
+
+	return status;
+}
+
+static int ocfs2_broadcast_vote(ocfs_super *osb,
+				ocfs2_vote_msg *request,
+				unsigned int response_id)
+{
+	int status, i, remote_err;
+	ocfs2_net_wait_ctxt *w = NULL;
+	struct inode *remote_node;
+
+	w = ocfs2_new_net_wait_ctxt(osb, response_id);
+	if (!w) {
+		status = -ENOMEM;
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	/* we're pretty much ready to go at this point, and this fills
+	 * in n_response which we need anyway... */
+	ocfs2_queue_net_wait_ctxt(osb, w);
+
+	i = ocfs_node_map_iterate(osb, &osb->mounted_map, 0);
+	while (i != OCFS_INVALID_NODE_NUM) {
+		if (i != osb->node_num) {
+			ocfs_node_map_set_bit(osb, &w->n_node_map, i);
+
+			remote_node = nm_get_node_by_num(i);
+			if (!remote_node) {
+				status = -EINVAL;
+				goto bail;
+			}
+
+			remote_err = 0;
+			status = net_send_message(OCFS2_MESSAGE_TYPE_VOTE,
+						  osb->net_key,
+						  request,
+						  sizeof(*request),
+						  remote_node,
+						  &remote_err);
+			iput(remote_node);
+			if (status == -ETIMEDOUT) {
+				printk("ocfs2: remote node %d timed out!\n",
+				       i);
+				status = -EAGAIN;
+				goto bail;
+			}
+			if (remote_err < 0) {
+				status = remote_err;
+				printk("ocfs2: remote error %d on node %d!\n",
+				       remote_err, i);
+				goto bail;
+			}
+			if (status < 0) {
+				LOG_ERROR_STATUS(status);
+				goto bail;
+			}
+		}
+		i = ocfs_node_map_iterate(osb, &osb->mounted_map, i);
+	}
+
+	status = ocfs2_wait_on_vote_responses(osb, w);
+	if (status < 0) {
+		if (status != -EINTR)
+			LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	ocfs2_dequeue_net_wait_ctxt(osb, w);
+	status = w->n_response;
+bail:
+	if (w) {
+		ocfs2_dequeue_net_wait_ctxt(osb, w);
+		kfree(w);
+	}
+
+	return status;
+}
+
 static int ocfs2_do_request_vote(ocfs_super *osb,
 				 u64 blkno,
 				 unsigned int generation,
 				 enum ocfs2_vote_request type)
 {
 	int status;
+	unsigned int response_id;
 	ocfs2_vote_msg *request = NULL;
-	ocfs2_response_msg *response = NULL;
+	ocfs2_msg_hdr *hdr;
 
 	OCFS_ASSERT(type == OCFS2_VOTE_REQ_DELETE ||
 		    type == OCFS2_VOTE_REQ_UNLINK ||
@@ -2012,38 +2215,26 @@
 		goto bail;
 	}
 	memset(request, 0, sizeof(*request));
+	hdr = &request->v_hdr;
 
-	response = kmalloc(sizeof(*response), GFP_KERNEL);
-	if (!response) {
-		status = -ENOMEM;
-		LOG_ERROR_STATUS(status);
+	response_id = ocfs2_new_response_id(osb);
+
+	hdr->h_response_id = htonl(response_id);
+	hdr->h_request = htonl(type);
+	hdr->h_blkno = cpu_to_be64(blkno);
+	hdr->h_generation = htonl(generation);
+	hdr->h_node_num = htonl((unsigned int) osb->node_num);
+
+	status = ocfs2_broadcast_vote(osb, request, response_id);
+	if (status < 0) {
+		if (status != -EINTR)
+			LOG_ERROR_STATUS(status);
 		goto bail;
 	}
-	memset(response, 0, sizeof(*response));
-	memcpy(request->m_hdr.h_uuid, osb->uuid, MAX_VOL_ID_LENGTH);
-	request->m_hdr.h_type = OCFS2_MESSAGE_TYPE_VOTE;
-	request->m_req_node = osb->node_num;
-	request->m_request = type;
-	request->m_blkno = blkno;
-	request->m_generation = generation;
 
-	/* register for the response here */
-	/* send the broadcast request here */
-	/* wait for the response here */
-
-	OCFS_ASSERT(!memcmp(response->r_hdr.h_uuid, request->m_hdr.h_uuid,
-			    MAX_VOL_ID_LENGTH));
-	OCFS_ASSERT(response->r_hdr.h_type == OCFS2_MESSAGE_TYPE_RESPONSE);
-	OCFS_ASSERT(response->r_request == request->m_request);
-	OCFS_ASSERT(response->r_blkno == request->m_blkno);
-	OCFS_ASSERT(response->r_generation == request->m_generation);
-
-	status = response->r_response;
 bail:
 	if (request)
 		kfree(request);
-	if (response)
-		kfree(response);
 
 	return status;
 }
@@ -2062,7 +2253,8 @@
 		if (signal_pending(current))
 			return -EINTR;
 
-		if (ocfs_node_map_is_only(osb, &osb->node_map, osb->node_num))
+		if (ocfs_node_map_is_only(osb, &osb->mounted_map,
+					  osb->node_num))
 			return 0;
 
 		status = ocfs2_super_lock(osb, 0);
@@ -2104,7 +2296,8 @@
 		if (signal_pending(current))
 			return -EINTR;
 
-		if (ocfs_node_map_is_only(osb, &osb->node_map, osb->node_num))
+		if (ocfs_node_map_is_only(osb, &osb->mounted_map,
+					  osb->node_num))
 			return 0;
 
 		status = ocfs2_do_request_vote(osb, 0ULL, 0,
@@ -2122,7 +2315,8 @@
 		if (signal_pending(current))
 			return -EINTR;
 
-		if (ocfs_node_map_is_only(osb, &osb->node_map, osb->node_num))
+		if (ocfs_node_map_is_only(osb, &osb->mounted_map,
+					  osb->node_num))
 			return 0;
 
 		status = ocfs2_do_request_vote(osb, 0ULL, 0,
@@ -2130,3 +2324,185 @@
 	}
 	return status;
 }
+
+/* TODO: This should eventually be a hash table! */
+static ocfs2_net_wait_ctxt * __ocfs2_find_net_wait_ctxt(ocfs_super *osb,
+							u32 response_id)
+{
+	struct list_head *p;
+	ocfs2_net_wait_ctxt *w = NULL;
+
+	list_for_each(p, &osb->net_response_list) {
+		w = list_entry(p, ocfs2_net_wait_ctxt, n_list);
+		if (response_id == w->n_response_id)
+			break;
+		w = NULL;
+	}
+
+	return w;
+}
+
+static int ocfs2_handle_response_message(net_msg *msg,
+					 u32 len,
+					 void *data)
+{
+	unsigned int response_id, node_num;
+	int response_status;
+	ocfs_super *osb = data;
+	ocfs2_response_msg *resp;
+	ocfs2_net_wait_ctxt * w;
+
+	resp = (ocfs2_response_msg *) msg->buf;
+
+	response_id = ntohl(resp->r_hdr.h_response_id);
+	node_num = ntohl(resp->r_hdr.h_node_num);
+	response_status = ntohl(resp->r_response);
+
+	printk("recieved response message:\n");
+	printk("h_response_id = %u\n", ntohl(response_id));
+	printk("h_request = %u\n", ntohl(resp->r_hdr.h_request));
+	printk("h_blkno = %llu\n", be64_to_cpu(resp->r_hdr.h_blkno));
+	printk("h_generation = %u\n", ntohl(resp->r_hdr.h_generation));
+	printk("h_node_num = %u\n", node_num);
+	printk("r_response = %d\n", response_status);
+
+	spin_lock(&osb->net_response_lock);
+	w = __ocfs2_find_net_wait_ctxt(osb, response_id);
+	if (!w) {
+		spin_unlock(&osb->net_response_lock);
+		printk("request not found!\n");
+		goto bail;
+	}
+
+	if (response_status && (!w->n_response)) {
+		/* we only really need one negative response so don't
+		 * set it twice. */
+		w->n_response = response_status;
+	}
+
+	ocfs_node_map_clear_bit(osb, &w->n_node_map, node_num);
+	if (ocfs_node_map_is_empty(osb, &w->n_node_map))
+		wake_up_all(&w->n_event);
+	spin_unlock(&osb->net_response_lock);
+
+bail:
+	return 0;
+}
+
+static int ocfs2_handle_vote_message(net_msg *msg,
+					u32 len,
+					void *data)
+{
+	int status;
+	ocfs_super *osb = data;
+	ocfs2_vote_work *work;
+
+	work = kmalloc(sizeof(ocfs2_vote_work), GFP_KERNEL);
+	if (!work) {
+		status = -ENOMEM;
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	INIT_LIST_HEAD(&work->w_list);
+	memcpy(&work->w_msg, msg->buf, sizeof(ocfs2_vote_msg));
+
+	printk("scheduling vote request:\n");
+	printk("h_response_id = %u\n", work->w_msg.v_hdr.h_response_id);
+	printk("h_request = %u\n", work->w_msg.v_hdr.h_request);
+	printk("h_blkno = %llu\n", work->w_msg.v_hdr.h_blkno);
+	printk("h_generation = %u\n", work->w_msg.v_hdr.h_generation);
+	printk("h_node_num = %u\n", work->w_msg.v_hdr.h_node_num);
+
+	spin_lock(&osb->vote_task_lock);
+	list_add_tail(&osb->vote_list, &work->w_list);
+	osb->vote_count++;
+	spin_unlock(&osb->vote_task_lock);
+
+	ocfs2_kick_vote_thread(osb);
+
+	status = 0;
+bail:
+	return status;
+}
+
+int ocfs2_register_net_handlers(ocfs_super *osb)
+{
+	int status;
+	int i = MAX_VOL_ID_LENGTH - sizeof(osb->net_key);
+
+	memcpy(&osb->net_key, &osb->uuid[i], sizeof(osb->net_key));
+	osb->net_response_buf = osb->net_vote_buf = NULL;
+	osb->net_response_ids = 0;
+	spin_lock_init(&osb->net_response_lock);
+	INIT_LIST_HEAD(&osb->net_response_list);
+
+	osb->net_response_buf = kmalloc(sizeof(ocfs2_response_msg),
+					GFP_KERNEL);
+	if (!osb->net_response_buf) {
+		status = -ENOMEM;
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	osb->net_vote_buf = kmalloc(sizeof(ocfs2_vote_msg),
+				    GFP_KERNEL);
+	if (!osb->net_vote_buf) {
+		status = -ENOMEM;
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	status = net_register_handler(OCFS2_MESSAGE_TYPE_RESPONSE,
+				      osb->net_key,
+				      0,
+				      sizeof(ocfs2_response_msg),
+				      ocfs2_handle_response_message,
+				      osb,
+				      osb->net_response_buf);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	status = net_register_handler(OCFS2_MESSAGE_TYPE_VOTE,
+				      osb->net_key,
+				      0,
+				      sizeof(ocfs2_vote_msg),
+				      ocfs2_handle_vote_message,
+				      osb,
+				      osb->net_vote_buf);
+	if (status < 0) {
+		/* TODO: net_unregister here! */
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+bail:
+	if (status < 0) {
+		if (osb->net_response_buf)
+			kfree(osb->net_response_buf);
+		if (osb->net_vote_buf)
+			kfree(osb->net_vote_buf);
+		osb->net_response_buf = osb->net_vote_buf = NULL;
+		/* 0 indicates we never registered anything */
+		osb->net_key = 0;
+	}
+	return status;
+}
+
+void ocfs2_unregister_net_handlers(ocfs_super *osb)
+{
+	if (!osb->net_key)
+		return;
+
+	/* TODO: net_unregister here! */
+	/* TODO: net_unregister here! */
+
+	if (!list_empty(&osb->net_response_list))
+		printk("ocfs2: net response list not empty!\n");
+
+	kfree(osb->net_response_buf);
+	kfree(osb->net_vote_buf);
+}
+

Modified: branches/dlm-glue/src/dlmglue.h
===================================================================
--- branches/dlm-glue/src/dlmglue.h	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/dlmglue.h	2004-11-23 03:28:43 UTC (rev 1661)
@@ -120,6 +120,8 @@
 int ocfs2_request_rename_vote(struct inode *inode);
 int ocfs2_request_mount_vote(ocfs_super *osb);
 int ocfs2_request_umount_vote(ocfs_super *osb);
+int ocfs2_register_net_handlers(ocfs_super *osb);
+void ocfs2_unregister_net_handlers(ocfs_super *osb);
 
 static inline void ocfs2_lvb_set_trunc_clusters(struct inode *inode,
 						unsigned int trunc_clusters)
@@ -136,29 +138,27 @@
 	spin_unlock(&lockres->l_lock);
 }
 
+#define OCFS2_MESSAGE_TYPE_VOTE     (0x1)
+#define OCFS2_MESSAGE_TYPE_RESPONSE (0x2)
 typedef struct _ocfs2_msg_hdr
 {
-	u8  h_uuid[MAX_VOL_ID_LENGTH];
-#define OCFS2_MESSAGE_TYPE_VOTE     (0x1)
-#define OCFS2_MESSAGE_TYPE_RESPONSE (0x2)
-	u32 h_type;
+	u32 h_response_id; /* used to lookup message handle on sending
+			    * node. */
+	u32 h_request;
+	u64 h_blkno;
+	u32 h_generation;
+	u32 h_node_num;    /* node sending this particular message. */
 } ocfs2_msg_hdr;
 
 typedef struct _ocfs2_vote_msg
 {
-	ocfs2_msg_hdr m_hdr;
-	u32 m_request;
-	u32 m_req_node;
-	u64 m_blkno;
-	u32 m_generation;
+	ocfs2_msg_hdr v_hdr;
+	/* may put stuff in here... */
 } ocfs2_vote_msg;
 
 typedef struct _ocfs2_response_msg
 {
 	ocfs2_msg_hdr r_hdr;
-	u32 r_request;
-	u64 r_blkno;
-	u32 r_generation;
 	s32 r_response; /* this maps to '0' or a -value in errno.h */
 } ocfs2_response_msg;
 
@@ -176,4 +176,15 @@
 	OCFS2_VOTE_REQ_UMOUNT
 };
 
+typedef struct _ocfs2_net_wait_ctxt {
+	struct list_head   n_list;
+	u32                n_response_id;
+	wait_queue_head_t  n_event;
+	ocfs_node_map      n_node_map;
+	int                n_response; /* an aggregate response. 0 if
+					* all nodes are go, < 0 on any
+					* negative response from any
+					* node or network error. */
+} ocfs2_net_wait_ctxt;
+
 #endif

Modified: branches/dlm-glue/src/file.c
===================================================================
--- branches/dlm-glue/src/file.c	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/file.c	2004-11-23 03:28:43 UTC (rev 1661)
@@ -92,7 +92,6 @@
 {
 	int status;
 	int mode = file->f_flags;
-	ocfs_super *osb = OCFS_SB(inode->i_sb);
 	ocfs_inode_private *oip = OCFS_I(inode);
 
 	LOG_ENTRY_ARGS ("(0x%p, 0x%p, '%*s')\n", inode, file, 
@@ -100,10 +99,6 @@
 			file->f_dentry->d_name.name);
 
 	status = -EACCES;
-	if (osb->osb_flags & OCFS_OSB_FLAGS_SHUTDOWN) {
-		LOG_ERROR_STR ("Volume has been shutdown");
-		goto leave;
-	}
 
 	spin_lock(&oip->ip_lock);
 	if (oip->ip_open_cnt &&
@@ -257,12 +252,6 @@
 	osb = OCFS_SB(inode->i_sb);
 	sector_size = 1 << osb->s_sectsize_bits;
 
-	if (osb->osb_flags & OCFS_OSB_FLAGS_SHUTDOWN) {
-		LOG_TRACE_STR ("Volume has already started shutdown");
-		ret = -EIO;
-		goto bail;
-	}
-
 	down(&inode->i_sem);
 	have_i_sem = 1;
 

Modified: branches/dlm-glue/src/heartbeat.c
===================================================================
--- branches/dlm-glue/src/heartbeat.c	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/heartbeat.c	2004-11-23 03:28:43 UTC (rev 1661)
@@ -61,7 +61,6 @@
 				int node_num,
 				void *data);
 
-static void ocfs_node_map_init(ocfs_super *osb, ocfs_node_map *map);
 static void __ocfs_node_map_dup(ocfs_super *osb,
 				ocfs_node_map *target,
 				ocfs_node_map *from);
@@ -78,7 +77,7 @@
 void ocfs2_init_node_maps(ocfs_super *osb)
 {
 	spin_lock_init(&osb->node_map_lock);
-	ocfs_node_map_init(osb, &osb->node_map);
+	ocfs_node_map_init(osb, &osb->mounted_map);
 	ocfs_node_map_init(osb, &osb->recovery_map);
 	ocfs_node_map_init(osb, &osb->umount_map);
 }
@@ -90,7 +89,6 @@
 {
 	ocfs_super *osb = data;
 
-	ocfs_node_map_clear_bit(osb, &osb->node_map, node_num);
 	if (osb->group_inode != group)
 		return;
 
@@ -116,7 +114,6 @@
 {
 	ocfs_super *osb = data;
 
-	ocfs_node_map_set_bit(osb, &osb->node_map, node_num);
 	if (osb->group_inode != group)
 		return;
 
@@ -147,11 +144,6 @@
 	if (status < 0)
 		LOG_ERROR_STATUS(status);
 
-	status = hb_fill_node_map(osb->group_inode, &osb->node_map.map,
-				  sizeof(osb->node_map.map));
-	if (status < 0)
-		LOG_ERROR_STATUS(status);
-
 bail:
 	return status;
 }
@@ -174,8 +166,8 @@
 
 /* special case -1 for now
  * TODO: should *really* make sure the calling func never passes -1!!  */
-static void ocfs_node_map_init(ocfs_super *osb,
-			       ocfs_node_map *map)
+void ocfs_node_map_init(ocfs_super *osb,
+			ocfs_node_map *map)
 {
 	map->num_nodes = osb->max_nodes;
 	memset(map->map, 0, BITS_TO_LONGS(OCFS_NODE_MAP_MAX_NODES) * 
@@ -296,7 +288,7 @@
 			   int num)
 {
 	spin_lock(&osb->node_map_lock);
-	__ocfs_node_map_clear_bit(&osb->node_map, num);
+	__ocfs_node_map_clear_bit(&osb->mounted_map, num);
 	__ocfs_node_map_set_bit(&osb->recovery_map, num);
 	spin_unlock(&osb->node_map_lock);
 }
@@ -307,19 +299,27 @@
 	ocfs_node_map_clear_bit(osb, &osb->recovery_map, num);
 }
 
-int ocfs_node_map_first_set_bit(ocfs_super *osb,
-				ocfs_node_map *map)
+int ocfs_node_map_iterate(ocfs_super *osb,
+			  ocfs_node_map *map,
+			  int idx)
 {
-	int i, ret = -1;
+	int i = idx;
 
+	idx = OCFS_INVALID_NODE_NUM;
 	spin_lock(&osb->node_map_lock);
-	for(i = 0; i < map->num_nodes; i++)
-		if (test_bit(i, map->map)) {
-			ret = i;
-			break;
+	if ((i != OCFS_INVALID_NODE_NUM) &&
+	    (i >= 0) &&
+	    (i < map->num_nodes)) {
+		while(i < map->num_nodes) {
+			if (test_bit(i, map->map)) {
+				idx = i;
+				break;
+			}
+			i++;
 		}
+	}
 	spin_unlock(&osb->node_map_lock);
-	return ret;
+	return idx;
 }
 
 #if 0

Modified: branches/dlm-glue/src/heartbeat.h
===================================================================
--- branches/dlm-glue/src/heartbeat.h	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/heartbeat.h	2004-11-23 03:28:43 UTC (rev 1661)
@@ -33,6 +33,7 @@
 
 /* node map functions - used to keep track of mounted and in-recovery
  * nodes. */
+void ocfs_node_map_init(ocfs_super *osb, ocfs_node_map *map);
 int ocfs_node_map_is_empty(ocfs_super *osb,
 			   ocfs_node_map *map);
 void ocfs_node_map_set_bit(ocfs_super *osb,
@@ -44,8 +45,14 @@
 int ocfs_node_map_test_bit(ocfs_super *osb,
 			   ocfs_node_map *map,
 			   int bit);
-int ocfs_node_map_first_set_bit(ocfs_super *osb,
-				ocfs_node_map *map);
+int ocfs_node_map_iterate(ocfs_super *osb,
+			  ocfs_node_map *map,
+			  int idx);
+static inline int ocfs_node_map_first_set_bit(ocfs_super *osb,
+					      ocfs_node_map *map)
+{
+	return ocfs_node_map_iterate(osb, map, 0);
+}
 void ocfs_recovery_map_set(ocfs_super *osb,
 			   int num);
 void ocfs_recovery_map_clear(ocfs_super *osb,

Modified: branches/dlm-glue/src/journal.c
===================================================================
--- branches/dlm-glue/src/journal.c	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/journal.c	2004-11-23 03:28:43 UTC (rev 1661)
@@ -993,7 +993,7 @@
 	while(!ocfs_node_map_is_empty(osb, &osb->recovery_map)) {
 		node_num = ocfs_node_map_first_set_bit(osb,
 						       &osb->recovery_map);
-		if (node_num < 0) {
+		if (node_num == OCFS_INVALID_NODE_NUM) {
 			LOG_TRACE_ARGS("Out of nodes to recover.\n");
 			break;
 		}
@@ -1010,7 +1010,6 @@
 			printk("ocfs2: Volume requires unmount.\n");
 			continue;
 		}
-		atomic_dec(&osb->num_recovery_threads);
 	}
 	ocfs2_super_unlock(osb, 1);
 
@@ -1020,11 +1019,12 @@
 		up(&osb->recovery_lock);
 		goto restart;
 	}
-	osb->recovery_launched = 0;
-	up(&osb->recovery_lock);
 
+	osb->recovery_launched = 0;
 	wake_up_all(&osb->recovery_event);
 
+	up(&osb->recovery_lock);
+
 	LOG_EXIT_STATUS(status);
 	return status;
 }
@@ -1035,21 +1035,19 @@
 		       node_num, osb->node_num);
 
 	down(&osb->recovery_lock);
-	/* atomic_inc this here and let recover_vol dec it when
-	 * done. We do it this way to avoid races with umount. People
-	 * waiting on recovery will wait on this value to drop back
-	 * down to zero. */
-	atomic_inc(&osb->num_recovery_threads);
-	ocfs_recovery_map_set(osb, node_num);
+	if (!osb->disable_recovery) {
+		/* People waiting on recovery will wait on
+		 * the recovery map to empty. */
+		ocfs_recovery_map_set(osb, node_num);
 
-	LOG_TRACE_STR("starting recovery thread...");
+		LOG_TRACE_STR("starting recovery thread...");
 
-	if (!osb->recovery_launched) {
-		kernel_thread(__ocfs_recovery_thread, osb,
-			      CLONE_VM | CLONE_FS | CLONE_FILES);
-		osb->recovery_launched = 1;
+		if (!osb->recovery_launched) {
+			kernel_thread(__ocfs_recovery_thread, osb,
+				      CLONE_VM | CLONE_FS | CLONE_FILES);
+			osb->recovery_launched = 1;
+		}
 	}
-
 	up(&osb->recovery_lock);
 	wake_up_all(&osb->recovery_event);
 

Modified: branches/dlm-glue/src/namei.c
===================================================================
--- branches/dlm-glue/src/namei.c	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/namei.c	2004-11-23 03:28:43 UTC (rev 1661)
@@ -288,11 +288,6 @@
 
 	/* get our super block */
 	osb = OCFS_SB(dir->i_sb);
-	if (osb->osb_flags & OCFS_OSB_FLAGS_SHUTDOWN) {
-		LOG_ERROR_STR ("Volume has been shutdown");
-		status = -EACCES;
-		goto leave;
-	}
 
 	if (S_ISDIR(mode) && (dir->i_nlink >= OCFS2_LINK_MAX)) {
 		printk("inode %llu has i_nlink of %u\n",

Modified: branches/dlm-glue/src/ocfs.h
===================================================================
--- branches/dlm-glue/src/ocfs.h	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/ocfs.h	2004-11-23 03:28:43 UTC (rev 1661)
@@ -69,24 +69,13 @@
 #define OCFS_CURRENT_TIME               ocfs_get_seconds(CURRENT_TIME)
 #define OCFS_SET_INODE_TIME(i, x, y)    (ocfs_get_seconds(i->x) = (y))
 
-
-#define  MISS_COUNT_WARNING        20
-#define  MISS_COUNT_EMERGENCY      40
-#define  MISS_COUNT_NODE_DEAD      60
-
 #define  OCFS_MAX_OSB_ID             65536
 
 #define  OCFS_INVALID_NODE_NUM         -1
 
-/* osb->osb_flags flags */
-#define  OCFS_OSB_FLAGS_BEING_DISMOUNTED  (0x00000004)
-#define  OCFS_OSB_FLAGS_SHUTDOWN          (0x00000008)
-#define  OCFS_OSB_FLAGS_INITIALIZED       (0x00000020)
-
 /* OcfsGlobalCtxt.flags flags */
 #define  OCFS_FLAG_GLBL_CTXT_RESOURCE_INITIALIZED (0x00000001)
 #define  OCFS_FLAG_MEM_LISTS_INITIALIZED          (0x00000002)
-#define  OCFS_FLAG_SHUTDOWN_VOL_THREAD            (0x00000004)
 
 #define SHUTDOWN_SIGS   (sigmask(SIGKILL) | sigmask(SIGHUP) | \
 			 sigmask(SIGINT) | sigmask(SIGQUIT))
@@ -95,14 +84,6 @@
 
 #define OCFS_LINUX_MAX_FILE_SIZE   9223372036854775807LL
 
-#define OCFS_VOLCFG_LOCK_ITERATE	(HZ/10)	/* in jiffies */
-#define OCFS_VOLCFG_LOCK_TIME		1000    /* in ms */
-#define OCFS_VOLCFG_HDR_SECTORS		2	/* in sectors */
-#define OCFS_VOLCFG_NEWCFG_SECTORS	4	/* in sectors */
-
-#define OCFS_NM_HEARTBEAT_TIME		500	/* in ms */
-#define OCFS_HEARTBEAT_INIT             10      /* number of NM iterations to stabilize the publish map */
-	
 #ifndef O_DIRECT
 #warning this depends on the architecture!
 #define O_DIRECT        040000
@@ -355,7 +336,6 @@
 	struct list_head osb_next;	/* list of ocfs_super(s) */
 	__u32 osb_id;		/* id used by the proc interface */
 	ocfs_commit_task *commit;
-	__u32 osb_flags;
 	struct super_block *sb;
 	struct inode *root_inode;
 	struct inode *sys_root_inode;
@@ -364,7 +344,7 @@
 	struct _ocfs2_slot_info *slot_info;
 
 	spinlock_t node_map_lock;
-	ocfs_node_map node_map;
+	ocfs_node_map mounted_map;
 	ocfs_node_map recovery_map;
 	ocfs_node_map umount_map;
 
@@ -399,7 +379,6 @@
 	struct semaphore recovery_lock;
 	int recovery_launched;
 	int disable_recovery;
-	atomic_t num_recovery_threads;
 	wait_queue_head_t flush_event;
 	atomic_t flush_event_woken;
 	struct _ocfs_journal *journal;
@@ -438,6 +417,13 @@
 	int vote_count;
 	struct completion vote_event_complete;
 	struct completion vote_event_init;
+
+	u32 net_key;
+	char *net_vote_buf;
+	char *net_response_buf;
+	spinlock_t net_response_lock;
+	unsigned int net_response_ids;
+	struct list_head net_response_list;
 };
 
 typedef struct _ocfs_global_ctxt

Modified: branches/dlm-glue/src/proc.c
===================================================================
--- branches/dlm-glue/src/proc.c	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/proc.c	2004-11-23 03:28:43 UTC (rev 1661)
@@ -486,7 +486,7 @@
 
 	if (osb) {
 		for (i = 0; i < osb->max_nodes; i++) {
-			mount = ocfs_node_map_test_bit(osb, &osb->node_map, i) ? 'M' : ' ';
+			mount = ocfs_node_map_test_bit(osb, &osb->mounted_map, i) ? 'M' : ' ';
 			len += sprintf(page + len, "%2d %c\n", i, mount);
 		}
 	}

Modified: branches/dlm-glue/src/slot_map.c
===================================================================
--- branches/dlm-glue/src/slot_map.c	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/slot_map.c	2004-11-23 03:28:43 UTC (rev 1661)
@@ -36,6 +36,7 @@
 
 #include "dlmglue.h"
 #include "extent_map.h"
+#include "heartbeat.h"
 #include "slot_map.h"
 #include "sysfile.h"
 
@@ -49,6 +50,25 @@
 			      s16 slot_num,
 			      s16 node_num);
 
+/* Use the slot information we've collected to create a map of mounted
+ * nodes. Should be holding an EX on super block. assumes slot info is
+ * up to date. Note that we call this *after* we find a slot, so our
+ * own node should be set in the map too... */
+void ocfs2_populate_mounted_map(ocfs_super *osb)
+{
+	int i;
+	ocfs2_slot_info *si = osb->slot_info;
+
+	spin_lock(&si->si_lock);
+
+	for (i = 0; i < si->si_size; i++)
+		if (si->si_global_node_nums[i] != OCFS_INVALID_NODE_NUM)
+			ocfs_node_map_set_bit(osb, &osb->mounted_map,
+					      si->si_global_node_nums[i]);
+
+	spin_unlock(&si->si_lock);
+}
+
 /* post the slot information on disk into our slot_info struct. */
 void ocfs2_update_slot_info(ocfs2_slot_info *si)
 {
@@ -248,17 +268,6 @@
 	if (!si)
 		return;
 
-	status = ocfs2_request_umount_vote(osb);
-	if (status < 0) {
-		LOG_ERROR_STATUS(status);
-		goto bail;
-	}
-
-	/* so what happens if someone does recovery while we're
-	 * waiting for the ex? */
-
-	/* cluster lock */
-
 	ocfs2_update_slot_info(si);
 
 	spin_lock(&si->si_lock);
@@ -266,7 +275,7 @@
 	osb->slot_num = OCFS_INVALID_NODE_NUM;
 	spin_unlock(&si->si_lock);
 
-	ocfs2_update_disk_slots(osb, si);
+	status = ocfs2_update_disk_slots(osb, si);
 	if (status < 0) {
 		LOG_ERROR_STATUS(status);
 		goto bail;

Modified: branches/dlm-glue/src/slot_map.h
===================================================================
--- branches/dlm-glue/src/slot_map.h	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/slot_map.h	2004-11-23 03:28:43 UTC (rev 1661)
@@ -52,4 +52,6 @@
 void ocfs2_clear_slot(ocfs2_slot_info *si,
 		      s16 slot_num);
 
+void ocfs2_populate_mounted_map(ocfs_super *osb);
+
 #endif

Modified: branches/dlm-glue/src/super.c
===================================================================
--- branches/dlm-glue/src/super.c	2004-11-20 23:32:38 UTC (rev 1660)
+++ branches/dlm-glue/src/super.c	2004-11-23 03:28:43 UTC (rev 1661)
@@ -157,7 +157,7 @@
 static int ocfs_init_global_system_inodes(ocfs_super *osb);
 static int ocfs_init_local_system_inodes(ocfs_super *osb);
 static int ocfs_release_system_inodes(ocfs_super *osb);
-static int ocfs2_fill_node_info(ocfs_super *osb, char **group_name);
+static int ocfs2_fill_local_node_info(ocfs_super *osb, char **group_name);
 static int ocfs2_complete_mount_recovery(ocfs_super *osb);
 static int ocfs_check_volume(ocfs_super * osb);
 static int ocfs_verify_volume(ocfs2_dinode *di, struct buffer_head *bh,
@@ -618,7 +618,6 @@
 
 	/* Signal DLM thread to exit */
 	down (&(OcfsGlobalCtxt.global_res));
-	OCFS_SET_FLAG (OcfsGlobalCtxt.flags, OCFS_FLAG_SHUTDOWN_VOL_THREAD);
 
 	if (OcfsGlobalCtxt.flags & OCFS_FLAG_MEM_LISTS_INITIALIZED)
 		ocfs_free_mem_lists ();
@@ -859,7 +858,7 @@
 	return 0;
 }
 
-static int ocfs2_fill_node_info(ocfs_super *osb, char **group_name)
+static int ocfs2_fill_local_node_info(ocfs_super *osb, char **group_name)
 {
 	int status;
 	struct inode *group = NULL;
@@ -943,7 +942,7 @@
 		goto leave;
 	}
 
-	status = ocfs2_fill_node_info(osb, group_name);
+	status = ocfs2_fill_local_node_info(osb, group_name);
 	if (status < 0) {
 		LOG_ERROR_STATUS (status);
 		goto leave;
@@ -961,6 +960,13 @@
 		goto leave;
 	}
 
+	/* requires vote_thread to be running. */
+	status = ocfs2_register_net_handlers(osb);
+	if (status < 0) {
+		LOG_ERROR_STATUS (status);
+		goto leave;
+	}
+
 	status = ocfs2_super_lock(osb, 1);
 	if (status < 0) {
 		LOG_ERROR_STATUS (status);
@@ -975,6 +981,8 @@
 		goto leave;
 	}
 
+	ocfs2_populate_mounted_map(osb);
+
 	/* load all node-local system inodes */
 	status = ocfs_init_local_system_inodes(osb);
 	if (status < 0) {
@@ -1020,6 +1028,7 @@
  */
 static void ocfs_dismount_volume (struct super_block *sb)
 {
+	int tmp;
 	ocfs_super *osb = NULL;
 
 	LOG_ENTRY_ARGS ("(0x%p)\n", sb);
@@ -1028,17 +1037,20 @@
 	osb = OCFS_SB(sb);
 	OCFS_ASSERT(osb);
 
+	ocfs_shutdown_local_alloc(osb);
+
 	/* disable any new recovery threads and wait for any currently
 	 * running ones to exit. Do this before setting the vol_state. */
 	down(&osb->recovery_lock);
 	osb->disable_recovery = 1;
-	up(&osb->recovery_lock);
-	while (atomic_read(&osb->num_recovery_threads)) {
+	while (osb->recovery_launched) {
+		up(&osb->recovery_lock);
 		LOG_TRACE_STR("Waiting on a recovery thread to complete.");
 		schedule();
+		down(&osb->recovery_lock);
 	}
+	up(&osb->recovery_lock);
 
-	ocfs_shutdown_local_alloc(osb);
 	ocfs_journal_shutdown(osb);
 
 	ocfs_sync_blockdev(sb);
@@ -1046,9 +1058,16 @@
 	/* Remove the proc element for this volume */
 	ocfs_proc_remove_volume (osb);
 
-	/* Dismount */
-	OCFS_SET_FLAG (osb->osb_flags, OCFS_OSB_FLAGS_BEING_DISMOUNTED);
+	tmp = ocfs2_super_lock(osb, 1);
+	if (tmp < 0) {
+		LOG_ERROR_STATUS(tmp);
+		return;
+	}
 
+	tmp = ocfs2_request_umount_vote(osb);
+	if (tmp < 0)
+		LOG_ERROR_STATUS(tmp);
+
 	ocfs2_put_slot(osb);
 
 	ocfs2_dlm_shutdown(osb);
@@ -1157,8 +1176,6 @@
 
 	ocfs2_init_node_maps(osb);
 
-	OCFS_CLEAR_FLAG (osb->osb_flags, OCFS_OSB_FLAGS_SHUTDOWN);
-
 	INIT_LIST_HEAD (&(osb->osb_next));
 
 	snprintf(osb->dev_str, sizeof(osb->dev_str), "%u,%u",
@@ -1167,9 +1184,8 @@
 	init_MUTEX (&(osb->recovery_lock));
 
 	osb->disable_recovery = 0;
+	osb->recovery_launched = 0;
 
-	atomic_set (&osb->num_recovery_threads, 0);
-
 	init_waitqueue_head (&osb->flush_event);
 	atomic_set (&osb->flush_event_woken, 0);
 	atomic_set (&osb->clean_buffer_seq, 1);
@@ -1246,27 +1262,13 @@
 		goto bail;
 	}
 
-	status = ocfs2_extent_map_get_blocks(inode, 0ULL, 1, &p_blkno,
-					     NULL);
-	if (status < 0) {
-		LOG_ERROR_STATUS(status);
-		goto bail;
-	}
-
 	// i_size must be at least
 	// (2 + osb->max_nodes + 4) + osb->max_nodes + osb->max_nodes
-	if (inode->i_size >> osb->sb->s_blocksize_bits < 
-	    (OCFS_VOLCFG_HDR_SECTORS + osb->max_nodes) + // autoconfig
-	    OCFS_VOLCFG_NEWCFG_SECTORS + // new autoconfig
-	    osb->max_nodes + // publish
-	    osb->max_nodes ) { // vote
+	if (inode->i_size >> osb->sb->s_blocksize_bits < OCFS2_MAX_NODES) {
 		LOG_ERROR_ARGS("dlm area size incorrect: "
 			       "found=%llu, need=%u\n", 
 			       inode->i_size,
-			       (OCFS_VOLCFG_HDR_SECTORS + 
-				OCFS_VOLCFG_NEWCFG_SECTORS +
-			       (osb->max_nodes*3)) << 
-			       osb->sb->s_blocksize_bits);
+			       OCFS2_MAX_NODES << osb->sb->s_blocksize_bits);
 		status = -EINVAL;
 		goto bail;
 
@@ -1310,9 +1312,6 @@
 	list_add_tail (&(osb->osb_next), &(OcfsGlobalCtxt.osb_next));
 	up (&(OcfsGlobalCtxt.global_res));
 
-	/*  Mark the fact that this osb structure is initialized. */
-	OCFS_SET_FLAG (osb->osb_flags, OCFS_OSB_FLAGS_INITIALIZED);
-
 	spin_lock (&osb_id_lock);
 	osb->osb_id = osb_id;
 	if (osb_id < OCFS_MAX_OSB_ID)



More information about the Ocfs2-commits mailing list