[Ocfs2-commits] khackel commits r1964 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com
Tue Mar 8 18:39:02 CST 2005


Author: khackel
Date: 2005-03-08 18:39:00 -0600 (Tue, 08 Mar 2005)
New Revision: 1964

Modified:
   trunk/fs/ocfs2/dlm/dlmmaster.c
   trunk/fs/ocfs2/dlm/dlmmod.h
   trunk/fs/ocfs2/dlm/dlmrecovery.c
Log:
* changed refcounting of master list entries from atomic_t to struct kref

Signed-off-by: mfasheh
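
For reference, the pattern being adopted looks roughly like the sketch
below. It is illustrative only: the names foo, foo_lock, foo_list and
the foo_* functions are made up for this note, and it assumes the
mainline kref API in which kref_init() takes only the kref and the
release callback is passed to kref_put() (the two-argument kref_init()
seen in the dlmmaster.c diff is specific to this tree at the time).

#include <linux/kref.h>
#include <linux/slab.h>
#include <linux/string.h>
#include <linux/spinlock.h>
#include <linux/list.h>

static DEFINE_SPINLOCK(foo_lock);
static LIST_HEAD(foo_list);

struct foo {
	struct kref refs;		/* replaces an atomic_t refcnt */
	struct list_head list;
};

/* runs on the final put; foo_lock is already held by foo_put() */
static void foo_release(struct kref *kref)
{
	struct foo *f = container_of(kref, struct foo, refs);

	list_del(&f->list);
	/* kfree under spinlock, same tradeoff noted in dlm_mle_release() */
	kfree(f);
}

static struct foo *foo_alloc(void)
{
	struct foo *f = kmalloc(sizeof(*f), GFP_KERNEL);

	if (!f)
		return NULL;
	memset(f, 0, sizeof(*f));
	kref_init(&f->refs);		/* count starts at 1 for the creator */

	spin_lock(&foo_lock);
	list_add(&f->list, &foo_list);
	spin_unlock(&foo_lock);
	return f;
}

/* caller must already hold a valid reference */
static void foo_get(struct foo *f)
{
	kref_get(&f->refs);
}

static void foo_put(struct foo *f)
{
	/* take the lock around the final put so release can unlink
	 * without racing a concurrent lookup, as dlm_put_mle() does
	 * with dlm->spinlock and dlm->master_lock */
	spin_lock(&foo_lock);
	kref_put(&f->refs, foo_release);
	spin_unlock(&foo_lock);
}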



Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c	2005-03-08 22:18:28 UTC (rev 1963)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c	2005-03-09 00:39:00 UTC (rev 1964)
@@ -70,11 +70,13 @@
 	spin_lock(&dlm->master_lock);
 
 	list_for_each(iter, &dlm->master_list) {
+		struct kref *k;
 		mle = list_entry(iter, dlm_master_list_entry, list);
 		
+		k = &mle->mle_refs;
 		type = (mle->type == DLM_MLE_BLOCK ? "BLK" : "MAS");
 		err = (mle->error ? 'Y' : 'N');
-		refs = atomic_read(&mle->refcnt);
+		refs = atomic_read(&k->refcount);
 		master = mle->master;
 		attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
 	
@@ -116,6 +118,7 @@
 #endif
 
 
+static void dlm_mle_release(struct kref *kref);
 static void dlm_init_mle(dlm_master_list_entry *mle,
 			enum dlm_mle_type type,
 			dlm_ctxt *dlm,
@@ -172,12 +175,19 @@
 }
 
 
-static inline void dlm_mle_detach_hb_events(dlm_ctxt *dlm, 
+static inline void __dlm_mle_detach_hb_events(dlm_ctxt *dlm,
+					      dlm_master_list_entry *mle)
+{
+	if (!list_empty(&mle->hb_events))
+		list_del_init(&mle->hb_events);
+}
+
+
+static inline void dlm_mle_detach_hb_events(dlm_ctxt *dlm,
 					    dlm_master_list_entry *mle)
 {
 	spin_lock(&dlm->spinlock);
-	if (!list_empty(&mle->hb_events))
-		list_del_init(&mle->hb_events);
+	__dlm_mle_detach_hb_events(dlm, mle);
 	spin_unlock(&dlm->spinlock);
 }
 
@@ -190,21 +200,16 @@
 	DLM_ASSERT(mle->dlm);
 	dlm = mle->dlm;
 
-	if (atomic_dec_and_lock(&mle->refcnt, &dlm->master_lock)) {
-		/* remove from list if not already */
-		if (!list_empty(&mle->list))
-			list_del(&mle->list);
-		spin_unlock(&dlm->master_lock);
-
-		/* detach the mle from the domain node up/down events */
-		dlm_mle_detach_hb_events(dlm, mle);
-		kfree(mle);
-	}
+	spin_lock(&dlm->spinlock);
+	spin_lock(&dlm->master_lock);
+	kref_put(&mle->mle_refs, dlm_mle_release);
+	spin_unlock(&dlm->master_lock);
+	spin_unlock(&dlm->spinlock);
 }
 
 static inline void dlm_get_mle(dlm_master_list_entry *mle)
 {
-	atomic_inc(&mle->refcnt);
+	kref_get(&mle->mle_refs);
 }
 
 
@@ -225,7 +230,7 @@
 	spin_lock_init(&mle->spinlock);
 	init_waitqueue_head(&mle->wq);
 	atomic_set(&mle->woken, 0);
-	atomic_set(&mle->refcnt, 1);
+	kref_init(&mle->mle_refs, dlm_mle_release);
 	memset(mle->response_map, 0, sizeof(mle->response_map));
 	mle->master = NM_MAX_NODES;
 	mle->error = 0;
@@ -322,6 +327,36 @@
 }
 
 
+static void dlm_mle_release(struct kref *kref)
+{
+	dlm_master_list_entry *mle;
+	dlm_ctxt *dlm;
+
+	dlmprintk0("\n");
+
+	DLM_ASSERT(kref);
+
+	mle = container_of(kref, dlm_master_list_entry, mle_refs);
+
+	DLM_ASSERT(mle->dlm);
+	dlm = mle->dlm;
+
+	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&dlm->master_lock);
+
+	/* remove from list if not already */
+	if (!list_empty(&mle->list))
+		list_del(&mle->list);
+
+	/* detach the mle from the domain node up/down events */
+	__dlm_mle_detach_hb_events(dlm, mle);
+
+	/* NOTE: kfree under spinlock here.
+	 * if this is bad, we can move this to a freelist. */
+	kfree(mle);
+}
+
+
 /*
  * LOCK RESOURCE FUNCTIONS
  */
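
A side note on the debug hunk at the top of this file: struct kref in
kernels of this era is essentially a wrapped atomic_t, which is why the
dump can read the count directly. Any such raw read is for debug output
only and is inherently racy; lifetime decisions must always go through
kref_get()/kref_put(). A minimal illustrative helper (not part of this
commit):

/* debug-only peek at a kref's current count */
static inline int debug_kref_count(struct kref *k)
{
	return atomic_read(&k->refcount);
}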

Modified: trunk/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.h	2005-03-08 22:18:28 UTC (rev 1963)
+++ trunk/fs/ocfs2/dlm/dlmmod.h	2005-03-09 00:39:00 UTC (rev 1964)
@@ -347,7 +347,7 @@
 	spinlock_t spinlock;
 	wait_queue_head_t wq;
 	atomic_t woken;
-	atomic_t refcnt;
+	struct kref mle_refs;
 	unsigned long maybe_map[BITS_TO_LONGS(NM_MAX_NODES)];
 	unsigned long vote_map[BITS_TO_LONGS(NM_MAX_NODES)];
 	unsigned long response_map[BITS_TO_LONGS(NM_MAX_NODES)];

Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-03-08 22:18:28 UTC (rev 1963)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c	2005-03-09 00:39:00 UTC (rev 1964)
@@ -49,7 +49,7 @@
 #include "dlmcommon.h"
 #include "dlmmod.h"
 
-static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node, int locked);
+static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node);
 
 int dlm_recovery_thread(void *data);
 void dlm_complete_recovery_thread(dlm_ctxt *dlm);
@@ -65,721 +65,9 @@
 #define DLM_RECOVERY_THREAD_MS  2000
 
 
-#ifdef LOUSY_RECOVERY
 
-/*
- * RECOVERY THREAD
- */
-
-void dlm_kick_recovery_thread(dlm_ctxt *dlm)
-{
-	/* wake the recovery thread 
-	 * this will wake the reco thread in one of three places
-	 * 1) sleeping with no recovery happening
-	 * 2) sleeping with recovery mastered elsewhere 
-	 * 3) recovery mastered here, waiting on reco data */
-	atomic_set(&dlm->reco.thread.woken, 1);
-	wake_up(&dlm->reco.thread.thread_wq);
-}
-
-/* Launch the recovery thread */
-int dlm_launch_recovery_thread(dlm_ctxt *dlm)
-{
-	dlmprintk0("starting recovery thread...\n");
-	dlm->reco.thread.pid = kernel_thread (dlm_recovery_thread, dlm, CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
-	if (dlm->reco.thread.pid < 0) {
-		dlmprintk("unable to launch recovery thread, error=%d", dlm->reco.thread.pid);
-		return -EINVAL;
-	}
-	dlmprintk0("recovery thread running...\n");
-	return 0;
-}
-
-void dlm_complete_recovery_thread(dlm_ctxt *dlm)
-{
-	dlmprintk0 ("waiting for recovery thread to exit....");
-	send_sig (SIGINT, dlm->reco.thread.task, 0);
-	wait_for_completion (&dlm->reco.thread.complete);
-	dlmprintk0 ("recovery thread exited\n");
-	dlm->reco.thread.task = NULL;
-}
-
-	/* 
-	 * this is lame, but here's how recovery works...
-	 * 1) all recovery threads cluster wide will work on recovering
-	 *    ONE node at a time
-	 * 2) negotiate who will take over all the locks for the dead node.
-	 *    thats right... ALL the locks.
-	 * 3) once a new master is chosen, everyone scans all locks
-	 *    and moves aside those mastered by the dead guy
-	 * 4) each of these locks should be locked until recovery is done
-	 * 5) the new master collects up all of secondary lock queue info
-	 *    one lock at a time, forcing each node to communicate back
-	 *    before continuing
-	 * 6) each secondary lock queue responds with the full known lock info
-	 * 7) once the new master has run all its locks, it sends a ALLDONE! 
-	 *    message to everyone
-	 * 8) upon receiving this message, the secondary queue node unlocks
-	 *    and responds to the ALLDONE
-	 * 9) once the new master gets responses from everyone, he unlocks 
-	 *    everything and recovery for this dead node is done
-	 *10) go back to 2) while there are still dead nodes
-	 *
-	 */
-
-
-
-int dlm_recovery_thread(void *data)
-{
-	int status, i;
-	int cnt = 0, dlm_num;
-	struct list_head *iter, *iter2, *tmpiter;
-	dlm_lock_resource *res;
-	char name[12];
-	dlm_ctxt *dlm = data;
-	u8 tmp;
-
-
-	dlm_num = nm_get_group_global_index(dlm->group);
-	sprintf(name, "dlmreco-%03u", dlm_num);
-	util_daemonize (name, strlen(name), 1);
-	dlm->reco.thread.task = current;
-
-	while (1) {
-		spin_lock(&dlm->spinlock);
-
-		/* check to see if the new master has died */
-		if (dlm->reco.new_master != NM_INVALID_SLOT_NUM &&
-		    test_bit(dlm->reco.new_master, dlm->recovery_map)) {
-			dlmprintk("new master %u died while recovering %u!\n",
-			       dlm->reco.new_master, dlm->reco.dead_node);
-			// unset the new_master, leave dead_node
-			dlm->reco.new_master = NM_INVALID_SLOT_NUM;
-		}
-
-		/* select a target to recover */
-		if (dlm->reco.dead_node == NM_INVALID_SLOT_NUM) {
-			dlm->reco.dead_node = find_next_bit (dlm->recovery_map, NM_MAX_NODES, 0);
-			if (dlm->reco.dead_node >= NM_MAX_NODES)
-				dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
-		} else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
-			// BUG?
-			dlmprintk("dead_node %u no longer in recovery map!\n",
-			       dlm->reco.dead_node);
-			dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
-		}
-
-		if (dlm->reco.dead_node == NM_INVALID_SLOT_NUM) {
-			dlmprintk0("nothing to recover!  sleeping now!\n");
-			spin_unlock(&dlm->spinlock);
-			goto sleep;
-		}
-		spin_unlock(&dlm->spinlock);
-
-		/* take write barrier */
-		/* (stops the list reshuffling thread, proxy ast handling) */
-		down_write(&dlm->recovery_sem);
-
-		/* choose a new master */
-		if (dlm->reco.new_master == NM_INVALID_SLOT_NUM) {
-			u8 new_dead_node = dlm->reco.dead_node;
-			dlm->reco.new_master = dlm_pick_recovery_master(dlm, &new_dead_node);
-			if (new_dead_node != dlm->reco.dead_node) {
-				// master wants to recover a different node
-				dlm->reco.dead_node = new_dead_node;
-				
-				// do local cleanup if heartbeat has not added the
-				// node to the recovery map yet
-				spin_lock(&dlm->spinlock);
-				if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
-					dlm_do_local_recovery_cleanup(dlm, dlm->reco.dead_node, 1);
-					set_bit(dlm->reco.dead_node, dlm->recovery_map);
-					clear_bit(dlm->reco.dead_node, dlm->node_map);
-				}
-				spin_unlock(&dlm->spinlock);
-			}
-		}
-	
-		dlmprintk("RECOVERY!  new_master=%u, this node=%u, dead_node=%u\n",
-			  dlm->reco.new_master, dlm->group_index, dlm->reco.dead_node);
-
-		if (dlm->reco.new_master != dlm->group_index) {
-			/* it is safe to start everything back up here
-			 * because all of the dead node's lock resources
-			 * have been marked as in-recovery */
-			up_write(&dlm->recovery_sem);
-			
-			// sit around until new_master is dead or done
-			// we will get signalled by the waitqueue either way
-			dlmprintk("new_master %u is recovering dead_node %u... waiting...\n",
-				  dlm->reco.new_master, dlm->reco.dead_node);
-sleep:
-			atomic_set(&dlm->reco.thread.woken, 0);
-			status = wait_event_interruptible_timeout(
-				  dlm->reco.thread.thread_wq,
-				  (atomic_read(&dlm->reco.thread.woken) == 1),
-				  msecs_to_jiffies(DLM_RECOVERY_THREAD_MS));
-
-			if (status >= 0) {
-				if (atomic_read(&dlm->reco.thread.woken))
-					dlmprintk0("aha!!! recovery thread woken!\n");
-				else
-					dlmprintk0("timed out waiting, running again\n");
-				continue;
-			}
-			dlmprintk("recovery thread got %d while waiting\n", status);
-			break;
-		}
-
-		/* new_master == dlm->group_index */
-		status = dlm_remaster_locks_local(dlm);
-		if (status < 0) {
-			dlmprintk("error remastering locks for node %u!!!!  retrying!\n",
-			       dlm->reco.dead_node);
-		} else {
-			// success!  see if any other nodes need recovery
-			spin_lock(&dlm->spinlock);
-			clear_bit(dlm->reco.dead_node, dlm->recovery_map);
-			spin_unlock(&dlm->spinlock);
-			dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
-			dlm->reco.new_master = NM_INVALID_SLOT_NUM;
-			dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
-			dlm->reco.next_seq = 0;
-		}
-		up_write(&dlm->recovery_sem);
-		// continue and look for another dead node
-	}
-
-	flush_scheduled_work();
-	complete (&dlm->reco.thread.complete);
-	dlmprintk0("quitting recovery thread!!!!!!\n");
-	return 0;
-}
-
-/* +- if this node is NOT the new master... */
-/* +--- if master's dead_node is not the one we chose, do local cleanup again with proper dead_node */
-/* +---	wait for poll messages from new master: register net message handler, it will do the work */
-/* +--- check for death of new master */
-/* +--- if dead, unregister the handler, unset new_master, keep dead_node and goto "select a target" */
-/* |- on request, send header with number of packets, get response, then start blasting packets */
-/* |- retransmit any missed packets on request */
-/* |- once ALL DONE is received, run all locks again */
-/* +--- unset the RECOVERING flag */
-/* +--- set the new owner as new_master */
-/* +--- remove dead_node from recovery map */
-/* +--- unset new_master and dead_node and start all over */
-
-static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED;
-
-
-static int dlm_remaster_locks_local(dlm_ctxt *dlm)
-{
-	int num_nodes = NM_MAX_NODES, next=0, status = 0;
-	dlm_reco_node_data *ndata;
-	struct list_head *iter;
-	int all_nodes_done;
-
-
-/* +- if this node is the new master, init the temp recovery area */
-/* |- poll each live node for lock state */
-/* |- collect the data from each node until node says it's done, or dead */
-/* +--- if node died, throw away temp recovery area, keep new_master and dead_node, goto "select a target" */
-/* |- apply all temp area changes to real lock */
-/* +- send ALL DONE message to each node */
-
-	status = dlm_init_recovery_area(dlm);
-	if (status < 0)
-		return status;
-
-	spin_lock(&dlm_reco_state_lock);
-	list_for_each(iter, &dlm->reco.node_data) {
-		ndata = list_entry (iter, dlm_reco_node_data, list);
-		DLM_ASSERT(ndata->state == DLM_RECO_NODE_DATA_INIT);
-		ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
-
-		status = dlm_request_all_locks(dlm, ndata->node_num, dlm->reco.dead_node);
-		if (status < 0) {
-			dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
-			return status;
-		}
-
-		switch (ndata->state) {
-			case DLM_RECO_NODE_DATA_INIT:
-			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
-			case DLM_RECO_NODE_DATA_REQUESTED:
-				DLM_ASSERT(0);
-				break;
-			case DLM_RECO_NODE_DATA_DEAD:
-				dlmprintk("eek.  node %u died after requesting recovery info for node %u\n",
-					  ndata->node_num, dlm->reco.dead_node);
-				spin_unlock(&dlm_reco_state_lock);
-				// start all over
-				dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
-				return -EAGAIN;
-			case DLM_RECO_NODE_DATA_REQUESTING:
-				ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
-				dlmprintk("now receiving recovery data from node %u for dead node %u\n",
-					  ndata->node_num, dlm->reco.dead_node);
-				break;
-			case DLM_RECO_NODE_DATA_RECEIVING:
-				dlmprintk("already receiving recovery data from node %u for dead node %u\n",
-					  ndata->node_num, dlm->reco.dead_node);
-				break;
-			case DLM_RECO_NODE_DATA_DONE:
-				dlmprintk("already DONE receiving recovery data from node %u for dead node %u\n",
-					  ndata->node_num, dlm->reco.dead_node);
-				break;
-		}
-	}
-	spin_unlock(&dlm_reco_state_lock);
-
-	/* nodes should be sending reco data now
-	 * just need to wait */
-
-	while (1) {
-		/* wait to be signalled, with periodic timeout
-		 * to check for node death */
-		atomic_set(&dlm->reco.thread.woken, 0);
-		ret = wait_event_interruptible_timeout(dlm->reco.thread.thread_wq,
-				(atomic_read(&dlm->reco.thread.woken) ==  1),
-				msecs_to_jiffies(DLM_RECOVERY_THREAD_MS));
-		if (ret >= 0) {
-			if (atomic_read(&dlm->reco.thread.woken))
-				dlmprintk0("waiting on reco data... aha!!! recovery thread woken!\n");
-			else
-				dlmprintk0("waiting on reco data... timed out waiting\n");
-		}
-
-		/* either way, recheck all the nodes now to see if we are
-		 * done, or if anyone died */
-		all_nodes_done = 1;
-		spin_lock(&dlm_reco_state_lock);
-		list_for_each(iter, &dlm->reco.node_data) {
-			ndata = list_entry (iter, dlm_reco_node_data, list);
-	
-			switch (ndata->state) {
-				case DLM_RECO_NODE_DATA_INIT:
-				case DLM_RECO_NODE_DATA_REQUESTING:
-					DLM_ASSERT(0);
-					break;
-				case DLM_RECO_NODE_DATA_DEAD:
-					dlmprintk("eek.  node %u died after requesting recovery info for node %u\n",
-						  ndata->node_num, dlm->reco.dead_node);
-					spin_unlock(&dlm_reco_state_lock);
-					// start all over
-					dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
-					return -EAGAIN;
-				case DLM_RECO_NODE_DATA_RECEIVING:
-				case DLM_RECO_NODE_DATA_REQUESTED:
-					all_nodes_done = 0;
-					break;
-				case DLM_RECO_NODE_DATA_DONE:
-					break;
-				case DLM_RECO_NODE_DATA_FINALIZE_SENT:
-					break;
-			}
-		}
-		spin_unlock(&dlm_reco_state_lock);
-
-		if (all_nodes_done) {
-			/* all nodes are now in DLM_RECO_NODE_DATA_DONE state 
-	 		* just send a finalize message to everyone and 
-	 		* clean up */
-			ret = dlm_finalize_recovery(dlm);
-			if (ret < 0) {
-				dlmprintk("dlm_finalize_recovery returned %d\n", ret);
-			}
-			dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
-			status = ret;
-			break;
-		}
-	}
-
-	return status;
-}
-
-int dlm_init_recovery_area(dlm_ctxt *dlm)
-{
-	int num=0, ret;
-	dlm_reco_node_data *ndata;
-	LIST_HEAD(tmplist);
-
-	spin_lock(&dlm->spinlock);
-	memcpy(dlm->reco.node_map, dlm->node_map, sizeof(dlm->node_map));
-	/* nodes can only be removed (by dying) after dropping
-	 * this lock, and death will be trapped later, so this should do */
-	spin_unlock(&dlm->spinlock);
-	
-	while (1) {
-		num = find_next_bit (dlm->reco.node_map, NM_MAX_NODES, num);
-		if (num >= NM_MAX_NODES) {
-			break;
-		}
-		DLM_ASSERT(num != dead_node);
-
-		ndata = kmalloc(sizeof(dlm_reco_node_data), GFP_KERNEL);
-		if (!ndata) {
-			dlm_destroy_recovery_area(dlm, dead_node);
-			return -ENOMEM;
-		}
-		memset(ndata, 0, sizeof(dlm_reco_node_data));
-		ndata->node_num = num;
-		ndata->state = DLM_RECO_NODE_DATA_INIT;
-		LIST_HEAD_INIT(&ndata->granted);
-		LIST_HEAD_INIT(&ndata->converting);
-		LIST_HEAD_INIT(&ndata->blocked);
-		spin_lock(&dlm_reco_state_lock);
-		list_add_tail(&ndata->list, &dlm->reco.node_data);
-		spin_unlock(&dlm_reco_state_lock);
-		num++;
-	}
-
-	return 0;
-}
-
-void dlm_destroy_recovery_area(dlm_ctxt *dlm, u8 dead_node)
-{
-	struct list_head *iter, *iter2;
-	dlm_reco_node_data *ndata;
-	LIST_HEAD(tmplist);
-
-	spin_lock(&dlm_reco_state_lock);
-	list_splice_init(&dlm->reco.node_data, &tmplist);
-	spin_unlock(&dlm_reco_state_lock);
-
-#warning this probably needs to be smarter
-	list_for_each_safe(iter, iter2, &tmplist) {
-		ndata = list_entry (iter, dlm_reco_node_data, list);
-		kfree(ndata);
-	}
-}
-
-int dlm_request_all_locks(dlm_ctxt *dlm, u8 request_from, u8 dead_node)
-{
-	dlmprintk("dlm_request_all_locks: dead node is %u, sending request to %u\n",
-	       dead_node, request_from);
-	// send message
-	// sleep until all received or error
-	return 0;
-}
-
-typedef struct _dlm_reco_request_locks
-{
-	u8 dead_node;
-} dlm_reco_request_locks;
-
-typedef struct _dlm_reco_node_data
-{
-} dlm_reco_node_data;
-
-int dlm_request_all_locks_handler(net_msg *msg, u32 len, void *data)
-{
-#if 0
-	int status;
-	dlm_ctxt *dlm = data;
-	dlm_lock_resource *res;
-	dlm_lock *lock = NULL;
-	dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
-	struct qstr lockname = { .name=past->name, .len=past->namelen };
-	struct list_head *iter, *head=NULL;
-	u64 cookie = past->cookie;
-	u32 flags = past->flags;
-
-	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
-	     (LKM_PUT_LVB|LKM_GET_LVB)) {
-		dlmprintk("both PUT and GET lvb specified\n");
-		return DLM_BADARGS;
-	}
-
-	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : 
-		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
-
-	lockname.hash = full_name_hash(lockname.name, lockname.len);
-	
-	dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
-
-	if (past->type != DLM_AST && 
-	    past->type != DLM_BAST) {
-		dlmprintk("Eeeek unknown ast type! %d, cookie=%llu, name=%.*s\n", 
-		       past->type, cookie, lockname.len, lockname.name);
-		return 0;
-	}
-
-	res = dlm_lookup_lock(dlm, &lockname);
-	if (!res) {
-		dlmprintk("eek! got %sast for unknown lockres!  cookie=%llu, name=%.*s, namelen=%d\n", 
-		       past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
-		return 0;
-	}
-#endif
-
-}
-
-int dlm_reco_node_data_handler(net_msg *msg, u32 len, void *data)
-{
-#if 0
-	int status;
-	dlm_ctxt *dlm = data;
-	dlm_lock_resource *res;
-	dlm_lock *lock = NULL;
-	dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
-	struct qstr lockname = { .name=past->name, .len=past->namelen };
-	struct list_head *iter, *head=NULL;
-	u64 cookie = past->cookie;
-	u32 flags = past->flags;
-
-	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
-	     (LKM_PUT_LVB|LKM_GET_LVB)) {
-		dlmprintk("both PUT and GET lvb specified\n");
-		return DLM_BADARGS;
-	}
-
-	dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : 
-		  (flags & LKM_GET_LVB ? "get lvb" : "none"));
-
-	lockname.hash = full_name_hash(lockname.name, lockname.len);
-	
-	dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
-
-	if (past->type != DLM_AST && 
-	    past->type != DLM_BAST) {
-		dlmprintk("Eeeek unknown ast type! %d, cookie=%llu, name=%.*s\n", 
-		       past->type, cookie, lockname.len, lockname.name);
-		return 0;
-	}
-
-	res = dlm_lookup_lock(dlm, &lockname);
-	if (!res) {
-		dlmprintk("eek! got %sast for unknown lockres!  cookie=%llu, name=%.*s, namelen=%d\n", 
-		       past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
-		return 0;
-	}
-#endif
-}
-
-
-
-int dlm_recovery_request_handler(net_msg *msg, u32 len, void *data);
-int dlm_recovery_response_handler(net_msg *msg, u32 len, void *data);
-int dlm_recovery_lock_arr_req_handler(net_msg *msg, u32 len, void *data);
-
-typedef struct _dlm_reco_lock_info
-{
-	u8 node;
-	u8 unused1;
-	u64 cookie;
-	s8 type;
-	s8 convert_type;
-	u8 list;
-	u8 lockname_len;
-	u8 lockname[DLM_LOCKID_NAME_MAX];
-} dlm_reco_lock_info;
-
-enum {
-	DLM_RECO_MASTER_REQUEST, 
-	DLM_RECO_XMIT_LOCKS_REQUEST,
-	DLM_RECO_XMIT_LOCK_HDR_REQUEST,
-	DLM_RECO_XMIT_LOCK_ARR_REQUEST,
-	DLM_RECO_XMIT_COMPLETE_REQUEST,
-	DLM_RECO_ALL_DONE_REQUEST
-};
-
-enum {
-	DLM_RECO_NO_RESPONSE,
-	DLM_RECO_YES_RESPONSE
-};
-
-#define DLM_LOCKS_PER_PACKET   40
-
-typedef struct _dlm_reco_lock_arr_req
-{
-	u8 request_type;
-	u8 num_locks;
-	u8 dead_node;
-	u32 seqnum;
-	dlm_reco_lock_info lock[DLM_LOCKS_PER_PACKET];
-} dlm_reco_lock_arr_req;
-
-typedef struct _dlm_reco_request
-{
-	u8 request_type;
-	u8 unused1;
-	u8 dead_node;
-	u32 num;
-} dlm_reco_request;
-
-typedef struct _dlm_reco_response
-{
-	u8 response_type;
-	u8 unused1[7];
-} dlm_reco_response;
-
-static inline int dlm_reco_lock_info_valid(dlm_reco_lock_info *info)
-{
-	if (info->type != LKM_NLMODE &&
-	    info->type != LKM_PRMODE &&
-	    info->type != LKM_EXMODE)
-		return 0;
-	if (info->convert_type != LKM_NLMODE &&
-	    info->convert_type != LKM_PRMODE &&
-	    info->convert_type != LKM_EXMODE)
-		return 0;
-	if (info->list > 2)
-		return 0;
-	return 1;
-}
-
-static inline int dlm_check_reco_lock_arr_msg(net_msg *msg, dlm_ctxt *dlm, int *out_of_order);
-
-static inline int dlm_check_reco_lock_arr_msg(net_msg *msg, dlm_ctxt *dlm, int *out_of_order)
-{
-	int ret = -EINVAL;
-	dlm_reco_lock_arr_req *req = (dlm_reco_lock_arr_req *)msg->buf;
-	
-	/* check a bunch of ugly conditions */
-	*out_of_order = 0;
-	if (req->num_locks > DLM_LOCKS_PER_PACKET) {
-		dlmprintk("num_locks too large! %u\n", req->num_locks);
-	} else if (req->seqnum != dlm->reco.next_seq) {
-		dlmprintk("expected seq %lu from node %u, got %lu\n",
-		       dlm->reco.next_seq, msg->src_node,
-		       req->seqnum);
-		*out_of_order = 1;
-	} else if (dlm->reco.dead_node != req->dead_node) {
-		dlmprintk("bad lock array: dead node=%u, sent=%u\n",
-		       dlm->reco.dead_node != req->dead_node);
-	} else if (dlm->reco.new_master != dlm->group_index) {
-		dlmprintk0("this node is not the recovery master!\n");
-	} else if (dlm->reco.sending_node != msg->src_node ||
-		 dlm->group_index == msg->dest_node) {
-		dlmprintk0("eek. sending_node=%u, actual=%u, dest=%u, me=%u\n",
-		       dlm->reco.sending_node, msg->src_node, 
-		       msg->dest_node, dlm->group_index);
-	} else
-		ret = 0;
-	return ret;
-}
-
-
-int dlm_recovery_lock_arr_req_handler(net_msg *msg, u32 len, void *data)
-{
-	dlm_ctxt *dlm = data;
-	dlm_reco_lock_arr_req *req = (dlm_reco_lock_arr_req *)msg->buf;
-	dlm_lock_resource *res = NULL;
-	dlm_reco_lock_info *info;
-	dlm_lock **newlocks = NULL;
-	dlm_lock *lock = NULL;
-	int ret, i, out_of_order = 0;
-	
-	ret = 0;
-	if (req->num_locks == 0)
-		goto send_response;
-
-	/* check to see if it's worth kmallocing */
-	spin_lock(&dlm->spinlock);
-	ret = dlm_check_reco_lock_arr_msg(msg, dlm, &out_of_order);
-	spin_unlock(&dlm->spinlock);
-	if (ret < 0)
-		goto send_response;
-
-	newlocks = kmalloc(req->num_locks * sizeof(dlm_lock *), GFP_KERNEL);
-	if (!newlocks) {
-		dlmprintk0("failed to alloc temp lock array!\n");
-		ret = -ENOMEM;
-		goto send_response;
-	}
-	memset(newlocks, 0, req->num_locks * sizeof(dlm_lock *));
-	for (i=0; i<req->num_locks; i++) {
-		info = &(req->lock[i]);
-		if (!dlm_reco_lock_info_valid(info)) {
-			ret = -EINVAL;
-			goto send_response;
-		}
-		lock = newlocks[i] = kmem_cache_alloc(dlm_lock_cache, GFP_KERNEL);
-		if (!newlocks[i]) {
-			ret = -ENOMEM;
-			goto send_response;
-		}
-		memset(lock, 0, sizeof(dlm_lock));
-		LIST_HEAD_INIT(&lock->list);
-		LIST_HEAD_INIT(&lock->ast_list);
-		spin_lock_init(&lock->spinlock);
-		lock->type = info->type;
-		lock->convert_type = info->convert_type;
-		lock->ml.node = dlm->group_index;
-		//atomic_set(&lock->ast_lock, 0);
-		//atomic_set(&lock->bast_lock, 0);
-		lock->ast = NULL;
-		lock->bast = NULL;
-		lock->astdata = (void *)info->list;   // cheating here...
-		lock->cookie = info->cookie;	
-	}
-
-	spin_lock(&dlm->spinlock);
-	/* ok now that everything is allocated and the lock has
-	 * been taken again, recheck all those stupid conditions */
-	ret = dlm_check_reco_lock_arr_msg(msg, dlm, &out_of_order);
-	if (ret < 0) {
-		spin_unlock(&dlm->spinlock);
-		goto send_response;
-	}
-	for (i=0; i<req->num_locks; i++) {
-		info = &(req->lock[i]);
-		lock = newlocks[i];
-		list_add_tail(&lock->list, &dlm->reco.received);
-	}
-	spin_unlock(&dlm->spinlock);
-
-send_response:
-	if (newlocks) {
-		if (ret < 0) {
-			for (i=0; i<req->num_locks; i++)
-				if (newlocks[i])
-					kmem_cache_free(dlm_reco_lock_info_cache, newlocks[i]);
-		}
-		kfree(newlocks);
-	}
-
-	return ret;
-}
-int dlm_recovery_request_handler(net_msg *msg, u32 len, void *data)
-{
-	dlm_ctxt *dlm = data;
-}
-int dlm_recovery_response_handler(net_msg *msg, u32 len, void *data)
-{
-	dlm_ctxt *dlm = data;
-}
-
-
-
-
-
-static int dlm_send_reco_request(dlm_ctxt *dlm, dlm_reco_request *buf, u8 to, struct inode *node)
-{
-	int ret;
-	net_msg *msg = net_package_message(DLM_NET_RECOVERY_REQUEST_MSG_TYPE, 
-				  dlm->key, buf, sizeof(*buf), 
-				  dlm->group_index, to);
-	if (!msg)
-		return -ENOMEM;
-	ret = net_send_udp_msg (node, msg, sizeof(*buf));
-	kfree(msg);
-	return ret;
-}
-
-static int dlm_recover_domain(dlm_ctxt *dlm)
-{
-
-	
-	return 0;
-}
-
-
-#endif  /* LOUSY_RECOVERY */
-
 #warning may need to change kfree to put_lock and refcounting here
-static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node, int locked)
+static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node)
 {
 	struct list_head *iter, *iter2, *tmpiter;
 	dlm_lock_resource *res;
@@ -787,9 +75,6 @@
 	int i;
 	struct list_head *bucket;
 	
-	if (!locked)	
-		spin_lock(&dlm->spinlock);
-
 	for (i=0; i<DLM_HASH_SIZE; i++) {
 		bucket = &(dlm->resources[i]);
 		list_for_each(iter, bucket) {
@@ -826,8 +111,6 @@
 		}
 	}
 
-	if (!locked)
-		spin_unlock(&dlm->spinlock);
 }
 
 
@@ -858,7 +141,7 @@
 		dlmprintk("node %u already added to recovery map!\n", idx);
 	else {
 		set_bit(idx, dlm->recovery_map);
-		dlm_do_local_recovery_cleanup(dlm, idx, 1);
+		dlm_do_local_recovery_cleanup(dlm, idx);
 	}
 	spin_unlock(&dlm->spinlock);
 

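A note on the dlmrecovery.c change: besides removing the #ifdef'd-out
LOUSY_RECOVERY code, it drops the "locked" flag from
dlm_do_local_recovery_cleanup(). The function is now always entered
with dlm->spinlock held by its caller rather than conditionally taking
the lock itself. A minimal sketch of that convention (the assert is
illustrative and not added by this commit, though dlm_mle_release()
above uses the same assert_spin_locked() idiom):

/* callers must hold dlm->spinlock */
static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node)
{
	assert_spin_locked(&dlm->spinlock);
	/* ... walk the resource hash buckets for the dead node ... */
}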

