[Ocfs2-commits] khackel commits r1964 - trunk/fs/ocfs2/dlm
svn-commits at oss.oracle.com
Tue Mar 8 18:39:02 CST 2005
Author: khackel
Signed-off-by: mfasheh
Date: 2005-03-08 18:39:00 -0600 (Tue, 08 Mar 2005)
New Revision: 1964
Modified:
   trunk/fs/ocfs2/dlm/dlmmaster.c
   trunk/fs/ocfs2/dlm/dlmmod.h
   trunk/fs/ocfs2/dlm/dlmrecovery.c
Log:
* changed refcounting of master list entries from atomic_t to kref
Signed-off-by: mfasheh
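
For readers skimming the diff below, this is the general atomic_t-to-kref conversion pattern being applied: the open-coded atomic_dec_and_lock()/kfree() path becomes a kref with a release callback, and callers take references with kref_get() and drop them with kref_put(). The sketch uses the mainline kref API, whose signatures differ slightly from the calls visible in this revision, and the struct and function names are illustrative rather than taken from the ocfs2 code:

#include <linux/kernel.h>
#include <linux/kref.h>
#include <linux/list.h>
#include <linux/slab.h>

/* illustrative object whose lifetime is managed by a kref (not the real mle) */
struct example_entry {
	struct kref refs;
	struct list_head list;
};

/* invoked by kref_put() when the last reference is dropped */
static void example_entry_release(struct kref *kref)
{
	struct example_entry *e = container_of(kref, struct example_entry, refs);

	list_del_init(&e->list);
	kfree(e);
}

static void example_entry_init(struct example_entry *e)
{
	kref_init(&e->refs);		/* reference count starts at 1 */
	INIT_LIST_HEAD(&e->list);
}

static void example_entry_get(struct example_entry *e)
{
	kref_get(&e->refs);		/* replaces atomic_inc(&e->refcnt) */
}

static void example_entry_put(struct example_entry *e)
{
	/* replaces atomic_dec_and_lock() plus an open-coded free path */
	kref_put(&e->refs, example_entry_release);
}

In the actual change, dlm_put_mle() takes dlm->spinlock and dlm->master_lock before calling kref_put(), so that dlm_mle_release() can unlink the entry and detach its heartbeat events while asserting both locks are held.
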
Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c 2005-03-08 22:18:28 UTC (rev 1963)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c 2005-03-09 00:39:00 UTC (rev 1964)
@@ -70,11 +70,13 @@
spin_lock(&dlm->master_lock);
list_for_each(iter, &dlm->master_list) {
+ struct kref *k;
mle = list_entry(iter, dlm_master_list_entry, list);
+ k = &mle->mle_refs;
type = (mle->type == DLM_MLE_BLOCK ? "BLK" : "MAS");
err = (mle->error ? 'Y' : 'N');
- refs = atomic_read(&mle->refcnt);
+ refs = atomic_read(&k->refcount);
master = mle->master;
attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
@@ -116,6 +118,7 @@
#endif
+static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(dlm_master_list_entry *mle,
enum dlm_mle_type type,
dlm_ctxt *dlm,
@@ -172,12 +175,19 @@
}
-static inline void dlm_mle_detach_hb_events(dlm_ctxt *dlm,
+static inline void __dlm_mle_detach_hb_events(dlm_ctxt *dlm,
+ dlm_master_list_entry *mle)
+{
+ if (!list_empty(&mle->hb_events))
+ list_del_init(&mle->hb_events);
+}
+
+
+static inline void dlm_mle_detach_hb_events(dlm_ctxt *dlm,
dlm_master_list_entry *mle)
{
spin_lock(&dlm->spinlock);
- if (!list_empty(&mle->hb_events))
- list_del_init(&mle->hb_events);
+ __dlm_mle_detach_hb_events(dlm, mle);
spin_unlock(&dlm->spinlock);
}
@@ -190,21 +200,16 @@
DLM_ASSERT(mle->dlm);
dlm = mle->dlm;
- if (atomic_dec_and_lock(&mle->refcnt, &dlm->master_lock)) {
- /* remove from list if not already */
- if (!list_empty(&mle->list))
- list_del(&mle->list);
- spin_unlock(&dlm->master_lock);
-
- /* detach the mle from the domain node up/down events */
- dlm_mle_detach_hb_events(dlm, mle);
- kfree(mle);
- }
+ spin_lock(&dlm->spinlock);
+ spin_lock(&dlm->master_lock);
+ kref_put(&mle->mle_refs, dlm_mle_release);
+ spin_unlock(&dlm->master_lock);
+ spin_unlock(&dlm->spinlock);
}
static inline void dlm_get_mle(dlm_master_list_entry *mle)
{
- atomic_inc(&mle->refcnt);
+ kref_get(&mle->mle_refs);
}
@@ -225,7 +230,7 @@
spin_lock_init(&mle->spinlock);
init_waitqueue_head(&mle->wq);
atomic_set(&mle->woken, 0);
- atomic_set(&mle->refcnt, 1);
+ kref_init(&mle->mle_refs, dlm_mle_release);
memset(mle->response_map, 0, sizeof(mle->response_map));
mle->master = NM_MAX_NODES;
mle->error = 0;
@@ -322,6 +327,36 @@
}
+static void dlm_mle_release(struct kref *kref)
+{
+ dlm_master_list_entry *mle;
+ dlm_ctxt *dlm;
+
+ dlmprintk0("\n");
+
+ DLM_ASSERT(kref);
+
+ mle = container_of(kref, dlm_master_list_entry, mle_refs);
+
+ DLM_ASSERT(mle->dlm);
+ dlm = mle->dlm;
+
+ assert_spin_locked(&dlm->spinlock);
+ assert_spin_locked(&dlm->master_lock);
+
+ /* remove from list if not already */
+ if (!list_empty(&mle->list))
+ list_del(&mle->list);
+
+ /* detach the mle from the domain node up/down events */
+ __dlm_mle_detach_hb_events(dlm, mle);
+
+ /* NOTE: kfree under spinlock here.
+ * if this is bad, we can move this to a freelist. */
+ kfree(mle);
+}
+
+
/*
* LOCK RESOURCE FUNCTIONS
*/
Modified: trunk/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.h 2005-03-08 22:18:28 UTC (rev 1963)
+++ trunk/fs/ocfs2/dlm/dlmmod.h 2005-03-09 00:39:00 UTC (rev 1964)
@@ -347,7 +347,7 @@
spinlock_t spinlock;
wait_queue_head_t wq;
atomic_t woken;
- atomic_t refcnt;
+ struct kref mle_refs;
unsigned long maybe_map[BITS_TO_LONGS(NM_MAX_NODES)];
unsigned long vote_map[BITS_TO_LONGS(NM_MAX_NODES)];
unsigned long response_map[BITS_TO_LONGS(NM_MAX_NODES)];
Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-03-08 22:18:28 UTC (rev 1963)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-03-09 00:39:00 UTC (rev 1964)
@@ -49,7 +49,7 @@
#include "dlmcommon.h"
#include "dlmmod.h"
-static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node, int locked);
+static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node);
int dlm_recovery_thread(void *data);
void dlm_complete_recovery_thread(dlm_ctxt *dlm);
@@ -65,721 +65,9 @@
#define DLM_RECOVERY_THREAD_MS 2000
-#ifdef LOUSY_RECOVERY
-/*
- * RECOVERY THREAD
- */
-
-void dlm_kick_recovery_thread(dlm_ctxt *dlm)
-{
- /* wake the recovery thread
- * this will wake the reco thread in one of three places
- * 1) sleeping with no recovery happening
- * 2) sleeping with recovery mastered elsewhere
- * 3) recovery mastered here, waiting on reco data */
- atomic_set(&dlm->reco.thread.woken, 1);
- wake_up(&dlm->reco.thread.thread_wq);
-}
-
-/* Launch the recovery thread */
-int dlm_launch_recovery_thread(dlm_ctxt *dlm)
-{
- dlmprintk0("starting recovery thread...\n");
- dlm->reco.thread.pid = kernel_thread (dlm_recovery_thread, dlm, CLONE_FS | CLONE_FILES | CLONE_SIGHAND);
- if (dlm->reco.thread.pid < 0) {
- dlmprintk("unable to launch recovery thread, error=%d", dlm->reco.thread.pid);
- return -EINVAL;
- }
- dlmprintk0("recovery thread running...\n");
- return 0;
-}
-
-void dlm_complete_recovery_thread(dlm_ctxt *dlm)
-{
- dlmprintk0 ("waiting for recovery thread to exit....");
- send_sig (SIGINT, dlm->reco.thread.task, 0);
- wait_for_completion (&dlm->reco.thread.complete);
- dlmprintk0 ("recovery thread exited\n");
- dlm->reco.thread.task = NULL;
-}
-
- /*
- * this is lame, but here's how recovery works...
- * 1) all recovery threads cluster wide will work on recovering
- * ONE node at a time
- * 2) negotiate who will take over all the locks for the dead node.
- * thats right... ALL the locks.
- * 3) once a new master is chosen, everyone scans all locks
- * and moves aside those mastered by the dead guy
- * 4) each of these locks should be locked until recovery is done
- * 5) the new master collects up all of secondary lock queue info
- * one lock at a time, forcing each node to communicate back
- * before continuing
- * 6) each secondary lock queue responds with the full known lock info
- * 7) once the new master has run all its locks, it sends a ALLDONE!
- * message to everyone
- * 8) upon receiving this message, the secondary queue node unlocks
- * and responds to the ALLDONE
- * 9) once the new master gets responses from everyone, he unlocks
- * everything and recovery for this dead node is done
- *10) go back to 2) while there are still dead nodes
- *
- */
-
-
-
-int dlm_recovery_thread(void *data)
-{
- int status, i;
- int cnt = 0, dlm_num;
- struct list_head *iter, *iter2, *tmpiter;
- dlm_lock_resource *res;
- char name[12];
- dlm_ctxt *dlm = data;
- u8 tmp;
-
-
- dlm_num = nm_get_group_global_index(dlm->group);
- sprintf(name, "dlmreco-%03u", dlm_num);
- util_daemonize (name, strlen(name), 1);
- dlm->reco.thread.task = current;
-
- while (1) {
- spin_lock(&dlm->spinlock);
-
- /* check to see if the new master has died */
- if (dlm->reco.new_master != NM_INVALID_SLOT_NUM &&
- test_bit(dlm->reco.new_master, dlm->recovery_map)) {
- dlmprintk("new master %u died while recovering %u!\n",
- dlm->reco.new_master, dlm->reco.dead_node);
- // unset the new_master, leave dead_node
- dlm->reco.new_master = NM_INVALID_SLOT_NUM;
- }
-
- /* select a target to recover */
- if (dlm->reco.dead_node == NM_INVALID_SLOT_NUM) {
- dlm->reco.dead_node = find_next_bit (dlm->recovery_map, NM_MAX_NODES, 0);
- if (dlm->reco.dead_node >= NM_MAX_NODES)
- dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
- } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
- // BUG?
- dlmprintk("dead_node %u no longer in recovery map!\n",
- dlm->reco.dead_node);
- dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
- }
-
- if (dlm->reco.dead_node == NM_INVALID_SLOT_NUM) {
- dlmprintk0("nothing to recover! sleeping now!\n");
- spin_unlock(&dlm->spinlock);
- goto sleep;
- }
- spin_unlock(&dlm->spinlock);
-
- /* take write barrier */
- /* (stops the list reshuffling thread, proxy ast handling) */
- down_write(&dlm->recovery_sem);
-
- /* choose a new master */
- if (dlm->reco.new_master == NM_INVALID_SLOT_NUM) {
- u8 new_dead_node = dlm->reco.dead_node;
- dlm->reco.new_master = dlm_pick_recovery_master(dlm, &new_dead_node);
- if (new_dead_node != dlm->reco.dead_node) {
- // master wants to recover a different node
- dlm->reco.dead_node = new_dead_node;
-
- // do local cleanup if heartbeat has not added the
- // node to the recovery map yet
- spin_lock(&dlm->spinlock);
- if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
- dlm_do_local_recovery_cleanup(dlm, dlm->reco.dead_node, 1);
- set_bit(dlm->reco.dead_node, dlm->recovery_map);
- clear_bit(dlm->reco.dead_node, dlm->node_map);
- }
- spin_unlock(&dlm->spinlock);
- }
- }
-
- dlmprintk("RECOVERY! new_master=%u, this node=%u, dead_node=%u\n",
- dlm->reco.new_master, dlm->group_index, dlm->reco.dead_node);
-
- if (dlm->reco.new_master != dlm->group_index) {
- /* it is safe to start everything back up here
- * because all of the dead node's lock resources
- * have been marked as in-recovery */
- up_write(&dlm->recovery_sem);
-
- // sit around until new_master is dead or done
- // we will get signalled by the waitqueue either way
- dlmprintk("new_master %u is recovering dead_node %u... waiting...\n",
- dlm->reco.new_master, dlm->reco.dead_node);
-sleep:
- atomic_set(&dlm->reco.thread.woken, 0);
- status = wait_event_interruptible_timeout(
- dlm->reco.thread.thread_wq,
- (atomic_read(&dlm->reco.thread.woken) == 1),
- msecs_to_jiffies(DLM_RECOVERY_THREAD_MS));
-
- if (status >= 0) {
- if (atomic_read(&dlm->reco.thread.woken))
- dlmprintk0("aha!!! recovery thread woken!\n");
- else
- dlmprintk0("timed out waiting, running again\n");
- continue;
- }
- dlmprintk("recovery thread got %d while waiting\n", status);
- break;
- }
-
- /* new_master == dlm->group_index */
- status = dlm_remaster_locks_local(dlm);
- if (status < 0) {
- dlmprintk("error remastering locks for node %u!!!! retrying!\n",
- dlm->reco.dead_node);
- } else {
- // success! see if any other nodes need recovery
- spin_lock(&dlm->spinlock);
- clear_bit(dlm->reco.dead_node, dlm->recovery_map);
- spin_unlock(&dlm->spinlock);
- dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
- dlm->reco.new_master = NM_INVALID_SLOT_NUM;
- dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
- dlm->reco.next_seq = 0;
- }
- up_write(&dlm->recovery_sem);
- // continue and look for another dead node
- }
-
- flush_scheduled_work();
- complete (&dlm->reco.thread.complete);
- dlmprintk0("quitting recovery thread!!!!!!\n");
- return 0;
-}
-
-/* +- if this node is NOT the new master... */
-/* +--- if master's dead_node is not the one we chose, do local cleanup again with proper dead_node */
-/* +--- wait for poll messages from new master: register net message handler, it will do the work */
-/* +--- check for death of new master */
-/* +--- if dead, unregister the handler, unset new_master, keep dead_node and goto "select a target" */
-/* |- on request, send header with number of packets, get response, then start blasting packets */
-/* |- retransmit any missed packets on request */
-/* |- once ALL DONE is received, run all locks again */
-/* +--- unset the RECOVERING flag */
-/* +--- set the new owner as new_master */
-/* +--- remove dead_node from recovery map */
-/* +--- unset new_master and dead_node and start all over */
-
-static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED;
-
-
-static int dlm_remaster_locks_local(dlm_ctxt *dlm)
-{
- int num_nodes = NM_MAX_NODES, next=0, status = 0;
- dlm_reco_node_data *ndata;
- struct list_head *iter;
- int all_nodes_done;
-
-
-/* +- if this node is the new master, init the temp recovery area */
-/* |- poll each live node for lock state */
-/* |- collect the data from each node until node says it's done, or dead */
-/* +--- if node died, throw away temp recovery area, keep new_master and dead_node, goto "select a target" */
-/* |- apply all temp area changes to real lock */
-/* +- send ALL DONE message to each node */
-
- status = dlm_init_recovery_area(dlm);
- if (status < 0)
- return status;
-
- spin_lock(&dlm_reco_state_lock);
- list_for_each(iter, &dlm->reco.node_data) {
- ndata = list_entry (iter, dlm_reco_node_data, list);
- DLM_ASSERT(ndata->state == DLM_RECO_NODE_DATA_INIT);
- ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
-
- status = dlm_request_all_locks(dlm, ndata->node_num, dlm->reco.dead_node);
- if (status < 0) {
- dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
- return status;
- }
-
- switch (ndata->state) {
- case DLM_RECO_NODE_DATA_INIT:
- case DLM_RECO_NODE_DATA_FINALIZE_SENT:
- case DLM_RECO_NODE_DATA_REQUESTED:
- DLM_ASSERT(0);
- break;
- case DLM_RECO_NODE_DATA_DEAD:
- dlmprintk("eek. node %u died after requesting recovery info for node %u\n",
- ndata->node_num, dlm->reco.dead_node);
- spin_unlock(&dlm_reco_state_lock);
- // start all over
- dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
- return -EAGAIN;
- case DLM_RECO_NODE_DATA_REQUESTING:
- ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
- dlmprintk("now receiving recovery data from node %u for dead node %u\n",
- ndata->node_num, dlm->reco.dead_node);
- break;
- case DLM_RECO_NODE_DATA_RECEIVING:
- dlmprintk("already receiving recovery data from node %u for dead node %u\n",
- ndata->node_num, dlm->reco.dead_node);
- break;
- case DLM_RECO_NODE_DATA_DONE:
- dlmprintk("already DONE receiving recovery data from node %u for dead node %u\n",
- ndata->node_num, dlm->reco.dead_node);
- break;
- }
- }
- spin_unlock(&dlm_reco_state_lock);
-
- /* nodes should be sending reco data now
- * just need to wait */
-
- while (1) {
- /* wait to be signalled, with periodic timeout
- * to check for node death */
- atomic_set(&dlm->reco.thread.woken, 0);
- ret = wait_event_interruptible_timeout(dlm->reco.thread.thread_wq,
- (atomic_read(&dlm->reco.thread.woken) == 1),
- msecs_to_jiffies(DLM_RECOVERY_THREAD_MS));
- if (ret >= 0) {
- if (atomic_read(&dlm->reco.thread.woken))
- dlmprintk0("waiting on reco data... aha!!! recovery thread woken!\n");
- else
- dlmprintk0("waiting on reco data... timed out waiting\n");
- }
-
- /* either way, recheck all the nodes now to see if we are
- * done, or if anyone died */
- all_nodes_done = 1;
- spin_lock(&dlm_reco_state_lock);
- list_for_each(iter, &dlm->reco.node_data) {
- ndata = list_entry (iter, dlm_reco_node_data, list);
-
- switch (ndata->state) {
- case DLM_RECO_NODE_DATA_INIT:
- case DLM_RECO_NODE_DATA_REQUESTING:
- DLM_ASSERT(0);
- break;
- case DLM_RECO_NODE_DATA_DEAD:
- dlmprintk("eek. node %u died after requesting recovery info for node %u\n",
- ndata->node_num, dlm->reco.dead_node);
- spin_unlock(&dlm_reco_state_lock);
- // start all over
- dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
- return -EAGAIN;
- case DLM_RECO_NODE_DATA_RECEIVING:
- case DLM_RECO_NODE_DATA_REQUESTED:
- all_nodes_done = 0;
- break;
- case DLM_RECO_NODE_DATA_DONE:
- break;
- case DLM_RECO_NODE_DATA_FINALIZE_SENT:
- break;
- }
- }
- spin_unlock(&dlm_reco_state_lock);
-
- if (all_nodes_done) {
- /* all nodes are now in DLM_RECO_NODE_DATA_DONE state
- * just send a finalize message to everyone and
- * clean up */
- ret = dlm_finalize_recovery(dlm);
- if (ret < 0) {
- dlmprintk("dlm_finalize_recovery returned %d\n", ret);
- }
- dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
- status = ret;
- break;
- }
- }
-
- return status;
-}
-
-int dlm_init_recovery_area(dlm_ctxt *dlm)
-{
- int num=0, ret;
- dlm_reco_node_data *ndata;
- LIST_HEAD(tmplist);
-
- spin_lock(&dlm->spinlock);
- memcpy(dlm->reco.node_map, dlm->node_map, sizeof(dlm->node_map));
- /* nodes can only be removed (by dying) after dropping
- * this lock, and death will be trapped later, so this should do */
- spin_unlock(&dlm->spinlock);
-
- while (1) {
- num = find_next_bit (dlm->reco.node_map, NM_MAX_NODES, num);
- if (num >= NM_MAX_NODES) {
- break;
- }
- DLM_ASSERT(num != dead_node);
-
- ndata = kmalloc(sizeof(dlm_reco_node_data), GFP_KERNEL);
- if (!ndata) {
- dlm_destroy_recovery_area(dlm, dead_node);
- return -ENOMEM;
- }
- memset(ndata, 0, sizeof(dlm_reco_node_data));
- ndata->node_num = num;
- ndata->state = DLM_RECO_NODE_DATA_INIT;
- LIST_HEAD_INIT(&ndata->granted);
- LIST_HEAD_INIT(&ndata->converting);
- LIST_HEAD_INIT(&ndata->blocked);
- spin_lock(&dlm_reco_state_lock);
- list_add_tail(&ndata->list, &dlm->reco.node_data);
- spin_unlock(&dlm_reco_state_lock);
- num++;
- }
-
- return 0;
-}
-
-void dlm_destroy_recovery_area(dlm_ctxt *dlm, u8 dead_node)
-{
- struct list_head *iter, *iter2;
- dlm_reco_node_data *ndata;
- LIST_HEAD(tmplist);
-
- spin_lock(&dlm_reco_state_lock);
- list_splice_init(&dlm->reco.node_data, &tmplist);
- spin_unlock(&dlm_reco_state_lock);
-
-#warning this probably needs to be smarter
- list_for_each_safe(iter, iter2, &tmplist) {
- ndata = list_entry (iter, dlm_reco_node_data, list);
- kfree(ndata);
- }
-}
-
-int dlm_request_all_locks(dlm_ctxt *dlm, u8 request_from, u8 dead_node)
-{
- dlmprintk("dlm_request_all_locks: dead node is %u, sending request to %u\n",
- dead_node, request_from);
- // send message
- // sleep until all received or error
- return 0;
-}
-
-typedef struct _dlm_reco_request_locks
-{
- u8 dead_node;
-} dlm_reco_request_locks;
-
-typedef struct _dlm_reco_node_data
-{
-} dlm_reco_node_data;
-
-int dlm_request_all_locks_handler(net_msg *msg, u32 len, void *data)
-{
-#if 0
- int status;
- dlm_ctxt *dlm = data;
- dlm_lock_resource *res;
- dlm_lock *lock = NULL;
- dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
- struct qstr lockname = { .name=past->name, .len=past->namelen };
- struct list_head *iter, *head=NULL;
- u64 cookie = past->cookie;
- u32 flags = past->flags;
-
- if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
- (LKM_PUT_LVB|LKM_GET_LVB)) {
- dlmprintk("both PUT and GET lvb specified\n");
- return DLM_BADARGS;
- }
-
- dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
- (flags & LKM_GET_LVB ? "get lvb" : "none"));
-
- lockname.hash = full_name_hash(lockname.name, lockname.len);
-
- dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
-
- if (past->type != DLM_AST &&
- past->type != DLM_BAST) {
- dlmprintk("Eeeek unknown ast type! %d, cookie=%llu, name=%.*s\n",
- past->type, cookie, lockname.len, lockname.name);
- return 0;
- }
-
- res = dlm_lookup_lock(dlm, &lockname);
- if (!res) {
- dlmprintk("eek! got %sast for unknown lockres! cookie=%llu, name=%.*s, namelen=%d\n",
- past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
- return 0;
- }
-#endif
-
-}
-
-int dlm_reco_node_data_handler(net_msg *msg, u32 len, void *data)
-{
-#if 0
- int status;
- dlm_ctxt *dlm = data;
- dlm_lock_resource *res;
- dlm_lock *lock = NULL;
- dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
- struct qstr lockname = { .name=past->name, .len=past->namelen };
- struct list_head *iter, *head=NULL;
- u64 cookie = past->cookie;
- u32 flags = past->flags;
-
- if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
- (LKM_PUT_LVB|LKM_GET_LVB)) {
- dlmprintk("both PUT and GET lvb specified\n");
- return DLM_BADARGS;
- }
-
- dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
- (flags & LKM_GET_LVB ? "get lvb" : "none"));
-
- lockname.hash = full_name_hash(lockname.name, lockname.len);
-
- dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
-
- if (past->type != DLM_AST &&
- past->type != DLM_BAST) {
- dlmprintk("Eeeek unknown ast type! %d, cookie=%llu, name=%.*s\n",
- past->type, cookie, lockname.len, lockname.name);
- return 0;
- }
-
- res = dlm_lookup_lock(dlm, &lockname);
- if (!res) {
- dlmprintk("eek! got %sast for unknown lockres! cookie=%llu, name=%.*s, namelen=%d\n",
- past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
- return 0;
- }
-#endif
-}
-
-
-
-int dlm_recovery_request_handler(net_msg *msg, u32 len, void *data);
-int dlm_recovery_response_handler(net_msg *msg, u32 len, void *data);
-int dlm_recovery_lock_arr_req_handler(net_msg *msg, u32 len, void *data);
-
-typedef struct _dlm_reco_lock_info
-{
- u8 node;
- u8 unused1;
- u64 cookie;
- s8 type;
- s8 convert_type;
- u8 list;
- u8 lockname_len;
- u8 lockname[DLM_LOCKID_NAME_MAX];
-} dlm_reco_lock_info;
-
-enum {
- DLM_RECO_MASTER_REQUEST,
- DLM_RECO_XMIT_LOCKS_REQUEST,
- DLM_RECO_XMIT_LOCK_HDR_REQUEST,
- DLM_RECO_XMIT_LOCK_ARR_REQUEST,
- DLM_RECO_XMIT_COMPLETE_REQUEST,
- DLM_RECO_ALL_DONE_REQUEST
-};
-
-enum {
- DLM_RECO_NO_RESPONSE,
- DLM_RECO_YES_RESPONSE
-};
-
-#define DLM_LOCKS_PER_PACKET 40
-
-typedef struct _dlm_reco_lock_arr_req
-{
- u8 request_type;
- u8 num_locks;
- u8 dead_node;
- u32 seqnum;
- dlm_reco_lock_info lock[DLM_LOCKS_PER_PACKET];
-} dlm_reco_lock_arr_req;
-
-typedef struct _dlm_reco_request
-{
- u8 request_type;
- u8 unused1;
- u8 dead_node;
- u32 num;
-} dlm_reco_request;
-
-typedef struct _dlm_reco_response
-{
- u8 response_type;
- u8 unused1[7];
-} dlm_reco_response;
-
-static inline int dlm_reco_lock_info_valid(dlm_reco_lock_info *info)
-{
- if (info->type != LKM_NLMODE &&
- info->type != LKM_PRMODE &&
- info->type != LKM_EXMODE)
- return 0;
- if (info->convert_type != LKM_NLMODE &&
- info->convert_type != LKM_PRMODE &&
- info->convert_type != LKM_EXMODE)
- return 0;
- if (info->list > 2)
- return 0;
- return 1;
-}
-
-static inline int dlm_check_reco_lock_arr_msg(net_msg *msg, dlm_ctxt *dlm, int *out_of_order);
-
-static inline int dlm_check_reco_lock_arr_msg(net_msg *msg, dlm_ctxt *dlm, int *out_of_order)
-{
- int ret = -EINVAL;
- dlm_reco_lock_arr_req *req = (dlm_reco_lock_arr_req *)msg->buf;
-
- /* check a bunch of ugly conditions */
- *out_of_order = 0;
- if (req->num_locks > DLM_LOCKS_PER_PACKET) {
- dlmprintk("num_locks too large! %u\n", req->num_locks);
- } else if (req->seqnum != dlm->reco.next_seq) {
- dlmprintk("expected seq %lu from node %u, got %lu\n",
- dlm->reco.next_seq, msg->src_node,
- req->seqnum);
- *out_of_order = 1;
- } else if (dlm->reco.dead_node != req->dead_node) {
- dlmprintk("bad lock array: dead node=%u, sent=%u\n",
- dlm->reco.dead_node != req->dead_node);
- } else if (dlm->reco.new_master != dlm->group_index) {
- dlmprintk0("this node is not the recovery master!\n");
- } else if (dlm->reco.sending_node != msg->src_node ||
- dlm->group_index == msg->dest_node) {
- dlmprintk0("eek. sending_node=%u, actual=%u, dest=%u, me=%u\n",
- dlm->reco.sending_node, msg->src_node,
- msg->dest_node, dlm->group_index);
- } else
- ret = 0;
- return ret;
-}
-
-
-int dlm_recovery_lock_arr_req_handler(net_msg *msg, u32 len, void *data)
-{
- dlm_ctxt *dlm = data;
- dlm_reco_lock_arr_req *req = (dlm_reco_lock_arr_req *)msg->buf;
- dlm_lock_resource *res = NULL;
- dlm_reco_lock_info *info;
- dlm_lock **newlocks = NULL;
- dlm_lock *lock = NULL;
- int ret, i, out_of_order = 0;
-
- ret = 0;
- if (req->num_locks == 0)
- goto send_response;
-
- /* check to see if it's worth kmallocing */
- spin_lock(&dlm->spinlock);
- ret = dlm_check_reco_lock_arr_msg(msg, dlm, &out_of_order);
- spin_unlock(&dlm->spinlock);
- if (ret < 0)
- goto send_response;
-
- newlocks = kmalloc(req->num_locks * sizeof(dlm_lock *), GFP_KERNEL);
- if (!newlocks) {
- dlmprintk0("failed to alloc temp lock array!\n");
- ret = -ENOMEM;
- goto send_response;
- }
- memset(newlocks, 0, req->num_locks * sizeof(dlm_lock *));
- for (i=0; i<req->num_locks; i++) {
- info = &(req->lock[i]);
- if (!dlm_reco_lock_info_valid(info)) {
- ret = -EINVAL;
- goto send_response;
- }
- lock = newlocks[i] = kmem_cache_alloc(dlm_lock_cache, GFP_KERNEL);
- if (!newlocks[i]) {
- ret = -ENOMEM;
- goto send_response;
- }
- memset(lock, 0, sizeof(dlm_lock));
- LIST_HEAD_INIT(&lock->list);
- LIST_HEAD_INIT(&lock->ast_list);
- spin_lock_init(&lock->spinlock);
- lock->type = info->type;
- lock->convert_type = info->convert_type;
- lock->ml.node = dlm->group_index;
- //atomic_set(&lock->ast_lock, 0);
- //atomic_set(&lock->bast_lock, 0);
- lock->ast = NULL;
- lock->bast = NULL;
- lock->astdata = (void *)info->list; // cheating here...
- lock->cookie = info->cookie;
- }
-
- spin_lock(&dlm->spinlock);
- /* ok now that everything is allocated and the lock has
- * been taken again, recheck all those stupid conditions */
- ret = dlm_check_reco_lock_arr_msg(msg, dlm, &out_of_order);
- if (ret < 0) {
- spin_unlock(&dlm->spinlock);
- goto send_response;
- }
- for (i=0; i<req->num_locks; i++) {
- info = &(req->lock[i]);
- lock = newlocks[i];
- list_add_tail(&lock->list, &dlm->reco.received);
- }
- spin_unlock(&dlm->spinlock);
-
-send_response:
- if (newlocks) {
- if (ret < 0) {
- for (i=0; i<req->num_locks; i++)
- if (newlocks[i])
- kmem_cache_free(dlm_reco_lock_info_cache, newlocks[i]);
- }
- kfree(newlocks);
- }
-
- return ret;
-}
-int dlm_recovery_request_handler(net_msg *msg, u32 len, void *data)
-{
- dlm_ctxt *dlm = data;
-}
-int dlm_recovery_response_handler(net_msg *msg, u32 len, void *data)
-{
- dlm_ctxt *dlm = data;
-}
-
-
-
-
-
-static int dlm_send_reco_request(dlm_ctxt *dlm, dlm_reco_request *buf, u8 to, struct inode *node)
-{
- int ret;
- net_msg *msg = net_package_message(DLM_NET_RECOVERY_REQUEST_MSG_TYPE,
- dlm->key, buf, sizeof(*buf),
- dlm->group_index, to);
- if (!msg)
- return -ENOMEM;
- ret = net_send_udp_msg (node, msg, sizeof(*buf));
- kfree(msg);
- return ret;
-}
-
-static int dlm_recover_domain(dlm_ctxt *dlm)
-{
-
-
- return 0;
-}
-
-
-#endif /* LOUSY_RECOVERY */
-
#warning may need to change kfree to put_lock and refcounting here
-static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node, int locked)
+static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node)
{
struct list_head *iter, *iter2, *tmpiter;
dlm_lock_resource *res;
@@ -787,9 +75,6 @@
int i;
struct list_head *bucket;
- if (!locked)
- spin_lock(&dlm->spinlock);
-
for (i=0; i<DLM_HASH_SIZE; i++) {
bucket = &(dlm->resources[i]);
list_for_each(iter, bucket) {
@@ -826,8 +111,6 @@
}
}
- if (!locked)
- spin_unlock(&dlm->spinlock);
}
@@ -858,7 +141,7 @@
dlmprintk("node %u already added to recovery map!\n", idx);
else {
set_bit(idx, dlm->recovery_map);
- dlm_do_local_recovery_cleanup(dlm, idx, 1);
+ dlm_do_local_recovery_cleanup(dlm, idx);
}
spin_unlock(&dlm->spinlock);