[Ocfs2-commits] khackel commits r1696 - in trunk: cluster src
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Tue Dec 7 19:34:06 CST 2004
Author: khackel
Date: 2004-12-07 19:34:04 -0600 (Tue, 07 Dec 2004)
New Revision: 1696
Modified:
trunk/cluster/dlmmod.c
trunk/cluster/dlmmod.h
trunk/cluster/heartbeat.c
trunk/cluster/tcp.c
trunk/cluster/tcp.h
trunk/src/vote.c
Log:
Fixes several bugs in ocfs, tcp and dlm. THIS IS THE FIRST REVISION THAT MOUNTS ON TWO NODES. Fear not, it will die in i_revalidate as soon as you ls ;-)
Modified: trunk/cluster/dlmmod.c
===================================================================
--- trunk/cluster/dlmmod.c 2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/cluster/dlmmod.c 2004-12-08 01:34:04 UTC (rev 1696)
@@ -98,6 +98,7 @@
typedef struct _dlm_create_lock
{
+ u32 flags; // TODO: reduce the size of this
u16 node_idx;
s8 requested_type;
u8 namelen;
@@ -107,6 +108,7 @@
typedef struct _dlm_convert_lock
{
+ u32 flags; // TODO: reduce the size of this
u16 node_idx;
s8 requested_type;
u8 namelen;
@@ -116,7 +118,7 @@
typedef struct _dlm_unlock_lock
{
- u32 flags;
+ u32 flags; // TODO: reduce the size of this
u16 node_idx;
u8 namelen;
u8 name[NM_MAX_NAME_LEN];
@@ -425,6 +427,8 @@
spin_unlock(&res->spinlock);
dlm_kick_thread(dlm, res);
if (!got_it && (flags & LKM_NOQUEUE)) {
+ dlmprintk("did not get NOQUEUE lock %*s at level %d\n",
+ res->lockname.len, res->lockname.name, lock->type);
return DLM_NOTQUEUED;
}
return DLM_NORMAL;
@@ -494,7 +498,6 @@
dlm_status status = DLM_NORMAL;
struct list_head *iter;
dlm_lock *tmplock=NULL;
- int remote_in_place = 0;
dlmprintk("type=%d, convert_type=%d, new convert_type=%d\n", lock->type, lock->convert_type, type);
@@ -543,11 +546,8 @@
/* fall thru to grant */
grant:
- if (lock->node != dlm->group_index) {
- dlmprintk0("no in-place convert for nonlocal locks :( see if this helps...\n");
- remote_in_place = 1;
- goto switch_queues;
- }
+ if (lock->node != dlm->group_index)
+ dlmprintk0("doing in-place convert for nonlocal lock\n");
/* immediately grant the new lock type */
//printk("doing in-place %sconvert from %d to %d\n",
@@ -575,6 +575,8 @@
switch_queues:
if (flags & LKM_NOQUEUE) {
+ dlmprintk("failed to convert NOQUEUE lock %*s from %d to %d...\n",
+ res->lockname.len, res->lockname.name, lock->type, type);
spin_unlock(&lock->spinlock);
spin_unlock(&res->spinlock);
return DLM_NOTQUEUED;
@@ -582,11 +584,7 @@
lock->convert_type = type;
list_del(&lock->list);
- /* make sure the remote in-place convert gets handled right away */
- if (remote_in_place)
- list_add(&lock->list, &res->converting);
- else
- list_add_tail(&lock->list, &res->converting);
+ list_add_tail(&lock->list, &res->converting);
spin_unlock(&lock->spinlock);
spin_unlock(&res->spinlock);
@@ -652,6 +650,8 @@
dlm_lock_resource *res;
dlm_lock *lock = NULL;
int call_ast = 0;
+
+ dlmprintk0("\n");
if (!lksb)
return DLM_BADARGS;
@@ -670,6 +670,9 @@
lock = lksb->lockid;
res = lock->lockres;
+ spin_lock(&res->spinlock);
+ spin_lock(&lock->spinlock);
+
status = dlmunlock_local(dlm, res, lock, lksb, flags, &call_ast);
if (call_ast)
(*unlockast)(data, lksb->status);
@@ -682,6 +685,8 @@
dlm_status status;
int free_lock = 0, remote_ready = 0;
int local = 0, remove = 0, regrant = 0;
+
+ dlmprintk0("\n");
/* according to spec and opendlm code
* flags & LKM_CANCEL != 0: must be converting or blocked
@@ -692,14 +697,16 @@
*call_ast = 0;
recheck:
- spin_lock(&res->spinlock);
- spin_lock(&lock->spinlock);
local = (res->owner == dlm->group_index);
+ dlmprintk0("checking flags...\n");
+
if (flags & LKM_CANCEL) {
+ dlmprintk0("cancel request\n");
/* cancel request */
if (dlm_lock_on_list(&res->blocked, lock)) {
+ dlmprintk0("on blocked list\n");
/* cancel this outright */
lksb->status = DLM_NORMAL;
status = DLM_NORMAL;
@@ -708,6 +715,7 @@
remove = 1;
regrant = 0;
} else if (dlm_lock_on_list(&res->converting, lock)) {
+ dlmprintk0("on converting list\n");
/* cancel the request, put back on granted */
lksb->status = DLM_NORMAL;
status = DLM_NORMAL;
@@ -716,6 +724,7 @@
remove = 1;
regrant = 1;
} else if (dlm_lock_on_list(&res->granted, lock)) {
+ dlmprintk0("on granted list\n");
/* too late, already granted. DLM_CANCELGRANT */
lksb->status = DLM_CANCELGRANT;
status = DLM_NORMAL;
@@ -725,6 +734,7 @@
regrant = 0;
} else {
/* err. um. eek! */
+ dlmprintk0("on NO list!\n");
printk("lock to cancel is not on any list! bug!\n");
lksb->status = DLM_IVLOCKID;
status = DLM_IVLOCKID;
@@ -734,8 +744,11 @@
regrant = 0;
}
} else {
+ dlmprintk0("unlock request\n");
+
/* unlock request */
if (!dlm_lock_on_list(&res->granted, lock)) {
+ dlmprintk0("not on granted list\n");
lksb->status = DLM_DENIED;
status = DLM_DENIED;
free_lock = 0;
@@ -743,6 +756,7 @@
remove = 0;
regrant = 0;
} else {
+ dlmprintk0("on granted list\n");
/* unlock granted lock */
lksb->status = DLM_NORMAL;
status = DLM_NORMAL;
@@ -753,10 +767,14 @@
}
}
+ dlmprintk0("checking local/remote\n");
+
if (!local) {
+ dlmprintk0("nonlocal\n");
/* safe since nothing can change on this
- * seconndary queue without lockres lock */
+ * secondary queue without lockres lock */
spin_unlock(&lock->spinlock);
+ dlmprintk0("unlocked lock spinlock\n");
/* if there was an outstanding change on the
* lockres, conditions could have changed */
@@ -765,38 +783,56 @@
__dlm_wait_on_lockres(res);
res->state |= DLM_LOCK_RES_IN_PROGRESS;
remote_ready = 1;
- spin_unlock(&res->spinlock);
+ dlmprintk0("unlocked lockres spinlock, not ready, rechecking!\n");
+ spin_lock(&lock->spinlock);
goto recheck;
}
if (res->state & DLM_LOCK_RES_RECOVERING) {
/* !!!!! */
+ dlmprintk0("unlocking lock spinlock\n");
spin_unlock(&res->spinlock);
+ dlmprintk0("lockres is recovering!\n");
return DLM_RECOVERING;
} else {
+ dlmprintk0("unlocking lockres spinlock\n");
spin_unlock(&res->spinlock);
status = dlm_send_remote_unlock_request(dlm, res, lock, lksb, flags);
+ dlmprintk0("locking lockres spinlock\n");
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
}
+ dlmprintk0("locking lock spinlock\n");
spin_lock(&lock->spinlock);
}
- if (remove)
+ if (remove) {
+ dlmprintk0("removing lock from list\n");
list_del(&lock->list);
- if (regrant)
+ }
+ if (regrant) {
+ dlmprintk0("moving lock to granted list\n");
list_add_tail(&lock->list, &res->granted);
+ }
+ dlmprintk0("unlocking lock spinlock\n");
spin_unlock(&lock->spinlock);
+ dlmprintk0("unlocking lockres spinlock\n");
spin_unlock(&res->spinlock);
+
+ dlmprintk0("done with locks\n");
if (free_lock) {
+ dlmprintk0("need to free lock\n");
+
#warning this must change to proper refcounting
/* TODO: refcounting... tho for now this will work because
* the middle layer is keeping track of everything */
kfree(lock);
lksb->lockid = NULL;
+ dlmprintk0("done freeing\n");
}
+ dlmprintk("aha done with everything, returning %d\n", status);
return status;
}
@@ -925,6 +961,8 @@
dlm_lock_resource *tmpres=NULL;
struct list_head *bucket;
+ dlmprintk0("\n");
+
bucket = &(dlm->resources[lockname->hash & DLM_HASH_MASK]);
/* check for pre-existing lock */
@@ -1435,6 +1473,7 @@
create.requested_type = lock->type;
create.cookie = lock->cookie;
create.namelen = res->lockname.len;
+ create.flags = flags;
strncpy(create.name, res->lockname.name, create.namelen);
ret = DLM_NOLOCKMGR;
@@ -1489,8 +1528,16 @@
if (res) {
spin_lock(&res->spinlock);
newlock->lockres = res;
- status = dlmlock_local(dlm, res, newlock, 0);
+ status = dlmlock_local(dlm, res, newlock, create->flags);
spin_unlock(&res->spinlock);
+
+ if (create->flags & LKM_NOQUEUE &&
+ status == DLM_NOTQUEUED) {
+ dlmprintk("failed to get NOQUEUE lock %*s at level %d...\n",
+ res->lockname.len, res->lockname.name, newlock->type);
+ /* never added to blocked queue, just delete */
+ kfree(newlock);
+ }
}
return status;
@@ -1512,6 +1559,7 @@
convert.requested_type = type;
convert.cookie = lock->cookie;
convert.namelen = res->lockname.len;
+ convert.flags = flags;
strncpy(convert.name, res->lockname.name, convert.namelen);
ret = DLM_NOLOCKMGR;
@@ -1565,7 +1613,7 @@
lock->node == convert->node_idx) {
found = 1;
rdtsc(u5.hilo[0], u5.hilo[1]);
- status = dlmconvert_local(dlm, res, lock, 0, convert->requested_type);
+ status = dlmconvert_local(dlm, res, lock, convert->flags, convert->requested_type);
rdtsc(u6.hilo[0], u6.hilo[1]);
break;
}
Modified: trunk/cluster/dlmmod.h
===================================================================
--- trunk/cluster/dlmmod.h 2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/cluster/dlmmod.h 2004-12-08 01:34:04 UTC (rev 1696)
@@ -120,48 +120,48 @@
}
typedef enum _dlm_status {
- DLM_NORMAL, /* request in progress */
- DLM_GRANTED, /* request granted */
- DLM_DENIED, /* request denied */
- DLM_DENIED_NOLOCKS, /* request denied, out of system resources */
- DLM_WORKING, /* async request in progress */
- DLM_BLOCKED, /* lock request blocked */
- DLM_BLOCKED_ORPHAN, /* lock request blocked by a orphan lock*/
- DLM_DENIED_GRACE_PERIOD, /* topological change in progress */
- DLM_SYSERR, /* system error */
- DLM_NOSUPPORT, /* unsupported */
- DLM_CANCELGRANT, /* can't cancel convert: already granted */
- DLM_IVLOCKID, /* bad lockid */
- DLM_SYNC, /* synchronous request granted */
- DLM_BADTYPE, /* bad resource type */
- DLM_BADRESOURCE, /* bad resource handle */
- DLM_MAXHANDLES, /* no more resource handles */
- DLM_NOCLINFO, /* can't contact cluster manager */
- DLM_NOLOCKMGR, /* can't contact lock manager */
- DLM_NOPURGED, /* can't contact purge daemon */
- DLM_BADARGS, /* bad api args */
- DLM_VOID, /* no status */
- DLM_NOTQUEUED, /* NOQUEUE was specified and request failed */
- DLM_IVBUFLEN, /* invalid resource name length */
- DLM_CVTUNGRANT, /* attempted to convert ungranted lock */
- DLM_BADPARAM, /* invalid lock mode specified */
- DLM_VALNOTVALID, /* value block has been invalidated */
- DLM_REJECTED, /* request rejected, unrecognized client */
- DLM_ABORT, /* blocked lock request cancelled */
- DLM_CANCEL, /* conversion request cancelled */
- DLM_IVRESHANDLE, /* invalid resource handle */
- DLM_DEADLOCK, /* deadlock recovery refused this request */
- DLM_DENIED_NOASTS, /* failed to allocate AST */
- DLM_FORWARD, /* request must wait for primary's response */
- DLM_TIMEOUT, /* timeout value for lock has expired */
- DLM_IVGROUPID, /* invalid group specification */
- DLM_VERS_CONFLICT, /* version conflicts prevent request handling */
- DLM_BAD_DEVICE_PATH, /* Locks device does not exist or path wrong */
- DLM_NO_DEVICE_PERMISSION, /* Client has insufficient pers for device */
- DLM_NO_CONTROL_DEVICE, /* Cannot set options on opened device */
- DLM_MAXSTATS, /* upper limit for return code validation */
+ DLM_NORMAL, /* 0: request in progress */
+ DLM_GRANTED, /* 1: request granted */
+ DLM_DENIED, /* 2: request denied */
+ DLM_DENIED_NOLOCKS, /* 3: request denied, out of system resources */
+ DLM_WORKING, /* 4: async request in progress */
+ DLM_BLOCKED, /* 5: lock request blocked */
+ DLM_BLOCKED_ORPHAN, /* 6: lock request blocked by a orphan lock*/
+ DLM_DENIED_GRACE_PERIOD, /* 7: topological change in progress */
+ DLM_SYSERR, /* 8: system error */
+ DLM_NOSUPPORT, /* 9: unsupported */
+ DLM_CANCELGRANT, /* 10: can't cancel convert: already granted */
+ DLM_IVLOCKID, /* 11: bad lockid */
+ DLM_SYNC, /* 12: synchronous request granted */
+ DLM_BADTYPE, /* 13: bad resource type */
+ DLM_BADRESOURCE, /* 14: bad resource handle */
+ DLM_MAXHANDLES, /* 15: no more resource handles */
+ DLM_NOCLINFO, /* 16: can't contact cluster manager */
+ DLM_NOLOCKMGR, /* 17: can't contact lock manager */
+ DLM_NOPURGED, /* 18: can't contact purge daemon */
+ DLM_BADARGS, /* 19: bad api args */
+ DLM_VOID, /* 20: no status */
+ DLM_NOTQUEUED, /* 21: NOQUEUE was specified and request failed */
+ DLM_IVBUFLEN, /* 22: invalid resource name length */
+ DLM_CVTUNGRANT, /* 23: attempted to convert ungranted lock */
+ DLM_BADPARAM, /* 24: invalid lock mode specified */
+ DLM_VALNOTVALID, /* 25: value block has been invalidated */
+ DLM_REJECTED, /* 26: request rejected, unrecognized client */
+ DLM_ABORT, /* 27: blocked lock request cancelled */
+ DLM_CANCEL, /* 28: conversion request cancelled */
+ DLM_IVRESHANDLE, /* 29: invalid resource handle */
+ DLM_DEADLOCK, /* 30: deadlock recovery refused this request */
+ DLM_DENIED_NOASTS, /* 31: failed to allocate AST */
+ DLM_FORWARD, /* 32: request must wait for primary's response */
+ DLM_TIMEOUT, /* 33: timeout value for lock has expired */
+ DLM_IVGROUPID, /* 34: invalid group specification */
+ DLM_VERS_CONFLICT, /* 35: version conflicts prevent request handling */
+ DLM_BAD_DEVICE_PATH, /* 36: Locks device does not exist or path wrong */
+ DLM_NO_DEVICE_PERMISSION, /* 37: Client has insufficient pers for device */
+ DLM_NO_CONTROL_DEVICE, /* 38: Cannot set options on opened device */
+ DLM_MAXSTATS, /* 39: upper limit for return code validation */
- DLM_RECOVERING /* our lame addition to allow caller to fail a lock
+ DLM_RECOVERING /* 40: our lame addition to allow caller to fail a lock
request if it is being recovered */
} dlm_status;
Modified: trunk/cluster/heartbeat.c
===================================================================
--- trunk/cluster/heartbeat.c 2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/cluster/heartbeat.c 2004-12-08 01:34:04 UTC (rev 1696)
@@ -191,6 +191,11 @@
bh = slot->bh;
node = slot->inode;
+ if (!node) {
+ printk("no inode in slot %d!\n", idx);
+ idx++;
+ continue;
+ }
ino = nm_get_node_global_index(node);
if (ino == nm_this_node(group)) {
@@ -235,6 +240,11 @@
while ((slot = nm_iterate_group_disk_slots(group, &idx))) {
bh = slot->bh;
node = slot->inode;
+ if (!node) {
+ printk("no inode in slot %d!\n", idx);
+ idx++;
+ continue;
+ }
ino = nm_get_node_global_index(node);
Modified: trunk/cluster/tcp.c
===================================================================
--- trunk/cluster/tcp.c 2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/cluster/tcp.c 2004-12-08 01:34:04 UTC (rev 1696)
@@ -1059,6 +1059,7 @@
netprintk0("no handler for message.\n");
goto error;
}
+printk("about to dispatch message\n");
rdtsc(u1.hilo[0], u1.hilo[1]);
err = net_dispatch_message(inode, sock, &hdr, hnd);
rdtsc(u2.hilo[0], u2.hilo[1]);
@@ -1073,6 +1074,7 @@
#endif
hdr.status = err;
hdr.magic = NET_MSG_STATUS_MAGIC; // twiddle the magic
+printk("about to send return message, status=%d\n", err);
rdtsc(u3.hilo[0], u3.hilo[1]);
tmperr = net_send_tcp_msg(inode, sock, &hdr, sizeof(net_msg));
rdtsc(u4.hilo[0], u4.hilo[1]);
@@ -1089,6 +1091,7 @@
spin_lock(&net_list_lock);
list_add_tail(&net->list, &net_recv_list);
spin_unlock(&net_list_lock);
+printk("all done with this round, starting over\n");
goto start_over;
error:
Modified: trunk/cluster/tcp.h
===================================================================
--- trunk/cluster/tcp.h 2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/cluster/tcp.h 2004-12-08 01:34:04 UTC (rev 1696)
@@ -112,8 +112,9 @@
#define NET_MAX_MSG_LEN (8192)
-#define NET_ALREADY_CONNECTED 2
-#define NET_UNKNOWN_HOST 3
+/* RESERVED */
+#define NET_ALREADY_CONNECTED (0xfff0)
+#define NET_UNKNOWN_HOST (0xfff1)
static inline int net_is_valid_error_type(u32 err_type)
Modified: trunk/src/vote.c
===================================================================
--- trunk/src/vote.c 2004-12-07 20:17:58 UTC (rev 1695)
+++ trunk/src/vote.c 2004-12-08 01:34:04 UTC (rev 1696)
@@ -473,6 +473,7 @@
int status, i, remote_err;
ocfs2_net_wait_ctxt *w = NULL;
struct inode *remote_node;
+ int dequeued = 0;
w = ocfs2_new_net_wait_ctxt(osb, response_id);
if (!w) {
@@ -486,6 +487,7 @@
ocfs2_queue_net_wait_ctxt(osb, w);
i = ocfs_node_map_iterate(osb, &osb->mounted_map, 0);
+ printk("node map iterate: starting at %d, i am %d\n", i, osb->node_num);
while (i != OCFS_INVALID_NODE_NUM) {
if (i != osb->node_num) {
ocfs_node_map_set_bit(osb, &w->n_node_map, i);
@@ -521,8 +523,11 @@
goto bail;
}
}
+ i++;
i = ocfs_node_map_iterate(osb, &osb->mounted_map, i);
+ printk("node map iterate: next is %d, i am %d\n", i, osb->node_num);
}
+ printk("done sending, now waiting on responses...\n");
status = ocfs2_wait_on_vote_responses(osb, w);
if (status < 0) {
@@ -532,10 +537,12 @@
}
ocfs2_dequeue_net_wait_ctxt(osb, w);
+ dequeued = 1;
status = w->n_response;
bail:
if (w) {
- ocfs2_dequeue_net_wait_ctxt(osb, w);
+ if (!dequeued)
+ ocfs2_dequeue_net_wait_ctxt(osb, w);
kfree(w);
}
@@ -554,7 +561,9 @@
OCFS_ASSERT(type == OCFS2_VOTE_REQ_DELETE ||
type == OCFS2_VOTE_REQ_UNLINK ||
- type == OCFS2_VOTE_REQ_RENAME);
+ type == OCFS2_VOTE_REQ_RENAME ||
+ type == OCFS2_VOTE_REQ_UMOUNT ||
+ type == OCFS2_VOTE_REQ_MOUNT);
request = kmalloc(sizeof(*request), GFP_KERNEL);
if (!request) {
More information about the Ocfs2-commits mailing list