[Ocfs2-commits] mfasheh commits r1891 - trunk/fs/ocfs2/dlm
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Thu Feb 17 18:13:18 CST 2005
Author: mfasheh
Date: 2005-02-17 18:13:17 -0600 (Thu, 17 Feb 2005)
New Revision: 1891
Modified:
trunk/fs/ocfs2/dlm/dlmast.c
trunk/fs/ocfs2/dlm/dlmconvert.c
trunk/fs/ocfs2/dlm/dlmlock.c
trunk/fs/ocfs2/dlm/dlmmaster.c
trunk/fs/ocfs2/dlm/dlmmod.c
trunk/fs/ocfs2/dlm/dlmmod.h
trunk/fs/ocfs2/dlm/dlmrecovery.c
Log:
* ref counting for dlm_ctxt structures. This doesn't give us umount yet
because we need to do lock migration.
- The kref API changed mid-2.6, so I'll have some compat glue for this ASAP.
* fix a bug where the dlm_ctxt was being overwritten due to bad arguments to
hb_fill_node_map
* fix a bad flag check in dlm_create_lock_handler
* do a little bit of cleanup in dlm_get_lock_resource. "a little bit" --
it's still a beast.
Modified: trunk/fs/ocfs2/dlm/dlmast.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmast.c 2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmast.c 2005-02-18 00:13:17 UTC (rev 1891)
@@ -56,7 +56,6 @@
int dlm_do_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock)
{
int ret;
-
dlm_astlockfunc_t *fn;
dlm_lockstatus *lksb;
@@ -105,7 +104,7 @@
{
int ret;
dlm_bastlockfunc_t *fn = lock->bast;
-
+
dlmprintk0("\n");
if (lock->node != dlm->group_index) {
@@ -140,6 +139,9 @@
u64 cookie;
u32 flags;
+ if (!dlm_grab(dlm))
+ return DLM_REJECTED;
+
dlm_proxy_ast_to_host(past);
lockname.name = past->name;
lockname.len = past->namelen;
@@ -157,7 +159,7 @@
(flags & LKM_GET_LVB ? "get lvb" : "none"));
lockname.hash = full_name_hash(lockname.name, lockname.len);
-
+
dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
if (past->type != DLM_AST &&
@@ -214,7 +216,7 @@
up_read(&dlm->recovery_sem);
ret = DLM_NORMAL;
goto leave;
-
+
do_ast:
ret = DLM_NORMAL;
if (past->type == DLM_AST) {
@@ -228,7 +230,7 @@
} else {
// should already be there....
}
-
+
lock->lksb->status = DLM_NORMAL;
/* if we requested the lvb, fetch it into our lksb now */
@@ -255,6 +257,8 @@
up_read(&dlm->recovery_sem);
leave:
+
+ dlm_put(dlm);
return ret;
}
@@ -283,6 +287,7 @@
iov[0].iov_len = sizeof(dlm_proxy_ast);
iov[0].iov_base = &past;
if (lock->lksb->flags & DLM_LKSB_GET_LVB) {
+ dlmprintk("sending LKM_GET_LVB flag\n");
past.flags |= LKM_GET_LVB;
iov[1].iov_len = DLM_LVB_LEN;
iov[1].iov_base = lock->lksb->lvb;
@@ -302,5 +307,3 @@
current->pid, ret);
return ret;
}
-
-
Modified: trunk/fs/ocfs2/dlm/dlmconvert.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmconvert.c 2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmconvert.c 2005-02-18 00:13:17 UTC (rev 1891)
@@ -379,6 +379,9 @@
int call_ast = 0, kick_thread = 0;
int found = 0;
+ if (!dlm_grab(dlm))
+ return DLM_REJECTED;
+
dlm_convert_lock_to_host(cnv);
lockname.name = cnv->name;
lockname.len = cnv->namelen;
@@ -448,5 +451,7 @@
if (kick_thread)
dlm_kick_thread(dlm, res);
+ dlm_put(dlm);
+
return status;
}
Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c 2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmlock.c 2005-02-18 00:13:17 UTC (rev 1891)
@@ -108,6 +108,7 @@
__dlm_wait_on_lockres(res);
if (dlm_can_grant_new_lock(res, lock)) {
+ dlmprintk("I can grant this lock right away\n");
/* got it right away */
lock->lksb->status = DLM_NORMAL;
status = DLM_NORMAL;
@@ -145,8 +146,10 @@
dlm_lock *lock, int flags)
{
dlm_status status = DLM_DENIED;
-
+
dlmprintk("type=%d\n", lock->type);
+ dlmprintk("lockres %*s, flags = 0x%x\n", res->lockname.len,
+ res->lockname.name, flags);
spin_lock(&res->spinlock);
if (res->state & DLM_LOCK_RES_RECOVERING) {
@@ -164,7 +167,7 @@
/* spec seems to say that you will get DLM_NORMAL when the lock
* has been queued, meaning we need to wait for a reply here. */
status = dlm_send_remote_lock_request(dlm, res, lock, flags);
-
+
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
if (status != DLM_NORMAL) {
@@ -245,11 +248,14 @@
DLM_ASSERT(dlm);
+ dlmprintk0("\n");
+
+ if (!dlm_grab(dlm))
+ return DLM_REJECTED;
+
dlm_create_lock_to_host(create);
lockname.name = create->name;
lockname.len = create->namelen;
-
- dlmprintk0("\n");
lockname.hash = full_name_hash(lockname.name, lockname.len);
@@ -257,11 +263,11 @@
newlock = kmalloc(sizeof(dlm_lock), GFP_KERNEL);
if (!newlock)
goto leave;
-
+
lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
if (!lksb)
goto leave;
-
+
memset(newlock, 0, sizeof(dlm_lock));
INIT_LIST_HEAD(&newlock->list);
INIT_LIST_HEAD(&newlock->ast_list);
@@ -280,8 +286,10 @@
lksb->lockid = newlock;
lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
- if (create->flags | LKM_GET_LVB)
+ if (create->flags & LKM_GET_LVB) {
lksb->flags |= DLM_LKSB_GET_LVB;
+ dlmprintk("set DLM_LKSB_GET_LVB flag\n");
+ }
status = DLM_IVLOCKID;
res = dlm_lookup_lock(dlm, &lockname);
@@ -300,5 +308,7 @@
kfree(lksb);
}
+ dlm_put(dlm);
+
return status;
}
Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c 2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c 2005-02-18 00:13:17 UTC (rev 1891)
@@ -207,34 +207,41 @@
int blocked = 0;
int map_changed = 0, restart = 0, assert = 0;
int ret, start, bit;
-
+
bucket = &(dlm->resources[lockname->hash & DLM_HASH_MASK]);
- /* pre-allocate a dlm_lock_resource and master stuff */
- mle = kmalloc(sizeof(dlm_master_list_entry), GFP_KERNEL);
- res = kmalloc(sizeof(dlm_lock_resource), GFP_KERNEL);
- if (!mle || !res) {
- dlmprintk0("could not allocate memory for new lock resource\n");
+lookup:
+ spin_lock(&dlm->spinlock);
+ tmpres = __dlm_lookup_lock(dlm, lockname);
+ if (tmpres) {
+ spin_unlock(&dlm->spinlock);
+
if (mle)
kfree(mle);
if (res)
kfree(res);
- res = NULL;
- goto leave;
+
+ return tmpres;
}
- /* check for pre-existing lock */
- spin_lock(&dlm->spinlock);
- tmpres = __dlm_lookup_lock(dlm, lockname);
- if (tmpres) {
+ if (!res) {
spin_unlock(&dlm->spinlock);
- /* TODO: return error, or return the lockres ?!? */
- kfree(res);
- kfree(mle);
- res = tmpres;
- goto leave;
+
+ /* nothing found and we need to allocate one. */
+ mle = kmalloc(sizeof(dlm_master_list_entry), GFP_KERNEL);
+ if (!mle)
+ return NULL;
+
+ res = kmalloc(sizeof(dlm_lock_resource), GFP_KERNEL);
+ if (!res) {
+ kfree(mle);
+ return NULL;
+ }
+
+ goto lookup;
}
+ /* Ok, no lockres found and we have one to insert... */
dlm_init_lockres(res, lockname);
if (flags & LKM_LOCAL) {
@@ -247,7 +254,7 @@
/* lockres still marked IN_PROGRESS */
goto wake_waiters;
}
-
+
/* check master list to see if another node has started mastering it */
spin_lock(&dlm_master_lock);
list_for_each(iter, &dlm_master_list) {
@@ -256,7 +263,8 @@
continue;
if (tmpmle->type == DLM_MLE_MASTER) {
- dlmprintk0("eek! master entry for nonexistent lock!\n");
+ dlmprintk0("eek! master entry for nonexistent "
+ "lock!\n");
BUG();
}
dlm_get_mle(tmpmle);
@@ -275,15 +283,15 @@
}
spin_unlock(&dlm_master_lock);
- /* at this point there is either a DLM_MLE_BLOCK or a DLM_MLE_MASTER
- * on the master list, so it's safe to add the lockres to the hashtable.
- * anyone who finds the lock will still have to wait on the IN_PROGRESS.
- * also, any new nodes that try to join at this point will have to wait
- * until my dlm_master_lock list is empty, so they cannot possibly
- * do any master requests yet... TODO
- * ?? should i have a special type of mle just for joining nodes ??
- * ?? could allow them to come in and put their mle
- * on the list and sleep ?? */
+ /* at this point there is either a DLM_MLE_BLOCK or a
+ * DLM_MLE_MASTER on the master list, so it's safe to add the
+ * lockres to the hashtable. anyone who finds the lock will
+ * still have to wait on the IN_PROGRESS. also, any new nodes
+ * that try to join at this point will have to wait until my
+ * dlm_master_lock list is empty, so they cannot possibly do
+ * any master requests yet... TODO ?? should i have a special
+ * type of mle just for joining nodes ?? ?? could allow them
+ * to come in and put their mle on the list and sleep ?? */
/* finally add the lockres to its hash bucket */
list_add_tail(&res->list, bucket);
@@ -472,13 +480,12 @@
spin_unlock(&res->spinlock);
wake_up(&res->wq);
-leave:
return res;
}
-
+
/*
* locks that can be taken here:
* dlm->spinlock
@@ -499,6 +506,9 @@
int found;
struct list_head *iter;
+ if (!dlm_grab(dlm))
+ return DLM_MASTER_RESP_NO;
+
dlm_master_request_to_host(request);
lockname.name = request->name;
lockname.len = request->namelen;
@@ -572,7 +582,7 @@
dlmprintk0("bug bug bug!!! no mle found for this lock!\n");
BUG();
}
-
+
/*
* lockres doesn't exist on this node
* if there is an MLE_BLOCK, return NO
@@ -596,7 +606,7 @@
if (!mle) {
spin_unlock(&dlm_master_lock);
spin_unlock(&dlm->spinlock);
-
+
mle = kmalloc(sizeof(dlm_master_list_entry) +
lockname.len, GFP_KERNEL);
if (!mle) {
@@ -634,6 +644,7 @@
spin_unlock(&dlm->spinlock);
send_response:
+ dlm_put(dlm);
//ret = dlm_do_master_request_resp(dlm, &lockname, response,
// request->node_idx);
//dlmprintk("response returned %d\n", ret);
@@ -660,6 +671,9 @@
struct list_head *iter;
struct qstr lockname;
+ if (!dlm_grab(dlm))
+ return 0;
+
dlm_master_request_resp_to_host(resp);
lockname.name = resp->name;
lockname.len = resp->namelen;
@@ -698,8 +712,8 @@
wake = 1;
break;
case DLM_MASTER_RESP_MAYBE:
- // dlmprintk("node %u is not the master, but IS"
- // " in-progress\n", resp->node_idx);
+ //dlmprintk("node %u is not the master, but IS"
+ //" in-progress\n", resp->node_idx);
set_bit(resp->node_idx, mle->response_map);
set_bit(resp->node_idx, mle->maybe_map);
if (memcmp(mle->response_map, mle->vote_map,
@@ -731,6 +745,8 @@
else
dlmprintk0("hrrm... got a master resp but found no matching "
"request\n");
+
+ dlm_put(dlm);
return 0;
}
@@ -753,6 +769,9 @@
struct list_head *iter;
struct qstr lockname;
+ if (!dlm_grab(dlm))
+ return 0;
+
dlm_assert_master_to_host(assert);
lockname.name = assert->name;
lockname.len = assert->namelen;
@@ -835,6 +854,8 @@
/* if this is the last put, it will be removed from the list */
dlm_put_mle(mle);
}
+
+ dlm_put(dlm);
return 0;
}
Modified: trunk/fs/ocfs2/dlm/dlmmod.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.c 2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmmod.c 2005-02-18 00:13:17 UTC (rev 1891)
@@ -71,6 +71,7 @@
LIST_HEAD(dlm_domains);
spinlock_t dlm_domain_lock = SPIN_LOCK_UNLOCKED;
+DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
u8 dlm_global_index = NM_MAX_NODES;
static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED;
static u64 dlm_next_cookie = 1;
@@ -220,7 +221,7 @@
status = DLM_BADARGS;
if (!name)
goto error;
-
+
status = DLM_IVBUFLEN;
q.len = strlen(name);
if (q.len > DLM_LOCKID_NAME_MAX)
@@ -253,7 +254,7 @@
goto up_error;
}
- dlmprintk("type=%d\n", mode);
+ dlmprintk("type=%d, flags = 0x%x\n", mode, flags);
dlmprintk("creating lock: lock=%p res=%p\n", lock, res);
memset(lock, 0, sizeof(dlm_lock));
@@ -273,6 +274,8 @@
dlm_get_next_cookie(lock->node, &lock->cookie);
if (flags & LKM_VALBLK) {
+ dlmprintk("LKM_VALBLK passed by caller\n");
+
/* LVB requests for non PR, PW or EX locks are
* ignored. */
if (mode < LKM_PRMODE)
@@ -379,9 +382,39 @@
}
EXPORT_SYMBOL(dlmunlock);
+dlm_lock_resource * __dlm_lookup_lock(dlm_ctxt *dlm, struct qstr *lockname)
+{
+ struct list_head *iter;
+ dlm_lock_resource *tmpres=NULL;
+ struct list_head *bucket;
-static dlm_ctxt * __dlm_lookup_domain(char *domain)
+ dlmprintk0("\n");
+
+ bucket = &(dlm->resources[lockname->hash & DLM_HASH_MASK]);
+
+ /* check for pre-existing lock */
+ list_for_each(iter, bucket) {
+ tmpres = list_entry(iter, dlm_lock_resource, list);
+ if (tmpres->lockname.len == lockname->len &&
+ strncmp(tmpres->lockname.name, lockname->name,
+ lockname->len) == 0)
+ break;
+ tmpres = NULL;
+ }
+ return tmpres;
+}
+
+dlm_lock_resource * dlm_lookup_lock(dlm_ctxt *dlm, struct qstr *lockname)
{
+ dlm_lock_resource *res;
+ spin_lock(&dlm->spinlock);
+ res = __dlm_lookup_lock(dlm, lockname);
+ spin_unlock(&dlm->spinlock);
+ return res;
+}
+
+static dlm_ctxt * __dlm_lookup_domain(const char *domain)
+{
dlm_ctxt *tmp = NULL;
struct list_head *iter;
@@ -395,111 +428,211 @@
return tmp;
}
-dlm_ctxt * dlm_lookup_domain(char *domain)
+/* returns true on one of two conditions:
+ * 1) the domain does not exist
+ * 2) the domain exists and it's state is "joined" */
+static int dlm_wait_on_domain_helper(const char *domain)
{
+ int ret = 0;
dlm_ctxt *tmp = NULL;
+
spin_lock(&dlm_domain_lock);
+
tmp = __dlm_lookup_domain(domain);
+ if (!tmp)
+ ret = 1;
+ else if (tmp->dlm_state == DLM_CTXT_JOINED)
+ ret = 1;
+
spin_unlock(&dlm_domain_lock);
- return tmp;
+ return ret;
}
-dlm_lock_resource * __dlm_lookup_lock(dlm_ctxt *dlm, struct qstr *lockname)
+static void dlm_free_ctxt_mem(dlm_ctxt *dlm)
{
+ BUG_ON(!dlm);
+
+ if (dlm->resources)
+ free_page((unsigned long) dlm->resources);
+
+ if (dlm->name)
+ kfree(dlm->name);
+
+ if (dlm->group)
+ iput(dlm->group);
+
+ kfree(dlm);
+}
+
+/* A little strange - this function will be called while holding
+ * dlm_domain_lock and is expected to be holding it on the way out. We
+ * will however drop and reacquire it multiple times */
+static void dlm_ctxt_release(struct kref *kref)
+{
+ dlm_ctxt *dlm;
+
+ BUG_ON(!kref);
+
+ dlm = container_of(kref, dlm_ctxt, dlm_refs);
+
+ BUG_ON(dlm->num_joins);
+ BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
+
+ /* we may still be in the list if we hit an error during join. */
+ list_del_init(&dlm->list);
+
+ spin_unlock(&dlm_domain_lock);
+
+ dlmprintk("freeing memory from domain %s\n", dlm->name);
+
+ wake_up(&dlm_domain_events);
+
+ dlm_free_ctxt_mem(dlm);
+
+ spin_lock(&dlm_domain_lock);
+}
+
+void dlm_put(dlm_ctxt *dlm)
+{
+ BUG_ON(!dlm);
+
+ spin_lock(&dlm_domain_lock);
+ kref_put(&dlm->dlm_refs);
+ spin_unlock(&dlm_domain_lock);
+}
+
+static void __dlm_get(dlm_ctxt *dlm)
+{
+ kref_get(&dlm->dlm_refs);
+}
+
+/* given a questionable reference to a dlm object, gets a reference if
+ * it can find it in the list, otherwise returns NULL in which case
+ * you shouldn't trust your pointer. */
+dlm_ctxt *dlm_grab(dlm_ctxt *dlm)
+{
struct list_head *iter;
- dlm_lock_resource *tmpres=NULL;
- struct list_head *bucket;
-
- dlmprintk0("\n");
+ dlm_ctxt *target = NULL;
- bucket = &(dlm->resources[lockname->hash & DLM_HASH_MASK]);
+ spin_lock(&dlm_domain_lock);
- /* check for pre-existing lock */
- list_for_each(iter, bucket) {
- tmpres = list_entry(iter, dlm_lock_resource, list);
- if (tmpres->lockname.len == lockname->len &&
- strncmp(tmpres->lockname.name, lockname->name,
- lockname->len) == 0)
+ list_for_each(iter, &dlm_domains) {
+ target = list_entry (iter, dlm_ctxt, list);
+
+ if (target == dlm) {
+ __dlm_get(target);
break;
- tmpres = NULL;
+ }
+
+ target = NULL;
}
- return tmpres;
+
+ spin_unlock(&dlm_domain_lock);
+
+ return target;
}
-dlm_lock_resource * dlm_lookup_lock(dlm_ctxt *dlm, struct qstr *lockname)
+void dlm_get(dlm_ctxt *dlm)
{
- dlm_lock_resource *res;
- spin_lock(&dlm->spinlock);
- res = __dlm_lookup_lock(dlm, lockname);
- spin_unlock(&dlm->spinlock);
- return res;
+ BUG_ON(!dlm);
+
+ spin_lock(&dlm_domain_lock);
+ __dlm_get(dlm);
+ spin_unlock(&dlm_domain_lock);
}
+static void dlm_leave_domain(dlm_ctxt *dlm)
+{
+ spin_lock(&dlm_domain_lock);
+ BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
+ BUG_ON(!dlm->num_joins);
-/*
- * dlm_register_domain: one-time setup per "domain"
- */
-dlm_ctxt * dlm_register_domain(char *domain, char *group_name, u32 key)
+ dlm->num_joins--;
+ if (dlm->num_joins) {
+ spin_unlock(&dlm_domain_lock);
+ return;
+ }
+
+ dlmprintk("shutting down domain %s\n", dlm->name);
+
+ dlm->dlm_state = DLM_CTXT_LEAVING;
+ spin_unlock(&dlm_domain_lock);
+
+ /* TODO: Any network communication involving shutting this guy
+ * down happens here. */
+
+ hb_unregister_callback(HB_NODE_UP_CB, dlm_hb_node_up_cb, dlm);
+ hb_unregister_callback(HB_NODE_DOWN_CB, dlm_hb_node_down_cb, dlm);
+
+ /* if the network code had any unregister calls, they would be here. */
+
+ if (dlm->thread.task)
+ dlm_complete_thread(dlm);
+
+ /* We've left the domain. Now we can take ourselves out of the
+ * list and allow the kref stuff to help us free the
+ * memory. */
+ spin_lock(&dlm_domain_lock);
+ list_del_init(&dlm->list);
+ spin_unlock(&dlm_domain_lock);
+
+ /* Wake up anyone waiting for us to remove this domain */
+ wake_up(&dlm_domain_events);
+}
+
+void dlm_unregister_domain(dlm_ctxt *dlm)
{
- dlm_ctxt *tmp = NULL, *dlm = NULL;
- struct inode *group = NULL;
- int tmpret, i;
+ BUG_ON(!dlm);
- if (strlen(domain) > NM_MAX_NAME_LEN) {
- dlmprintk0("domain name length too long\n");
- goto leave;
- }
+ dlm_leave_domain(dlm);
+ dlm_put(dlm);
+}
+EXPORT_SYMBOL(dlm_unregister_domain);
- group = nm_get_group_by_name(group_name);
- if (!group) {
- dlmprintk("no nm group %s for domain %s!\n",
- group_name, domain);
- goto leave;
- }
+static dlm_ctxt *dlm_alloc_ctxt(const char *domain,
+ struct inode *group,
+ u32 key)
+{
+ int i;
+ dlm_ctxt *dlm = NULL;
- /*
- * TODO: should i do some type of dlm-group-join business here?
- * I need to have new nodes communicate with other dlm nodes to
- * wait until their master lists are empty before allowing me to
- * join. does this belong here? or in hb?
- * seems like stuff that heartbeat shouldn't care about, cuz we
- * would actually be preventing a node that is "UP" from being
- * part of the dlm group.
- */
- dlm = dlm_lookup_domain(domain);
- if (dlm) {
- /* found a pre-existing domain */
+ /* if for some reason we can't get a reference on the group
+ * inode (required) then don't even try the rest. */
+ if (!igrab(group))
goto leave;
- }
dlm = kmalloc(sizeof(dlm_ctxt), GFP_KERNEL);
- if (dlm == NULL) {
+ if (!dlm) {
dlmprintk0("could not allocate dlm_ctxt\n");
goto leave;
}
memset(dlm, 0, sizeof(dlm_ctxt));
+
dlm->name = kmalloc(strlen(domain) + 1, GFP_KERNEL);
if (dlm->name == NULL) {
+ dlmprintk0("could not allocate dlm domain name\n");
kfree(dlm);
dlm = NULL;
- dlmprintk0("could not allocate dlm domain name\n");
goto leave;
}
+
dlm->resources = (struct list_head *) __get_free_page(GFP_KERNEL);
if (!dlm->resources) {
+ dlmprintk0("could not allocate dlm hash\n");
kfree(dlm->name);
kfree(dlm);
dlm = NULL;
- dlmprintk0("could not allocate dlm hash\n");
goto leave;
}
memset(dlm->resources, 0, PAGE_SIZE);
-
+
for (i=0; i<DLM_HASH_SIZE; i++)
INIT_LIST_HEAD(&dlm->resources[i]);
strcpy(dlm->name, domain);
+ dlm->key = key;
+
spin_lock_init(&dlm->spinlock);
INIT_LIST_HEAD(&dlm->list);
INIT_LIST_HEAD(&dlm->dirty_list);
@@ -508,167 +641,231 @@
util_thread_info_init(&dlm->thread);
util_thread_info_init(&dlm->reco.thread);
init_rwsem(&dlm->recovery_sem);
+
+ /* this eats the reference we got above. */
dlm->group = group;
dlm->group_index = nm_this_node(group);
- dlm->key = key;
+
dlm->reco.new_master = NM_INVALID_SLOT_NUM;
dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
dlm->reco.next_seq = 0;
- spin_lock(&dlm_domain_lock);
- tmp = __dlm_lookup_domain(domain);
- if (tmp) {
- spin_unlock(&dlm_domain_lock);
- /* found a pre-existing domain */
- kfree(dlm->name);
- kfree(dlm);
- dlm = NULL;
- goto leave;
- }
+ kref_init(&dlm->dlm_refs, dlm_ctxt_release);
+ dlm->dlm_state = DLM_CTXT_NEW;
- /* add the new domain */
- list_add_tail(&dlm->list, &dlm_domains);
- spin_unlock(&dlm_domain_lock);
+ dlmprintk("context init: refcount %u\n",
+ atomic_read(&dlm->dlm_refs.refcount));
- tmpret = hb_register_callback(HB_NODE_DOWN_CB, dlm_hb_node_down_cb, dlm,
+leave:
+ return dlm;
+}
+
+static int dlm_join_domain(dlm_ctxt *dlm)
+{
+ int status;
+
+ BUG_ON(!dlm);
+
+ dlmprintk("Join domain %s\n", dlm->name);
+
+ status = hb_register_callback(HB_NODE_DOWN_CB,
+ dlm_hb_node_down_cb,
+ dlm,
DLM_HB_NODE_DOWN_PRI);
- if (tmpret)
- goto error;
- tmpret = hb_register_callback(HB_NODE_UP_CB, dlm_hb_node_up_cb, dlm,
+ if (status)
+ goto bail;
+
+ status = hb_register_callback(HB_NODE_UP_CB,
+ dlm_hb_node_up_cb,
+ dlm,
DLM_HB_NODE_UP_PRI);
- if (tmpret)
- goto error;
+ if (status)
+ goto bail;
- /* TODO: need to use hb_fill_node_map to fill a temporary votemap
- * then communicate with each of these nodes that I want to come up
- * FOR THIS DLM. there may be many nodes in this group heartbeating
- * but they may not care about this particular dlm instance. once
- * everyone has come back with a response that i have been added or
- * that they are not a member I can put together the REAL node map
- * for this dlm in dlm->node_map */
- /* TODO: I guess we can fill this here as a superset of possible nodes
- * so that the hb_callbacks above have something to work on in the
- * meantime, then trim out the nodes that are not part of this dlm
- * once we know */
- /* TODO: I may need to register a special net handler on insmod of dlm.o
- * with a key of 0 so that I can respond to requests even if I am not
- * part of a dlm group. this would still leave a gap in time between
- * the start of heartbeating and the insmod dlm.o, unless I change the
- * module loading stuff in clusterbo to include dlm.o (which would work
+ /* TODO: need to use hb_fill_node_map to fill a temporary
+ * votemap then communicate with each of these nodes that I
+ * want to come up FOR THIS DLM. there may be many nodes in
+ * this group heartbeating but they may not care about this
+ * particular dlm instance. once everyone has come back with
+ * a response that i have been added or that they are not a
+ * member I can put together the REAL node map for this dlm in
+ * dlm->node_map */
+ /* TODO: I guess we can fill this here as a superset of
+ * possible nodes so that the hb_callbacks above have
+ * something to work on in the meantime, then trim out the
+ * nodes that are not part of this dlm once we know */
+ /* TODO: I may need to register a special net handler on
+ * insmod of dlm.o with a key of 0 so that I can respond to
+ * requests even if I am not part of a dlm group. this would
+ * still leave a gap in time between the start of heartbeating
+ * and the insmod dlm.o, unless I change the module loading
+ * stuff in clusterbo to include dlm.o (which would work
* fine) */
#warning WRONG WRONG WRONG
- tmpret = hb_fill_node_map(group, dlm->node_map, NM_MAX_NODES);
- if (tmpret)
- goto error;
+ status = hb_fill_node_map(dlm->group, dlm->node_map,
+ sizeof(dlm->node_map));
+ if (status)
+ goto bail;
- dlmprintk("hb_fill_node_map returned node map:\n");
- BUG_ON(ARRAY_SIZE(dlm->node_map) & 3); /* better be mult of 4 :) */
- for(i = 0; i < ARRAY_SIZE(dlm->node_map); i += 4)
- dlmprintk("%0lx%0lx%0lx%0lx\n",
- dlm->node_map[i], dlm->node_map[i + 1],
- dlm->node_map[i + 2], dlm->node_map[i + 3]);
-
-#if 0
- tmpret = net_register_handler("reco-request",
- DLM_NET_RECOVERY_REQUEST_MSG_TYPE,
- key, sizeof(dlm_reco_request),
- dlm_recovery_request_handler, dlm);
- if (tmpret)
- goto error;
- tmpret = net_register_handler("reco-lock-arr-req",
- DLM_NET_RECOVERY_LOCK_ARR_REQ_MSG_TYPE,
- key, sizeof(dlm_reco_lock_arr_req),
- dlm_recovery_lock_arr_req_handler, dlm);
- if (tmpret)
- goto error;
- tmpret = net_register_handler("reco-response",
- DLM_NET_RECOVERY_RESPONSE_MSG_TYPE,
- key, sizeof(dlm_reco_response),
- dlm_recovery_response_handler, dlm);
- if (tmpret)
- goto error;
-#endif
-
- tmpret = net_register_handler(DLM_MASTER_REQUEST_RESP_MSG, key, 0,
+ status = net_register_handler(DLM_MASTER_REQUEST_RESP_MSG, dlm->key, 0,
sizeof(dlm_master_request_resp),
dlm_master_request_resp_handler,
dlm);
- if (tmpret)
- goto error;
+ if (status)
+ goto bail;
- tmpret = net_register_handler(DLM_MASTER_REQUEST_MSG, key, 0,
+ status = net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key, 0,
sizeof(dlm_master_request),
dlm_master_request_handler,
dlm);
+ if (status)
+ goto bail;
- if (tmpret)
- goto error;
-
- tmpret = net_register_handler(DLM_ASSERT_MASTER_MSG, key, 0,
+ status = net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key, 0,
sizeof(dlm_assert_master),
dlm_assert_master_handler,
dlm);
- if (tmpret)
- goto error;
- tmpret = net_register_handler(DLM_CREATE_LOCK_MSG, key, 0,
+ if (status)
+ goto bail;
+
+ status = net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key, 0,
sizeof(dlm_create_lock),
dlm_create_lock_handler,
dlm);
- if (tmpret)
- goto error;
- tmpret = net_register_handler(DLM_CONVERT_LOCK_MSG, key,
+ if (status)
+ goto bail;
+
+ status = net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
NET_HND_VAR_LEN,
DLM_CONVERT_LOCK_MAX_LEN,
dlm_convert_lock_handler,
dlm);
- if (tmpret)
- goto error;
+ if (status)
+ goto bail;
- tmpret = net_register_handler(DLM_UNLOCK_LOCK_MSG, key,
+ status = net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
NET_HND_VAR_LEN,
DLM_UNLOCK_LOCK_MAX_LEN,
dlm_unlock_lock_handler,
dlm);
- if (tmpret)
- goto error;
-
- tmpret = net_register_handler(DLM_PROXY_AST_MSG, key,
+ if (status)
+ goto bail;
+
+ status = net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
NET_HND_VAR_LEN,
DLM_PROXY_AST_MAX_LEN,
dlm_proxy_ast_handler,
dlm);
- if (tmpret)
- goto error;
+ if (status)
+ goto bail;
- tmpret = dlm_launch_thread(dlm);
- if (tmpret == 0)
- goto leave;
+ status = dlm_launch_thread(dlm);
+ if (status < 0) {
+ dlmprintk("could not launch dlm thread!\n");
+ goto bail;
+ }
-error:
- hb_unregister_callback(HB_NODE_UP_CB, dlm_hb_node_up_cb, dlm);
- hb_unregister_callback(HB_NODE_DOWN_CB, dlm_hb_node_down_cb, dlm);
spin_lock(&dlm_domain_lock);
- list_del(&dlm->list);
+ dlm->num_joins++;
+ dlm->dlm_state = DLM_CTXT_JOINED;
spin_unlock(&dlm_domain_lock);
- free_page((unsigned long)dlm->resources);
- kfree(dlm->name);
- kfree(dlm);
+
+ status = 0;
+bail:
+ wake_up(&dlm_domain_events);
+
+ return status;
+}
+
+/*
+ * dlm_register_domain: one-time setup per "domain"
+ */
+dlm_ctxt * dlm_register_domain(const char *domain,
+ const char *group_name,
+ u32 key)
+{
+ int ret;
+ dlm_ctxt *dlm = NULL;
+ dlm_ctxt *new_ctxt = NULL;
+ struct inode *group = NULL;
+
+ if (strlen(domain) > NM_MAX_NAME_LEN) {
+ dlmprintk0("domain name length too long\n");
+ goto leave;
+ }
+
+ group = nm_get_group_by_name(group_name);
+ if (!group) {
+ dlmprintk("no nm group %s for domain %s!\n",
+ group_name, domain);
+ goto leave;
+ }
+
+ dlmprintk("register called for domain \"%s\"\n", domain);
+
+retry:
dlm = NULL;
+ if (signal_pending(current))
+ goto leave;
+ spin_lock(&dlm_domain_lock);
+
+ dlm = __dlm_lookup_domain(domain);
+ if (dlm) {
+ if (dlm->dlm_state != DLM_CTXT_JOINED) {
+ spin_unlock(&dlm_domain_lock);
+
+ dlmprintk("This ctxt is not joined yet!\n");
+ wait_event_interruptible(dlm_domain_events,
+ dlm_wait_on_domain_helper(
+ domain));
+ goto retry;
+ }
+
+ __dlm_get(dlm);
+ dlm->num_joins++;
+
+ spin_unlock(&dlm_domain_lock);
+ goto leave;
+ }
+
+ /* doesn't exist */
+ if (!new_ctxt) {
+ spin_unlock(&dlm_domain_lock);
+
+ new_ctxt = dlm_alloc_ctxt(domain, group, key);
+ if (new_ctxt)
+ goto retry;
+ goto leave;
+ }
+
+ /* a little variable switch-a-roo here... */
+ dlm = new_ctxt;
+ new_ctxt = NULL;
+
+ /* add the new domain */
+ list_add_tail(&dlm->list, &dlm_domains);
+ spin_unlock(&dlm_domain_lock);
+
+ ret = dlm_join_domain(dlm);
+ if (ret) {
+ dlmprintk("return code %d from join_domain!\n", ret);
+ dlm_put(dlm);
+ dlm = NULL;
+ }
+
leave:
- if (!dlm && group)
- iput(group);
+ if (new_ctxt)
+ dlm_free_ctxt_mem(new_ctxt);
+
+ if (group)
+ iput(group);
+
return dlm;
}
EXPORT_SYMBOL(dlm_register_domain);
-void dlm_unregister_domain(dlm_ctxt *dlm)
-{
- // fill me in please
-}
-EXPORT_SYMBOL(dlm_unregister_domain);
-
void dlm_init_lockres(dlm_lock_resource *res, struct qstr *lockname)
{
memset(res, 0, sizeof(dlm_lock_resource));
Modified: trunk/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.h 2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmmod.h 2005-02-18 00:13:17 UTC (rev 1891)
@@ -28,8 +28,8 @@
#ifndef CLUSTER_DLMMOD_H
#define CLUSTER_DLMMOD_H
+#include <linux/kref.h>
-
#if 0
#define dlmprintk(x, arg...)
#define dlmprintk0(x)
@@ -185,6 +185,11 @@
unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
} dlm_recovery_ctxt;
+typedef enum _dlm_ctxt_state {
+ DLM_CTXT_NEW = 0,
+ DLM_CTXT_LEAVING,
+ DLM_CTXT_JOINED
+} dlm_ctxt_state;
struct _dlm_ctxt
{
@@ -201,6 +206,10 @@
unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
unsigned long recovery_map[BITS_TO_LONGS(NM_MAX_NODES)];
dlm_recovery_ctxt reco;
+ /* dlm_refs and dlm_state are protected by dlm_domain_lock */
+ struct kref dlm_refs;
+ dlm_ctxt_state dlm_state;
+ unsigned int num_joins;
};
#define DLM_LOCK_RES_UNINITED 0x00000001
@@ -557,9 +566,14 @@
return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);
}
+dlm_ctxt * dlm_register_domain(const char *domain,
+ const char *group_name,
+ u32 key);
+void dlm_unregister_domain(dlm_ctxt *dlm);
+void dlm_get(dlm_ctxt *dlm);
+void dlm_put(dlm_ctxt *dlm);
+dlm_ctxt *dlm_grab(dlm_ctxt *dlm);
-dlm_ctxt * dlm_register_domain(char *domain, char *group_name, u32 key);
-void dlm_unregister_domain(dlm_ctxt *dlm);
dlm_lock_resource * dlm_get_lock_resource(dlm_ctxt *dlm, struct qstr *lockname, int flags);
int dlm_lock_owner_broadcast(dlm_ctxt *dlm, dlm_lock_resource *res);
int dlm_refresh_lock_resource(dlm_ctxt *dlm, dlm_lock_resource *res);
@@ -572,7 +586,6 @@
int dlm_heartbeat_init(dlm_ctxt *dlm);
dlm_lock_resource * dlm_lookup_lock(dlm_ctxt *dlm, struct qstr *lockname);
-dlm_ctxt * dlm_lookup_domain(char *domain);
void dlm_hb_node_down_cb(struct inode *group, struct inode *node, int idx, void *data);
void dlm_hb_node_up_cb(struct inode *group, struct inode *node, int idx, void *data);
Modified: trunk/fs/ocfs2/dlm/dlmrecovery.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-02-18 00:12:37 UTC (rev 1890)
+++ trunk/fs/ocfs2/dlm/dlmrecovery.c 2005-02-18 00:13:17 UTC (rev 1891)
@@ -833,11 +833,11 @@
void dlm_hb_node_down_cb(struct inode *group, struct inode *node, int idx, void *data)
{
- //int ret;
- //struct inode *group = ptr1;
- //struct inode *node = ptr2;
dlm_ctxt *dlm = data;
-
+
+ if (!dlm_grab(dlm))
+ return;
+
spin_lock(&dlm->spinlock);
if (!test_bit(idx, dlm->node_map))
@@ -846,7 +846,7 @@
dlmprintk("node %u being removed from nodemap!\n", idx);
clear_bit(idx, dlm->node_map);
}
-
+
if (test_bit(idx, dlm->recovery_map))
dlmprintk("node %u already added to recovery map!\n", idx);
else {
@@ -854,14 +854,17 @@
dlm_do_local_recovery_cleanup(dlm, idx, 1);
}
spin_unlock(&dlm->spinlock);
+
+ dlm_put(dlm);
}
void dlm_hb_node_up_cb(struct inode *group, struct inode *node, int idx, void *data)
{
- //struct inode *group = ptr1;
- //struct inode *node = ptr2;
dlm_ctxt *dlm = data;
+ if (!dlm_grab(dlm))
+ return;
+
spin_lock(&dlm->spinlock);
if (test_bit(idx, dlm->recovery_map)) {
@@ -876,6 +879,8 @@
}
spin_unlock(&dlm->spinlock);
+
+ dlm_put(dlm);
}
int __dlm_hb_node_dead(dlm_ctxt *dlm, int node)
More information about the Ocfs2-commits
mailing list