[Ocfs2-commits] khackel commits r2055 - trunk/fs/ocfs2/dlm
svn-commits at oss.oracle.com
Fri Mar 25 15:36:28 CST 2005
Author: khackel
Date: 2005-03-25 15:36:27 -0600 (Fri, 25 Mar 2005)
New Revision: 2055
Modified:
trunk/fs/ocfs2/dlm/dlmcommon.h
trunk/fs/ocfs2/dlm/dlmlock.c
trunk/fs/ocfs2/dlm/dlmmaster.c
trunk/fs/ocfs2/dlm/dlmrecovery.c
trunk/fs/ocfs2/dlm/dlmthread.c
Log:
* use dlm_new_lock/dlm_init_lock to standardize how callers
  create and initialize dlm_lock structures (usage sketch below)
* add dlm_dispatch_assert_master so that assert_master calls can
  be dispatched to keventd from the few code paths that cannot
  send network messages of their own (see the sketch below)
* small cleanup in the path that frees locks owned by dead nodes
* prepare for dlm_lock refcounting in the next patch
Signed-off-by: mfasheh
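
For reference, a minimal sketch of the new allocation pattern as the
callers in this patch use it (taken from the dlmlock.c hunks below;
error handling trimmed):

    /* local request: the caller supplies its own lksb */
    u64 tmpcookie;
    dlm_lock *lock;

    dlm_get_next_cookie(dlm->node_num, &tmpcookie);
    lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
    if (!lock)
            goto error;

    /* remote create handler: pass a NULL lksb and dlm_new_lock
     * allocates and zeroes one itself, tagging it
     * DLM_LKSB_KERNEL_ALLOCATED so teardown can tell the lksb
     * was allocated by the dlm rather than the caller */
    newlock = dlm_new_lock(create->requested_type, create->node_idx,
                           create->cookie, NULL);

In both cases dlm_new_lock zeroes the dlm_lock, runs dlm_init_lock on
it (now static to dlmlock.c) and cross-links lock->lksb and
lksb->lockid, so callers no longer open-code that setup.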
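
Similarly, a sketch of the new assert_master dispatch path (simplified
from the dlmmaster.c hunks below):

    /* in a handler that cannot do network messages of its own: */
    ret = dlm_dispatch_assert_master(dlm, res, 1, request->node_idx);
    if (ret < 0) {
            /* only the work item allocation can fail */
            response = DLM_MASTER_RESP_ERROR;
    }

dlm_dispatch_assert_master queues a dlm_work_item carrying the lockres
on dlm->work_list and schedules dlm->dispatched_work, so keventd later
runs dlm_assert_master_worker.  The worker snapshots dlm->domain_map,
clears this node (plus the requesting node and all higher node numbers
when ignore_higher is set) and retries dlm_do_assert_master until it
succeeds.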
Modified: trunk/fs/ocfs2/dlm/dlmcommon.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmcommon.h 2005-03-25 20:09:38 UTC (rev 2054)
+++ trunk/fs/ocfs2/dlm/dlmcommon.h 2005-03-25 21:36:27 UTC (rev 2055)
@@ -165,6 +165,14 @@
u8 real_master;
} dlm_mig_lockres_priv;
+typedef struct _dlm_assert_master_priv
+{
+ dlm_lock_resource *lockres;
+ u8 request_from;
+ unsigned ignore_higher:1;
+} dlm_assert_master_priv;
+
+
struct _dlm_work_item
{
struct list_head list;
@@ -174,6 +182,7 @@
union {
dlm_request_all_locks_priv ral;
dlm_mig_lockres_priv ml;
+ dlm_assert_master_priv am;
} u;
};
@@ -764,6 +773,8 @@
}
+dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, dlm_lockstatus *lksb);
+
int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
@@ -773,6 +784,7 @@
void dlm_shuffle_lists(dlm_ctxt *dlm, dlm_lock_resource *res);
int dlm_launch_thread(dlm_ctxt *dlm);
void dlm_complete_thread(dlm_ctxt *dlm);
+void dlm_flush_asts(dlm_ctxt *dlm);
int dlm_launch_recovery_thread(dlm_ctxt *dlm);
void dlm_complete_recovery_thread(dlm_ctxt *dlm);
@@ -858,7 +870,11 @@
int dlm_begin_reco_handler(net_msg *msg, u32 len, void *data);
int dlm_finalize_reco_handler(net_msg *msg, u32 len, void *data);
+int dlm_dispatch_assert_master(dlm_ctxt *dlm, dlm_lock_resource *res,
+ int ignore_higher, u8 request_from);
+void dlm_assert_master_worker(dlm_work_item *item, void *data);
+
int dlm_send_one_lockres(dlm_ctxt *dlm, dlm_lock_resource *res,
dlm_migratable_lockres *mres,
u8 send_to, u8 flags);
@@ -878,7 +894,6 @@
DLM_LOCK_RES_MIGRATING));
}
-void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie);
void dlm_mle_node_down(dlm_ctxt *dlm, dlm_master_list_entry *mle,
struct nm_node *node, int idx);
Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c 2005-03-25 20:09:38 UTC (rev 2054)
+++ trunk/fs/ocfs2/dlm/dlmlock.c 2005-03-25 21:36:27 UTC (rev 2055)
@@ -55,6 +55,7 @@
static dlm_status dlm_send_remote_lock_request(dlm_ctxt *dlm,
dlm_lock_resource *res,
dlm_lock *lock, int flags);
+static void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie);
/* Tell us whether we can grant a new lock request.
* locking:
@@ -167,6 +168,7 @@
/* will exit this call with spinlock held */
__dlm_wait_on_lockres(res);
res->state |= DLM_LOCK_RES_IN_PROGRESS;
+
/* add lock to local (secondary) queue */
list_add_tail(&lock->list, &res->blocked);
spin_unlock(&res->spinlock);
@@ -231,9 +233,9 @@
return ret;
}
-void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie)
+
+static void dlm_init_lock(dlm_lock *newlock, int type, u8 node, u64 cookie)
{
- memset(newlock, 0, sizeof(dlm_lock));
INIT_LIST_HEAD(&newlock->list);
INIT_LIST_HEAD(&newlock->ast_list);
INIT_LIST_HEAD(&newlock->bast_list);
@@ -250,6 +252,33 @@
newlock->bast_pending = 0;
}
+dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, dlm_lockstatus *lksb)
+{
+ dlm_lock *lock;
+
+ lock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
+ if (!lock)
+ return NULL;
+
+ memset(lock, 0, sizeof(dlm_lock));
+
+ if (!lksb) {
+ lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
+ if (!lksb) {
+ kfree(lock);
+ return NULL;
+ }
+ /* memset only if kernel-allocated */
+ memset(lksb, 0, sizeof(dlm_lockstatus));
+ lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
+ }
+
+ dlm_init_lock(lock, type, node, cookie);
+ lock->lksb = lksb;
+ lksb->lockid = lock;
+ return lock;
+}
+
/* handler for lock creation net message
* locking:
* caller needs: none
@@ -286,23 +315,14 @@
goto leave;
status = DLM_SYSERR;
- newlock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
+ newlock = dlm_new_lock(create->requested_type,
+ create->node_idx,
+ create->cookie, NULL);
if (!newlock)
goto leave;
- lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
- if (!lksb)
- goto leave;
+ lksb = newlock->lksb;
- memset(lksb, 0, sizeof(dlm_lockstatus));
-
- dlm_init_lock(newlock, create->requested_type,
- create->node_idx, create->cookie);
-
- newlock->lksb = lksb;
- lksb->lockid = newlock;
- lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
-
if (create->flags & LKM_GET_LVB) {
lksb->flags |= DLM_LKSB_GET_LVB;
dlmprintk("set DLM_LKSB_GET_LVB flag\n");
@@ -327,7 +347,7 @@
goto leave;
}
spin_unlock(&res->spinlock);
-
+
newlock->lockres = res;
status = dlmlock_master(dlm, res, newlock, create->flags);
leave:
@@ -437,6 +457,8 @@
goto retry_convert;
}
} else {
+ u64 tmpcookie;
+
/* LOCK request */
status = DLM_BADARGS;
if (!name)
@@ -446,12 +468,11 @@
if (strlen(name) > DLM_LOCKID_NAME_MAX)
goto error;
- lock = kmalloc(sizeof(dlm_lock), GFP_KERNEL); /* dlm_lock */
+ dlm_get_next_cookie(dlm->node_num, &tmpcookie);
+ lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
if (!lock)
goto error;
- lksb->lockid = lock;
-
if (!recovery)
down_read(&dlm->recovery_sem);
@@ -465,26 +486,11 @@
dlmprintk("type=%d, flags = 0x%x\n", mode, flags);
dlmprintk("creating lock: lock=%p res=%p\n", lock, res);
-#warning move this into dlm_init_lock
- memset(lock, 0, sizeof(dlm_lock));
- INIT_LIST_HEAD(&lock->list);
- INIT_LIST_HEAD(&lock->ast_list);
- INIT_LIST_HEAD(&lock->bast_list);
- spin_lock_init(&lock->spinlock);
lock->lockres = res;
- lock->ml.type = mode;
- lock->ml.convert_type = LKM_IVMODE;
- lock->ml.highest_blocked = LKM_IVMODE;
- lock->ml.node = dlm->node_num;
lock->ast = ast;
lock->bast = bast;
lock->astdata = data;
- lock->lksb = lksb;
- lock->ast_pending = 0;
- lock->bast_pending = 0;
- dlm_get_next_cookie(lock->ml.node, &lock->ml.cookie);
-
retry_lock:
if (flags & LKM_VALBLK) {
dlmprintk("LKM_VALBLK passed by caller\n");
Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c 2005-03-25 20:09:38 UTC (rev 2054)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c 2005-03-25 21:36:27 UTC (rev 2055)
@@ -921,8 +921,7 @@
dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
char *name;
unsigned int namelen;
- int found, ret, bit;
- unsigned long nodemap[BITS_TO_LONGS(NM_MAX_NODES)];
+ int found, ret;
if (!dlm_grab(dlm))
return DLM_MASTER_RESP_NO;
@@ -945,17 +944,6 @@
spin_lock(&dlm->spinlock);
res = __dlm_lookup_lock(dlm, name, namelen);
if (res) {
- /* while we still have the dlm->spinlock,
- * save off the node map and clear out
- * all nodes from this node forward, and
- * the node that called us */
- memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
- clear_bit(request->node_idx, nodemap);
- clear_bit(dlm->node_num, nodemap);
- while ((bit = find_next_bit(nodemap, NM_MAX_NODES,
- dlm->node_num)) < NM_MAX_NODES) {
- clear_bit(bit, nodemap);
- }
spin_unlock(&dlm->spinlock);
/* take care of the easy cases up front */
@@ -983,10 +971,13 @@
* caused all nodes up to this one to
* create mles. this node now needs to
* go back and clean those up. */
-#warning this needs to move to the work queue
- ret = dlm_do_assert_master(dlm, res->lockname.name,
- res->lockname.len,
- nodemap);
+ ret = dlm_dispatch_assert_master(dlm, res, 1,
+ request->node_idx);
+ if (ret < 0) {
+ dlmerror0("failed to dispatch assert "
+ "master work\n");
+ response = DLM_MASTER_RESP_ERROR;
+ }
goto send_response;
} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
spin_unlock(&res->spinlock);
@@ -1299,10 +1290,99 @@
int dlm_flush_lockres_asts(dlm_ctxt *dlm, dlm_lock_resource *res)
{
-#warning need to implement dlm_flush_lockres_asts
+ dlm_flush_asts(dlm);
+ /* still need to implement dlm_flush_lockres_asts */
return 0;
}
+int dlm_dispatch_assert_master(dlm_ctxt *dlm, dlm_lock_resource *res,
+ int ignore_higher, u8 request_from)
+{
+ dlm_work_item *item;
+ item = (dlm_work_item *)kmalloc(sizeof(dlm_work_item), GFP_KERNEL);
+ if (!item)
+ return -ENOMEM;
+
+ /* queue up work for dlm_assert_master_worker */
+ memset(item, 0, sizeof(dlm_work_item));
+ dlm_grab(dlm); /* get an extra ref for the work item */
+ dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
+ item->u.am.lockres = res; /* already have a ref */
+ /* can optionally ignore node numbers higher than this node */
+ item->u.am.ignore_higher = ignore_higher;
+ item->u.am.request_from = request_from;
+
+ spin_lock(&dlm->work_lock);
+ list_add_tail(&item->list, &dlm->work_list);
+ spin_unlock(&dlm->work_lock);
+
+ schedule_work(&dlm->dispatched_work);
+ return 0;
+}
+
+void dlm_assert_master_worker(dlm_work_item *item, void *data)
+{
+ dlm_ctxt *dlm = data;
+ int ret = 0;
+ dlm_lock_resource *res;
+ unsigned long nodemap[BITS_TO_LONGS(NM_MAX_NODES)];
+ int ignore_higher;
+ int bit;
+ u8 request_from;
+
+ DLM_ASSERT(item);
+ dlm = item->dlm;
+ DLM_ASSERT(dlm);
+
+ res = item->u.am.lockres;
+ DLM_ASSERT(res);
+
+ ignore_higher = item->u.am.ignore_higher;
+ request_from = item->u.am.request_from;
+
+ do {
+ spin_lock(&dlm->spinlock);
+ memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
+ spin_unlock(&dlm->spinlock);
+
+ clear_bit(dlm->node_num, nodemap);
+ if (ignore_higher) {
+ /* if this is just to clean up mles for nodes below
+ * this node, do not send the message to the original
+ * caller or any node number higher than this */
+ clear_bit(request_from, nodemap);
+ bit = dlm->node_num;
+ while (1) {
+ bit = find_next_bit(nodemap, NM_MAX_NODES,
+ bit+1);
+ if (bit >= NM_MAX_NODES)
+ break;
+ clear_bit(bit, nodemap);
+ }
+ }
+
+ ret = dlm_do_assert_master(dlm, res->lockname.name,
+ res->lockname.len,
+ nodemap);
+ if (ret < 0) {
+ /* no choice but to try again.
+ * maybe a node died. */
+ dlmerror("assert master returned %d!\n", ret);
+ }
+ } while (ret < 0);
+
+ dlm_lockres_put(dlm, res);
+
+ dlmprintk0("finished with dlm_assert_master_worker\n");
+}
+
+
+/*
+ * DLM_MIGRATE_LOCKRES
+ */
+
+
int dlm_migrate_lockres(dlm_ctxt *dlm, dlm_lock_resource *res, u8 target)
{
dlm_master_list_entry *mle = NULL;
Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-03-25 20:09:38 UTC (rev 2054)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-03-25 21:36:27 UTC (rev 2055)
@@ -1265,8 +1265,13 @@
spin_lock(&res->spinlock);
master = res->owner;
if (master == dlm->node_num) {
-#warning need to broadcast here that i own this
- dlmprintk0("need to broadcast here that i own this\n");
+ int ret = dlm_dispatch_assert_master(dlm, res, 0, 0);
+ if (ret < 0) {
+ dlmerror0("could not allocate enough memory "
+ "to send assert_master message!\n");
+ /* retry!? */
+ BUG();
+ }
}
spin_unlock(&res->spinlock);
}
@@ -1373,22 +1378,13 @@
}
/* lock is for another node. */
- newlock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
- lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
- if (!newlock || !lksb) {
+ newlock = dlm_new_lock(ml->type, ml->node, ml->cookie, NULL);
+ if (!newlock) {
ret = -ENOMEM;
goto leave;
}
-
- memset(newlock, 0, sizeof(dlm_lock));
- memset(lksb, 0, sizeof(dlm_lockstatus));
-
- dlm_init_lock(newlock, ml->type, ml->node, ml->cookie);
- newlock->lksb = lksb;
- __dlm_lockres_get(res);
- newlock->lockres = res;
- lksb->lockid = newlock;
- lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
+ lksb = newlock->lksb;
+ newlock->lockres = dlm_lockres_grab(dlm, res);
if (ml->convert_type != LKM_IVMODE) {
DLM_ASSERT(queue == &res->converting);
@@ -1473,15 +1469,42 @@
}
}
+static void dlm_free_dead_locks(dlm_ctxt *dlm, dlm_lock_resource *res,
+ u8 dead_node)
+{
+ struct list_head *iter, *tmpiter;
+ dlm_lock *lock;
+ assert_spin_locked(&res->spinlock);
+ /* TODO: check pending_asts, pending_basts here */
+ list_for_each_safe(iter, tmpiter, &res->granted) {
+ lock = list_entry (iter, dlm_lock, list);
+ if (lock->ml.node == dead_node) {
+ list_del_init(&lock->list);
+ kfree(lock);
+ }
+ }
+ list_for_each_safe(iter, tmpiter, &res->converting) {
+ lock = list_entry (iter, dlm_lock, list);
+ if (lock->ml.node == dead_node) {
+ list_del_init(&lock->list);
+ kfree(lock);
+ }
+ }
+ list_for_each_safe(iter, tmpiter, &res->blocked) {
+ lock = list_entry (iter, dlm_lock, list);
+ if (lock->ml.node == dead_node) {
+ list_del_init(&lock->list);
+ kfree(lock);
+ }
+ }
+}
-#warning may need to change kfree to put_lock and refcounting here
static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u8 dead_node)
{
- struct list_head *iter, *iter2, *tmpiter;
+ struct list_head *iter;
dlm_lock_resource *res;
- dlm_lock *lock;
int i;
struct list_head *bucket;
@@ -1511,31 +1534,10 @@
res->lockname.len))
continue;
spin_lock(&res->spinlock);
- if (res->owner == dead_node) {
+ if (res->owner == dead_node)
dlm_move_lockres_to_recovery_list(dlm, res);
- } else if (res->owner == dlm->node_num) {
- list_for_each_safe(iter2, tmpiter, &res->granted) {
- lock = list_entry (iter2, dlm_lock, list);
- if (lock->ml.node == dead_node) {
- list_del_init(&lock->list);
- kfree(lock);
- }
- }
- list_for_each_safe(iter2, tmpiter, &res->converting) {
- lock = list_entry (iter2, dlm_lock, list);
- if (lock->ml.node == dead_node) {
- list_del_init(&lock->list);
- kfree(lock);
- }
- }
- list_for_each_safe(iter2, tmpiter, &res->blocked) {
- lock = list_entry (iter2, dlm_lock, list);
- if (lock->ml.node == dead_node) {
- list_del_init(&lock->list);
- kfree(lock);
- }
- }
- }
+ else if (res->owner == dlm->node_num)
+ dlm_free_dead_locks(dlm, res, dead_node);
spin_unlock(&res->spinlock);
}
}
Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c 2005-03-25 20:09:38 UTC (rev 2054)
+++ trunk/fs/ocfs2/dlm/dlmthread.c 2005-03-25 21:36:27 UTC (rev 2055)
@@ -423,7 +423,7 @@
return empty;
}
-static void dlm_flush_asts(dlm_ctxt *dlm)
+void dlm_flush_asts(dlm_ctxt *dlm)
{
struct list_head *iter, *iter2;
dlm_lock *lock;