[Ocfs2-commits] khackel commits r1718 - trunk/cluster
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Thu Dec 23 15:48:19 CST 2004
Author: khackel
Date: 2004-12-23 15:48:17 -0600 (Thu, 23 Dec 2004)
New Revision: 1718
Modified:
trunk/cluster/dlmmaster.c
trunk/cluster/dlmmod.c
trunk/cluster/dlmmod.h
trunk/cluster/dlmthread.c
trunk/cluster/nodemanager.c
trunk/cluster/nodemanager.h
trunk/cluster/tcp.c
trunk/cluster/tcp.h
Log:
* made lksb consistent on both the lock request node and the master node;
this means that a placeholder lksb is created on the master node for the
requesting node when the lock is created; it is used to copy the status and/or
lvb in/out from the network
* made lksb->status updates consistent with this; fewer special cases now
* added lvb to convert, unlock and proxy ast structures; these are now
variable length network messages (may contain the extra 64 bytes for lvb)
* added flags field to lksb; this is not to be used by the caller; these flags
are used to determine whether an lvb update needs to take place on this lock
(either DLM_LKSB_PUT_LVB or DLM_LKSB_GET_LVB) and whether this lksb was
allocated by the kernel or by the caller of dlmlock
* added a couple of LKM_* flags (LKM_PUT_LVB, LKM_GET_LVB) and defined reserved
LKM_UNUSED* values for the remaining unused flag bits
* in tcp, made it possible to send a multipart message by passing a net_data
array of pointer/length pairs to net_send_message_arr
* fixed dumb bug with variable length net messages (the net_handler_msg_len_ok
check was inverted)
* fixed dumb bug with net status messages and EINTR (an interrupted wait now
removes the pending status context from the list and returns the error)
Modified: trunk/cluster/dlmmaster.c
===================================================================
--- trunk/cluster/dlmmaster.c 2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/dlmmaster.c 2004-12-23 21:48:17 UTC (rev 1718)
@@ -414,6 +414,15 @@
atomic_set(&mle->woken, 0);
ret = util_wait_atomic_eq(&mle->wq, &mle->woken, 1, 5000);
+ if (ret == -EINTR) {
+ dlmprintk0("interrupted during lock mastery!\n");
+ break;
+ }
+ if (ret == -ETIMEDOUT) {
+ dlmprintk("timed out during lock mastery: vote_map=%08x, response_map=%08x\n",
+ mle->vote_map[0], mle->response_map[0]);
+ continue;
+ }
}
dlm_put_mle(mle);
Modified: trunk/cluster/dlmmod.c
===================================================================
--- trunk/cluster/dlmmod.c 2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/dlmmod.c 2004-12-23 21:48:17 UTC (rev 1718)
@@ -114,7 +114,9 @@
u8 namelen;
u8 name[NM_MAX_NAME_LEN];
u64 cookie;
+ s8 lvb[0];
} dlm_convert_lock;
+#define DLM_CONVERT_LOCK_MAX_LEN (sizeof(dlm_convert_lock) + DLM_LVB_LEN)
typedef struct _dlm_unlock_lock
{
@@ -123,18 +125,25 @@
u8 namelen;
u8 name[NM_MAX_NAME_LEN];
u64 cookie;
+ s8 lvb[0];
} dlm_unlock_lock;
+#define DLM_UNLOCK_LOCK_MAX_LEN (sizeof(dlm_unlock_lock) + DLM_LVB_LEN)
+
typedef struct _dlm_proxy_ast
{
+ u32 flags; // TODO: reduce the size of this
u16 node_idx;
u8 type;
u8 blocked_type;
u8 namelen;
u8 name[NM_MAX_NAME_LEN];
u64 cookie;
+ s8 lvb[0];
} dlm_proxy_ast;
+#define DLM_PROXY_AST_MAX_LEN (sizeof(dlm_proxy_ast) + DLM_LVB_LEN)
+
int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
@@ -192,7 +201,6 @@
return;
} /* dlm_driver_exit */
-
dlm_status dlmlock(dlm_ctxt *dlm, int mode, dlm_lockstatus *lksb, int flags, char *name,
dlm_astlockfunc_t *ast, void *data, dlm_bastlockfunc_t *bast)
{
@@ -281,16 +289,7 @@
if (!recovery)
down_read(&dlm->recovery_sem);
-{
- union {
- u64 q;
- u32 hilo[2];
- } u1, u2;
- rdtsc(u1.hilo[0], u1.hilo[1]);
res = dlm_get_lock_resource(dlm, &q, flags);
- rdtsc(u2.hilo[0], u2.hilo[1]);
- dlmprintk("dlm_get_lock_resource took %llu cycles\n", u2.q-u1.q);
-}
if (!res) {
status = DLM_IVLOCKID;
goto up_error;
@@ -338,6 +337,7 @@
INIT_LIST_HEAD(&tmplock->ast_list);
spin_lock_init(&tmplock->spinlock);
tmplock->lockres = res;
+dlmprintk("creating lock: lock=%p res=%p\n", tmplock, res);
tmplock->type = type;
tmplock->convert_type = LKM_IVMODE;
tmplock->highest_blocked = LKM_IVMODE;
@@ -384,14 +384,11 @@
dlm_lock *tmplock;
int got_it = 0;
- BUG_ON(!lock);
- BUG_ON(!res);
- BUG_ON(!dlm);
+ DLM_ASSERT(lock);
+ DLM_ASSERT(res);
+ DLM_ASSERT(dlm);
+ DLM_ASSERT(lock->lksb);
- if (lock->node == dlm->group_index) {
- BUG_ON(!lock->lksb);
- }
-
dlmprintk("type=%d\n", lock->type);
list_for_each(iter, &res->granted) {
@@ -412,10 +409,7 @@
/* got it right away */
- /* if it is a remote request, proxy
- * handler will set the lksb status */
- if (lock->node == dlm->group_index)
- lock->lksb->status = DLM_NORMAL;
+ lock->lksb->status = DLM_NORMAL;
list_add_tail(&lock->list, &res->granted);
@@ -475,24 +469,28 @@
{
dlm_status status;
-{
- union {
- u64 q;
- u32 hilo[2];
- } u1, u2;
- rdtsc(u1.hilo[0], u1.hilo[1]);
-
if (res->owner == dlm->group_index)
status = dlmconvert_local(dlm, res, lock, flags, type);
else
status = dlmconvert_remote(dlm, res, lock, flags, type);
- rdtsc(u2.hilo[0], u2.hilo[1]);
- dlmprintk("dlmconvert took %llu cycles\n", u2.q-u1.q);
-}
return status;
}
+static inline const char * dlm_lock_mode_name(int mode)
+{
+ switch (mode) {
+ case LKM_EXMODE:
+ return "EX";
+ case LKM_PRMODE:
+ return "PR";
+ case LKM_NLMODE:
+ return "NL";
+ }
+ return "UNKNOWN";
+}
+
+
/* must be already holding lockres->spinlock */
dlm_status dlmconvert_local(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int flags, int type)
{
@@ -519,8 +517,32 @@
spin_unlock(&res->spinlock);
return DLM_DENIED;
}
+
+ if (flags & LKM_VALBLK) {
+ switch (lock->type) {
+ case LKM_EXMODE:
+ /* EX + LKM_VALBLK + convert == set lvb */
+ dlmprintk("will set lvb: converting %s->%s\n",
+ dlm_lock_mode_name(lock->type), dlm_lock_mode_name(type));
+ lock->lksb->flags |= DLM_LKSB_PUT_LVB;
+ break;
+ case LKM_PRMODE:
+ case LKM_NLMODE:
+ /* refetch if new level is not NL */
+ if (type > LKM_NLMODE) {
+ dlmprintk("will fetch new value into lvb: converting %s->%s\n",
+ dlm_lock_mode_name(lock->type), dlm_lock_mode_name(type));
+ lock->lksb->flags |= DLM_LKSB_GET_LVB;
+ } else {
+ dlmprintk("will NOT fetch new value into lvb: converting %s->%s\n",
+ dlm_lock_mode_name(lock->type), dlm_lock_mode_name(type));
+ flags &= ~(LKM_VALBLK);
+ }
+ break;
+ }
+ }
+
-
/* in-place downconvert? */
if (type <= lock->type)
goto grant;
@@ -547,20 +569,16 @@
/* fall thru to grant */
grant:
- if (lock->node != dlm->group_index)
+ /* immediately grant the new lock type */
+
+ lock->lksb->status = DLM_NORMAL;
+ if (lock->node == dlm->group_index)
dlmprintk0("doing in-place convert for nonlocal lock\n");
- /* immediately grant the new lock type */
- //dlmprintk("doing in-place %sconvert from %d to %d\n",
- // type > lock->type ? "up" : "down", lock->type, type);
+
lock->type = type;
status = DLM_NORMAL;
- /* if it is a remote request, proxy
- * handler will set the lksb status */
- if (lock->node == dlm->group_index)
- lock->lksb->status = DLM_NORMAL;
-
if (dlm_do_ast(dlm, res, lock) < 0)
dlmprintk0("eek\n");
@@ -619,6 +637,21 @@
BUG();
}
lock->convert_type = type;
+
+ if (flags & LKM_VALBLK) {
+ if (lock->type == LKM_EXMODE) {
+ flags |= LKM_PUT_LVB;
+ lock->lksb->flags |= DLM_LKSB_PUT_LVB;
+ } else {
+ if (lock->convert_type == LKM_NLMODE) {
+ dlmprintk0("erm, no point in specifying LKM_VALBLK if converting to NL\n");
+ flags &= ~LKM_VALBLK;
+ } else {
+ flags |= LKM_GET_LVB;
+ lock->lksb->flags |= DLM_LKSB_GET_LVB;
+ }
+ }
+ }
spin_unlock(&res->spinlock);
/* spec seems to say that you will get DLM_NORMAL when the lock
@@ -633,6 +666,7 @@
list_del(&lock->list);
list_add_tail(&lock->list, &res->granted);
lock->convert_type = LKM_IVMODE;
+ lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
}
bail:
spin_unlock(&res->spinlock);
@@ -671,6 +705,10 @@
lock = lksb->lockid;
res = lock->lockres;
+
+ DLM_ASSERT(lock);
+ DLM_ASSERT(res);
+dlmprintk("lock=%p res=%p\n", lock, res);
spin_lock(&res->spinlock);
spin_lock(&lock->spinlock);
@@ -766,11 +804,16 @@
*call_ast = 1;
remove = 1;
regrant = 0;
+
+ /* make the final update to the lvb */
+ if (local && lksb->flags & DLM_LKSB_PUT_LVB)
+ memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
}
}
dlmprintk0("checking local/remote\n");
+ /* lockres mastered locally or remote? */
if (!local) {
dlmprintk0("nonlocal\n");
/* safe since nothing can change on this
@@ -809,6 +852,7 @@
wake_up(&res->wq);
}
+
if (remove) {
dlmprintk0("removing lock from list\n");
list_del(&lock->list);
@@ -835,6 +879,11 @@
lksb->lockid = NULL;
dlmprintk0("done freeing\n");
}
+
+ /* if cancel or unlock succeeded, lvb work is done */
+ if (status == DLM_NORMAL)
+ lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
+
dlmprintk("aha done with everything, returning %d\n", status);
return status;
}
@@ -848,6 +897,10 @@
dlm_status ret;
int status = 0;
+ int arrsz = 1, msgsz = sizeof(dlm_unlock_lock);
+ net_data nd[2];
+
+
dlmprintk0("\n");
memset(&unlock, 0, sizeof(unlock));
@@ -857,11 +910,22 @@
unlock.namelen = res->lockname.len;
strncpy(unlock.name, res->lockname.name, unlock.namelen);
+ nd[0].bytes = msgsz;
+ nd[0].ptr = &unlock;
+
+ if (flags & LKM_PUT_LVB) {
+ /* extra data to send if we are updating lvb */
+ nd[1].bytes = DLM_LVB_LEN;
+ nd[1].ptr = lock->lksb->lvb;
+ arrsz++;
+ msgsz += DLM_LVB_LEN;
+ }
+
ret = DLM_NOLOCKMGR;
lksb->status = DLM_NOLOCKMGR;
inode = nm_get_group_node_by_index(dlm->group, res->owner);
if (inode) {
- tmpret = net_send_message(DLM_UNLOCK_LOCK_MSG, dlm->key, &unlock, sizeof(unlock), inode, &status);
+ tmpret = net_send_message_arr(DLM_UNLOCK_LOCK_MSG, dlm->key, arrsz, nd, msgsz, inode, &status);
if (tmpret >= 0) {
// successfully sent and received
if (status == DLM_CANCELGRANT)
@@ -889,12 +953,25 @@
dlm_lock *lock;
dlm_status status = DLM_NORMAL;
int found = 0;
- dlm_lockstatus lksb;
+ dlm_lockstatus *lksb = NULL;
int ignore;
struct qstr lockname = { .name=unlock->name, .len=unlock->namelen };
+ u32 flags = unlock->flags;
- dlmprintk0("\n");
+ if (flags & LKM_GET_LVB) {
+ dlmprintk0("bad args! GET_LVB specified on unlock!\n");
+ return DLM_BADARGS;
+ }
+ if ((flags & (LKM_PUT_LVB|LKM_CANCEL)) ==
+ (LKM_PUT_LVB|LKM_CANCEL)) {
+ dlmprintk0("bad args! cannot modify lvb on a CANCEL request!\n");
+ return DLM_BADARGS;
+ }
+
+
+ dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : "none");
+
lockname.hash = full_name_hash(lockname.name, lockname.len);
status = DLM_IVLOCKID;
@@ -908,8 +985,15 @@
if (lock->cookie == unlock->cookie &&
lock->node == unlock->node_idx) {
found = 1;
+ lksb = lock->lksb;
/* unlockast only called on originating node */
- status = dlmunlock_local(dlm, res, lock, &lksb, unlock->flags, &ignore);
+ if (flags & LKM_PUT_LVB) {
+ lksb->flags |= DLM_LKSB_PUT_LVB;
+ memcpy(&lksb->lvb[0], &unlock->lvb[0], DLM_LVB_LEN);
+ }
+ status = dlmunlock_local(dlm, res, lock, lksb, flags, &ignore);
+ if (flags & LKM_PUT_LVB)
+ lksb->flags &= ~DLM_LKSB_PUT_LVB;
break;
}
}
@@ -925,7 +1009,7 @@
if (!found)
dlmprintk("failed to find lock to unlock! cookie=%llu\n", unlock->cookie);
else
- status = lksb.status;
+ status = lksb->status;
return status;
}
@@ -1178,30 +1262,34 @@
if (tmpret)
goto error;
netbuf += L1_CACHE_ALIGN(sizeof(dlm_create_lock));
- tmpret = net_register_handler(DLM_CONVERT_LOCK_MSG, key, 0,
- sizeof(dlm_convert_lock),
+ tmpret = net_register_handler(DLM_CONVERT_LOCK_MSG, key,
+ NET_HND_VAR_LEN,
+ DLM_CONVERT_LOCK_MAX_LEN,
dlm_convert_lock_handler,
dlm, netbuf);
if (tmpret)
goto error;
- netbuf += L1_CACHE_ALIGN(sizeof(dlm_convert_lock));
+ netbuf += L1_CACHE_ALIGN(DLM_CONVERT_LOCK_MAX_LEN);
- tmpret = net_register_handler(DLM_UNLOCK_LOCK_MSG, key, 0,
- sizeof(dlm_unlock_lock),
+ tmpret = net_register_handler(DLM_UNLOCK_LOCK_MSG, key,
+ NET_HND_VAR_LEN,
+ DLM_UNLOCK_LOCK_MAX_LEN,
dlm_unlock_lock_handler,
dlm, netbuf);
if (tmpret)
goto error;
- netbuf += L1_CACHE_ALIGN(sizeof(dlm_unlock_lock));
+ netbuf += L1_CACHE_ALIGN(DLM_UNLOCK_LOCK_MAX_LEN);
- tmpret = net_register_handler(DLM_PROXY_AST_MSG, key, 0,
- sizeof(dlm_proxy_ast),
+ tmpret = net_register_handler(DLM_PROXY_AST_MSG, key,
+ NET_HND_VAR_LEN,
+ DLM_PROXY_AST_MAX_LEN,
dlm_proxy_ast_handler,
dlm, netbuf);
if (tmpret)
goto error;
- netbuf += L1_CACHE_ALIGN(sizeof(dlm_proxy_ast));
-// dlmprintk("netbuf=%p net_buf=%p diff=%d\n", netbuf, dlm->net_buf, ((char *)netbuf - (char *)dlm->net_buf)); // currently 768
+ netbuf += L1_CACHE_ALIGN(DLM_PROXY_AST_MAX_LEN);
+
+dlmprintk("netbuf=%p net_buf=%p diff=%d\n", netbuf, dlm->net_buf, ((char *)netbuf - (char *)dlm->net_buf)); // currently 960
tmpret = dlm_launch_thread(dlm);
if (tmpret == 0)
@@ -1292,20 +1380,48 @@
int dlm_do_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock)
{
- dlm_astlockfunc_t *fn = lock->ast;
+ int ret;
+ dlm_astlockfunc_t *fn;
+ dlm_lockstatus *lksb;
+
dlmprintk0("\n");
+ DLM_ASSERT(lock);
+ DLM_ASSERT(res);
+ DLM_ASSERT(lock->lksb);
+
+ lksb = lock->lksb;
+ fn = lock->ast;
+
+ if (res->owner == dlm->group_index) {
+ /* this node is the lockres master */
+ if (lksb->flags & DLM_LKSB_GET_LVB) {
+ dlmprintk("getting lvb from lockres for %s node\n",
+ lock->node == dlm->group_index ? "master" :
+ "remote");
+ memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
+ } else if (lksb->flags & DLM_LKSB_PUT_LVB) {
+ dlmprintk("setting lvb from lockres for %s node\n",
+ lock->node == dlm->group_index ? "master" :
+ "remote");
+ memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
+ }
+ }
+
+ ret = 0;
if (lock->node != dlm->group_index) {
- return dlm_send_proxy_ast(dlm, res, lock, DLM_AST, 0);
+ /* lock request came from another node
+ * go do the ast over there */
+ ret = dlm_send_proxy_ast(dlm, res, lock, DLM_AST, 0);
+ } else {
+ DLM_ASSERT(fn);
+ (*fn)(lock->astdata);
}
- if (!fn) {
- dlmprintk("eek! lock has no ast %*s! cookie=%llu\n",
- res->lockname.len, res->lockname.name, lock->cookie);
- return -EINVAL;
- }
- (*fn)(lock->astdata);
- return 0;
+
+ /* reset any lvb flags on the lksb */
+ lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
+ return ret;
}
@@ -1328,14 +1444,17 @@
return 0;
}
+
int dlm_send_proxy_ast(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, int type, int blocked_type)
{
- int ret = 0;
+ int ret = 0, arrsz = 1, msgsz = sizeof(dlm_proxy_ast);
dlm_proxy_ast past;
struct inode *inode = NULL;
+ net_data nd[2];
dlmprintk("to=%u, type=%d, blocked_type=%d\n", lock->node, type, blocked_type);
+ memset(&past, 0, sizeof(dlm_proxy_ast));
past.node_idx = dlm->group_index;
past.type = type;
past.blocked_type = blocked_type;
@@ -1343,10 +1462,20 @@
strncpy(past.name, res->lockname.name, past.namelen);
past.cookie = lock->cookie;
+ nd[0].bytes = msgsz;
+ nd[0].ptr = &past;
+ if (lock->lksb->flags & DLM_LKSB_GET_LVB) {
+ past.flags |= LKM_GET_LVB;
+ nd[1].bytes = DLM_LVB_LEN;
+ nd[1].ptr = lock->lksb->lvb;
+ arrsz++;
+ msgsz += DLM_LVB_LEN;
+ }
+
ret = -EINVAL;
inode = nm_get_group_node_by_index(dlm->group, lock->node);
if (inode) {
- ret = net_send_message(DLM_PROXY_AST_MSG, dlm->key, &past, sizeof(past), inode, NULL);
+ ret = net_send_message_arr(DLM_PROXY_AST_MSG, dlm->key, arrsz, nd, msgsz, inode, NULL);
iput(inode);
}
if (ret < 0) {
@@ -1365,7 +1494,17 @@
struct qstr lockname = { .name=past->name, .len=past->namelen };
struct list_head *iter, *head=NULL;
u64 cookie = past->cookie;
+ u32 flags = past->flags;
+ if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
+ (LKM_PUT_LVB|LKM_GET_LVB)) {
+ dlmprintk("both PUT and GET lvb specified\n");
+ return DLM_BADARGS;
+ }
+
+ dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
+ (flags & LKM_GET_LVB ? "get lvb" : "none"));
+
lockname.hash = full_name_hash(lockname.name, lockname.len);
dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
@@ -1431,6 +1570,11 @@
lock->lksb->status = DLM_NORMAL;
+ /* if we requested the lvb, fetch it into our lksb now */
+ if (flags & LKM_GET_LVB) {
+ DLM_ASSERT(lock->lksb->flags & DLM_LKSB_GET_LVB);
+ memcpy(lock->lksb->lvb, past->lvb, DLM_LVB_LEN);
+ }
status = dlm_do_ast(dlm, res, lock);
dlmprintk("ast done: now... type=%d, convert_type=%d\n",
lock->type, lock->convert_type);
@@ -1505,6 +1649,7 @@
dlm_create_lock *create = (dlm_create_lock *)msg->buf;
dlm_lock_resource *res;
dlm_lock *newlock;
+ dlm_lockstatus *lksb;
dlm_status status = DLM_NORMAL;
struct qstr lockname = { .name=create->name, .len=create->namelen };
@@ -1516,6 +1661,12 @@
if (!newlock)
return DLM_SYSERR;
+ lksb = kmalloc(sizeof(dlm_lockstatus), GFP_KERNEL);
+ if (!lksb) {
+ kfree(newlock);
+ return DLM_SYSERR;
+ }
+
memset(newlock, 0, sizeof(dlm_lock));
INIT_LIST_HEAD(&newlock->list);
INIT_LIST_HEAD(&newlock->ast_list);
@@ -1529,6 +1680,11 @@
newlock->astdata = NULL;
newlock->cookie = create->cookie;
+ memset(lksb, 0, sizeof(dlm_lockstatus));
+ newlock->lksb = lksb;
+ lksb->lockid = newlock;
+ lksb->flags |= DLM_LKSB_KERNEL_ALLOCATED;
+
status = DLM_IVLOCKID;
res = dlm_lookup_lock(dlm, &lockname);
if (res) {
@@ -1557,21 +1713,34 @@
int tmpret;
dlm_status ret;
int status = 0;
+ int arrsz = 1, msgsz = sizeof(dlm_convert_lock);
+ net_data nd[2];
dlmprintk0("\n");
- memset(&convert, 0, sizeof(convert));
+ memset(&convert, 0, sizeof(dlm_convert_lock));
convert.node_idx = dlm->group_index;
convert.requested_type = type;
convert.cookie = lock->cookie;
convert.namelen = res->lockname.len;
convert.flags = flags;
strncpy(convert.name, res->lockname.name, convert.namelen);
+
+ nd[0].bytes = msgsz;
+ nd[0].ptr = &convert;
+ if (flags & LKM_PUT_LVB) {
+ /* extra data to send if we are updating lvb */
+ nd[1].bytes = DLM_LVB_LEN;
+ nd[1].ptr = lock->lksb->lvb;
+ arrsz++;
+ msgsz += DLM_LVB_LEN;
+ }
+
ret = DLM_NOLOCKMGR;
inode = nm_get_group_node_by_index(dlm->group, res->owner);
if (inode) {
- tmpret = net_send_message(DLM_CONVERT_LOCK_MSG, dlm->key, &convert, sizeof(convert), inode, &status);
+ tmpret = net_send_message_arr(DLM_CONVERT_LOCK_MSG, dlm->key, arrsz, nd, msgsz, inode, &status);
if (tmpret >= 0) {
// successfully sent and received
ret = status; // this is already a dlm_status
@@ -1585,6 +1754,8 @@
return ret;
}
+
+
int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data)
{
dlm_ctxt *dlm = data;
@@ -1592,35 +1763,45 @@
dlm_lock_resource *res;
struct list_head *iter;
dlm_lock *lock;
+ dlm_lockstatus *lksb;
dlm_status status = DLM_NORMAL;
int found = 0;
struct qstr lockname = { .name=convert->name, .len=convert->namelen };
- union {
- u64 q;
- u32 hilo[2];
- } u1, u2, u3, u4, u5, u6, u7;
+ u32 flags = convert->flags;
+ if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
+ (LKM_PUT_LVB|LKM_GET_LVB)) {
+ dlmprintk("both PUT and GET lvb specified\n");
+ return DLM_BADARGS;
+ }
- dlmprintk0("\n");
- rdtsc(u1.hilo[0], u1.hilo[1]);
-
+ dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
+ (flags & LKM_GET_LVB ? "get lvb" : "none"));
+
lockname.hash = full_name_hash(lockname.name, lockname.len);
- rdtsc(u2.hilo[0], u2.hilo[1]);
status = DLM_IVLOCKID;
res = dlm_lookup_lock(dlm, &lockname);
- rdtsc(u3.hilo[0], u3.hilo[1]);
if (res) {
spin_lock(&res->spinlock);
- rdtsc(u4.hilo[0], u4.hilo[1]);
list_for_each(iter, &res->granted) {
lock = list_entry(iter, dlm_lock, list);
if (lock->cookie == convert->cookie &&
lock->node == convert->node_idx) {
found = 1;
- rdtsc(u5.hilo[0], u5.hilo[1]);
- status = dlmconvert_local(dlm, res, lock, convert->flags, convert->requested_type);
- rdtsc(u6.hilo[0], u6.hilo[1]);
+ lksb = lock->lksb;
+ if (flags & LKM_PUT_LVB) {
+ DLM_ASSERT(!(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)));
+ lksb->flags |= DLM_LKSB_PUT_LVB;
+ memcpy(&lksb->lvb[0], &convert->lvb[0], DLM_LVB_LEN);
+ } else if (flags & LKM_GET_LVB) {
+ DLM_ASSERT(!(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)));
+ lksb->flags |= DLM_LKSB_GET_LVB;
+ }
+ status = dlmconvert_local(dlm, res, lock, flags, convert->requested_type);
+ if (status != DLM_NORMAL) {
+ lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
+ }
break;
}
}
@@ -1629,9 +1810,6 @@
if (!found)
dlmprintk("failed to find lock to convert on grant queue! cookie=%llu\n", convert->cookie);
- rdtsc(u7.hilo[0], u7.hilo[1]);
- dlmprintk("1-2:%llu 2-3:%llu 3-4:%llu 4-5:%llu 5-6:%llu 6-7:%llu\n",
- u2.q-u1.q, u3.q-u2.q, u4.q-u3.q, u5.q-u4.q, u6.q-u5.q, u7.q-u6.q);
return status;
}
Modified: trunk/cluster/dlmmod.h
===================================================================
--- trunk/cluster/dlmmod.h 2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/dlmmod.h 2004-12-23 21:48:17 UTC (rev 1718)
@@ -74,36 +74,42 @@
#define LKM_MODEMASK 0xff
-/* TODO: Flags which OCFS2 will require:
- * - LKM_LOCAL
- * - LKM_VALBLK
- * - LKM_NOQUEUE
- * - LKM_CONVERT
- * - LKM_CANCEL */
-#define LKM_ORPHAN 0x10 /* this lock is orphanable */ /* unsupported */
-#define LKM_PARENTABLE 0x20 /* this lock was orphaned */ /* unsupported */
-#define LKM_BLOCK 0x40 /* blocking lock request */ /* unsupported */
-#define LKM_LOCAL 0x80 /* local lock request */
-#define LKM_VALBLK 0x100 /* lock value block request */
-#define LKM_NOQUEUE 0x200 /* non blocking request */
-#define LKM_CONVERT 0x400 /* conversion request */
-#define LKM_NODLCKWT 0x800 /* this lock wont deadlock */ /* unsupported */
-#define LKM_UNLOCK 0x1000 /* deallocate this lock */
-#define LKM_CANCEL 0x2000 /* cancel conversion request */
-#define LKM_DEQALL 0x4000 /* remove all locks held by proc */ /* unsupported */
-#define LKM_INVVALBLK 0x8000 /* invalidate lock value block */
-#define LKM_SYNCSTS 0x10000 /* return synchronous status if poss */ /* unsupported */
-#define LKM_TIMEOUT 0x20000 /* lock request contains timeout */ /* unsupported */
-#define LKM_SNGLDLCK 0x40000 /* request can self-deadlock */ /* unsupported */
-#define LKM_FINDLOCAL 0x80000 /* find local lock request */ /* unsupported */
-#define LKM_PROC_OWNED 0x100000 /* owned by process, not group */ /* unsupported */
-#define LKM_XID 0x200000 /* use transaction id for deadlock */ /* unsupported */
-#define LKM_XID_CONFLICT 0x400000 /* do not allow lock inheritance */ /* unsupported */
-#define LKM_FORCE 0x800000 /* force unlock flag */
-#define LKM_REVVALBLK 0x1000000 /* temporary solution: re-validate lock value block */ /* unsupported */
+/* reserved: flags used by the "real" dlm, only a few are supported by this dlm */
+#define LKM_ORPHAN 0x00000010 /* this lock is orphanable */ /* unsupported */
+#define LKM_PARENTABLE 0x00000020 /* this lock was orphaned */ /* unsupported */
+#define LKM_BLOCK 0x00000040 /* blocking lock request */ /* unsupported */
+#define LKM_LOCAL 0x00000080 /* local lock request */
+#define LKM_VALBLK 0x00000100 /* lock value block request */
+#define LKM_NOQUEUE 0x00000200 /* non blocking request */
+#define LKM_CONVERT 0x00000400 /* conversion request */
+#define LKM_NODLCKWT 0x00000800 /* this lock wont deadlock */ /* unsupported */
+#define LKM_UNLOCK 0x00001000 /* deallocate this lock */
+#define LKM_CANCEL 0x00002000 /* cancel conversion request */
+#define LKM_DEQALL 0x00004000 /* remove all locks held by proc */ /* unsupported */
+#define LKM_INVVALBLK 0x00008000 /* invalidate lock value block */
+#define LKM_SYNCSTS 0x00010000 /* return synchronous status if poss */ /* unsupported */
+#define LKM_TIMEOUT 0x00020000 /* lock request contains timeout */ /* unsupported */
+#define LKM_SNGLDLCK 0x00040000 /* request can self-deadlock */ /* unsupported */
+#define LKM_FINDLOCAL 0x00080000 /* find local lock request */ /* unsupported */
+#define LKM_PROC_OWNED 0x00100000 /* owned by process, not group */ /* unsupported */
+#define LKM_XID 0x00200000 /* use transaction id for deadlock */ /* unsupported */
+#define LKM_XID_CONFLICT 0x00400000 /* do not allow lock inheritance */ /* unsupported */
+#define LKM_FORCE 0x00800000 /* force unlock flag */
+#define LKM_REVVALBLK 0x01000000 /* temporary solution: re-validate lock value block */ /* unsupported */
+/* unused */
+#define LKM_UNUSED1 0x00000001 /* unused */
+#define LKM_UNUSED2 0x00000002 /* unused */
+#define LKM_UNUSED3 0x00000004 /* unused */
+#define LKM_UNUSED4 0x00000008 /* unused */
+#define LKM_UNUSED5 0x02000000 /* unused */
+#define LKM_UNUSED6 0x04000000 /* unused */
+#define LKM_UNUSED7 0x08000000 /* unused */
+#define LKM_UNUSED8 0x10000000 /* unused */
+/* ocfs2 extensions: internal only; should never be used by caller */
+#define LKM_PUT_LVB 0x20000000 /* extension: lvb is being passed, should be applied to lockres */
+#define LKM_GET_LVB 0x40000000 /* extension: lvb should be copied from lockres when lock granted */
+#define LKM_RECOVERY 0x80000000 /* extension: flag for recovery lock, used to avoid recovery rwsem */
-#define LKM_RECOVERY 0x80000000 /* extension: flag for recovery lock, used to avoid recovery rwsem */
-
#define LKM_VALID_FLAGS (LKM_VALBLK | LKM_CONVERT | LKM_UNLOCK | \
LKM_CANCEL | LKM_INVVALBLK | LKM_FORCE | \
LKM_RECOVERY | LKM_LOCAL | LKM_NOQUEUE)
@@ -247,8 +253,18 @@
} dlm_lock;
+#define DLM_LKSB_KERNEL_ALLOCATED 0x01 // allocated on master node on behalf of remote node
+#define DLM_LKSB_PUT_LVB 0x02
+#define DLM_LKSB_GET_LVB 0x04
+#define DLM_LKSB_UNUSED2 0x08
+#define DLM_LKSB_UNUSED3 0x10
+#define DLM_LKSB_UNUSED4 0x20
+#define DLM_LKSB_UNUSED5 0x40
+#define DLM_LKSB_UNUSED6 0x80
+
struct _dlm_lockstatus {
- dlm_status status;
+ dlm_status status; // can we just change this to a u8 or u16?
+ u32 flags;
dlm_lock *lockid;
char lvb[DLM_LVB_LEN];
};
Modified: trunk/cluster/dlmthread.c
===================================================================
--- trunk/cluster/dlmthread.c 2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/dlmthread.c 2004-12-23 21:48:17 UTC (rev 1718)
@@ -136,14 +136,9 @@
list_del(&target->list);
list_add_tail(&target->list, &res->granted);
- if (target->node == dlm->group_index) {
- DLM_ASSERT(target->lksb);
+ DLM_ASSERT(target->lksb);
+ target->lksb->status = DLM_NORMAL;
- target->lksb->status = DLM_NORMAL;
- } else {
- dlmprintk0("nonlocal lock, not setting DLM_NORMAL in lksb\n");
- }
-
spin_unlock(&target->spinlock);
if (dlm_do_ast(dlm, res, target) < 0)
@@ -197,14 +192,9 @@
list_del(&target->list);
list_add_tail(&target->list, &res->granted);
- if (target->node == dlm->group_index) {
- DLM_ASSERT(target->lksb);
+ DLM_ASSERT(target->lksb);
+ target->lksb->status = DLM_NORMAL;
- target->lksb->status = DLM_NORMAL;
- } else {
- dlmprintk0("nonlocal lock, not setting DLM_NORMAL in lksb\n");
- }
-
spin_unlock(&target->spinlock);
if (dlm_do_ast(dlm, res, target) < 0)
Modified: trunk/cluster/nodemanager.c
===================================================================
--- trunk/cluster/nodemanager.c 2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/nodemanager.c 2004-12-23 21:48:17 UTC (rev 1718)
@@ -1077,7 +1077,11 @@
{
struct dentry *dentry = NULL;
struct inode *inode = NULL;
-
+
+ NM_ASSERT(node_name);
+ NM_ASSERT(single_sb);
+ NM_ASSERT(single_sb->s_root);
+
dentry = lookup_one_len(node_name, single_sb->s_root, strlen(node_name));
if (!IS_ERR(dentry)) {
inode = dentry->d_inode;
Modified: trunk/cluster/nodemanager.h
===================================================================
--- trunk/cluster/nodemanager.h 2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/nodemanager.h 2004-12-23 21:48:17 UTC (rev 1718)
@@ -28,6 +28,7 @@
#ifndef CLUSTER_NODEMANAGER_H
#define CLUSTER_NODEMANAGER_H
+#define NM_ASSERT(x) ({ if (!(x)) { printk("nm: assert failed! %s:%d\n", __FILE__, __LINE__); BUG(); } })
struct _nm_ctxt
Modified: trunk/cluster/tcp.c
===================================================================
--- trunk/cluster/tcp.c 2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/tcp.c 2004-12-23 21:48:17 UTC (rev 1718)
@@ -97,6 +97,12 @@
static struct completion net_recv_complete;
+static inline void net_abort_status_return(net_status_ctxt *nsc)
+{
+ spin_lock(&net_status_lock);
+ list_del(&nsc->list);
+ spin_unlock(&net_status_lock);
+}
/////////////////////
static void net_shutdown(void);
@@ -147,7 +153,7 @@
{
atomic_dec(&nmh->refcnt);
if (!atomic_read(&nmh->refcnt)) {
- if (net_handler_in_use(nmh))
+ if (nmh->flags & NET_HND_IN_USE)
netprintk0("EEEEK! killing inuse handler! bugbug!\n");
kfree(nmh);
}
@@ -156,7 +162,7 @@
static inline void net_put_handler(net_msg_handler *nmh)
{
if (atomic_dec_and_lock(&nmh->refcnt, &net_handler_lock)) {
- if (net_handler_in_use(nmh))
+ if (nmh->flags & NET_HND_IN_USE)
netprintk0("EEEEK! killing inuse handler! bugbug!\n");
kfree(nmh);
spin_unlock(&net_handler_lock);
@@ -649,7 +655,7 @@
netprintk("no such message type: %u/%u\n", msg_type, key);
return NULL;
}
- if (net_handler_msg_len_ok(handler, len)) {
+ if (!net_handler_msg_len_ok(handler, len)) {
netprintk("len for message type %u incorrect: %u, should be %u\n",
msg_type, len, handler->max_len);
goto done;
@@ -873,7 +879,7 @@
return -EINVAL;
}
- if (net_handler_msg_len_ok(handler, len)) {
+ if (!net_handler_msg_len_ok(handler, len)) {
netprintk("len for message type %u incorrect: %u, should be %u\n",
msg_type, len, handler->max_len);
ret = -EINVAL;
@@ -925,27 +931,167 @@
add_wait_queue(net->sock->sk->sk_sleep, &sleep);
spin_unlock(&net->sock_lock);
}
+
+ ret = net_send_tcp_msg(inode, NULL, msg, packet_len);
+
+ if (status) {
+ if (ret >= 0) {
+ /* wait on other node's handler */
+ tmpret = util_wait_atomic_eq(&nsc.wq, &nsc.woken, 1, 0);
+ if (tmpret==0) {
+ *status = nsc.status;
+ netprintk("status return requested, status is %d\n", *status);
+ } else {
+ ret = tmpret;
+ net_abort_status_return(&nsc);
+ netprintk0("net_abort_status_return called\n");
+ netprintk("status return requested, and error occurred while waiting=%d\n", ret);
+ *status = ret;
+ }
+ remove_wait_queue(recv_sock->sk->sk_sleep, &sleep);
+ } else {
+ netprintk("status return requested, and error returned from net_send_tcp_msg=%d\n", ret);
+ /* return bad status right away */
+ *status = ret;
+ }
+ } else if (ret < 0) {
+ netprintk("no status return requested, but error returned from net_send_tcp_msg=%d\n", ret);
+ }
+
+done:
+ if (handler)
+ net_put_handler(handler);
+ if (msg)
+ kfree(msg);
+ return ret;
+}
+
+
+int net_send_message_arr(u32 msg_type, u32 key, int arrlen, net_data *arr, u32 len, struct inode *inode, int *status)
{
- union {
- u64 q;
- u32 hilo[2];
- } u1, u2;
- rdtsc(u1.hilo[0], u1.hilo[1]);
+ int ret = 0, tmpret, i;
+ net_msg *msg = NULL;
+ net_msg_handler *handler = NULL;
+ u32 packet_len;
+ net_status_ctxt nsc;
+ wait_queue_t sleep;
+ nm_node_inode_private *priv = NULL;
+ net_inode_private *net = NULL;
+ char *src, *dest;
+ if (!inode || !inode->u.generic_ip) {
+ netprintk0("bad inode, cannot send message\n");
+ return -EINVAL;
+ }
+ if (arrlen <= 0) {
+ netprintk0("bad data array length\n");
+ return -EINVAL;
+ }
+ priv = (nm_node_inode_private *)inode->u.generic_ip;
+ net = &priv->net;
+ spin_lock(&net->sock_lock);
+ if (!net->sock) {
+ spin_unlock(&net->sock_lock);
+ ret = net_init_tcp_sock(inode);
+ if (!(ret == 0 || ret == -EEXIST)) {
+ netprintk0("failed to create socket!");
+ return -EINVAL;
+ }
+ }
+ spin_unlock(&net->sock_lock);
+
+ spin_lock(&net_handler_lock);
+ handler = net_lookup_handler(msg_type, key);
+ spin_unlock(&net_handler_lock);
+
+ if (!handler) {
+ netprintk("no such message type: %u/%u\n", msg_type, key);
+ return -EINVAL;
+ }
+
+ if (!net_handler_msg_len_ok(handler, len)) {
+ netprintk("len for message type %u incorrect: %u, should be %u\n",
+ msg_type, len, handler->max_len);
+ ret = -EINVAL;
+ goto done;
+ }
+ packet_len = len + sizeof(net_msg);
+ msg = kmalloc(packet_len, GFP_KERNEL);
+ if (!msg) {
+ netprintk("failed to allocate %u bytes for message!\n", packet_len);
+ ret = -ENOMEM;
+ goto done;
+ }
+ memset(msg, 0, packet_len);
+ msg->magic = NET_MSG_MAGIC;
+ msg->data_len = len;
+ msg->msg_type = msg_type;
+ msg->key = key;
+ spin_lock(&net_msg_num_lock);
+ msg->msg_num = net_msg_num;
+ if (net_msg_num == NET_MSG_NUM_MAX) {
+ netprintk0("eek! net_msg_num wrapping to 1 now...\n");
+ net_msg_num = 1;
+ }
+ spin_unlock(&net_msg_num_lock);
+ if (len > 0) {
+ int tmplen = len;
+ dest = &(msg->buf[0]);
+ for (i=0; i<arrlen; i++) {
+ src = arr[i].ptr;
+ if (arr[i].bytes > tmplen) {
+ netprintk0("data array is too large!\n");
+ kfree(msg);
+ return -EINVAL;
+ }
+ memcpy(dest, src, arr[i].bytes);
+ tmplen -= arr[i].bytes;
+ dest += arr[i].bytes;
+ }
+ }
+
+ /* does the caller want to wait for a simple status? */
+ if (status) {
+ msg->status = 1;
+
+ INIT_LIST_HEAD(&nsc.list);
+ init_waitqueue_head(&nsc.wq);
+ atomic_set(&nsc.woken, 0);
+ nsc.msg_num = msg->msg_num;
+ nsc.status = 0;
+ spin_lock(&net_status_lock);
+ list_add(&nsc.list, &net_status_list);
+ spin_unlock(&net_status_lock);
+
+ init_waitqueue_entry(&sleep, current);
+ spin_lock(&net->sock_lock);
+ if (!net->sock) {
+ spin_unlock(&net->sock_lock);
+ netprintk0("caller wanted status return but socket went away!\n");
+ kfree(msg);
+ return -EINVAL;
+ }
+ add_wait_queue(net->sock->sk->sk_sleep, &sleep);
+ spin_unlock(&net->sock_lock);
+ }
+
ret = net_send_tcp_msg(inode, NULL, msg, packet_len);
- rdtsc(u2.hilo[0], u2.hilo[1]);
- netprintk("net_send_tcp_msg took %llu cycles\n", u2.q-u1.q);
if (status) {
if (ret >= 0) {
/* wait on other node's handler */
- rdtsc(u1.hilo[0], u1.hilo[1]);
tmpret = util_wait_atomic_eq(&nsc.wq, &nsc.woken, 1, 0);
- rdtsc(u2.hilo[0], u2.hilo[1]);
- netprintk("waiting on status took %llu cycles\n", u2.q-u1.q);
- *status = nsc.status;
- netprintk("status return requested, status is %d\n", *status);
+ if (tmpret==0) {
+ *status = nsc.status;
+ netprintk("status return requested, status is %d\n", *status);
+ } else {
+ ret = tmpret;
+ net_abort_status_return(&nsc);
+ netprintk0("net_abort_status_return called\n");
+ netprintk("status return requested, and error occurred while waiting=%d\n", ret);
+ *status = ret;
+ }
remove_wait_queue(recv_sock->sk->sk_sleep, &sleep);
} else {
netprintk("status return requested, and error returned from net_send_tcp_msg=%d\n", ret);
@@ -955,8 +1101,7 @@
} else if (ret < 0) {
netprintk("no status return requested, but error returned from net_send_tcp_msg=%d\n", ret);
}
-}
-
+
done:
if (handler)
net_put_handler(handler);
@@ -984,10 +1129,6 @@
net_msg_handler *hnd = NULL;
int err = 0;
int tmperr;
- union {
- u64 q;
- u32 hilo[2];
- } u1, u2, u3, u4;
start_over:
@@ -1031,13 +1172,9 @@
hdr.magic, hdr.msg_type, hdr.key);
if (hdr.magic == NET_MSG_STATUS_MAGIC) {
-rdtsc(u1.hilo[0], u1.hilo[1]);
net_dump_msg(sock, inode);
/* special type for returning message status */
-rdtsc(u2.hilo[0], u2.hilo[1]);
net_do_status_return(hdr.msg_num, hdr.status);
-rdtsc(u3.hilo[0], u3.hilo[1]);
-netprintk("status return: net_dump_msg took %llu, net_do_status_return took %llu\n", u2.q-u1.q, u3.q-u2.q);
err = 0;
goto error;
} else if (hdr.magic != NET_MSG_MAGIC) {
@@ -1068,11 +1205,7 @@
netprintk0("no handler for message.\n");
goto error;
}
-netprintk0("about to dispatch message\n");
-rdtsc(u1.hilo[0], u1.hilo[1]);
err = net_dispatch_message(inode, sock, &hdr, hnd);
-rdtsc(u2.hilo[0], u2.hilo[1]);
-netprintk("net_dispatch_message took %llu\n", u2.q-u1.q);
/* if node has requested status return, do it now */
if (hdr.status) {
@@ -1083,11 +1216,7 @@
#endif
hdr.status = err;
hdr.magic = NET_MSG_STATUS_MAGIC; // twiddle the magic
-netprintk("about to send return message, status=%d\n", err);
-rdtsc(u3.hilo[0], u3.hilo[1]);
tmperr = net_send_tcp_msg(inode, sock, &hdr, sizeof(net_msg));
-rdtsc(u4.hilo[0], u4.hilo[1]);
-netprintk("status return (net_send_tcp_msg) took %llu\n", u4.q-u3.q);
} else if (err < 0) {
netprintk("dispatch (%u/%u) returned %d\n",
hdr.msg_type, hdr.key, err);
@@ -1100,7 +1229,6 @@
spin_lock(&net_list_lock);
list_add_tail(&net->list, &net_recv_list);
spin_unlock(&net_list_lock);
-netprintk0("all done with this round, starting over\n");
goto start_over;
error:
@@ -1131,6 +1259,8 @@
}
+
+
void net_do_status_return(u64 msg_num, s32 status)
{
net_status_ctxt *nsc;
@@ -1160,7 +1290,7 @@
packet_len = len + sizeof(net_msg);
spin_lock(&hnd->lock);
- if (net_handler_in_use(hnd)) {
+ if (hnd->flags & NET_HND_IN_USE) {
netprintk0("EEEEEK! handler in use! bugbug\n");
spin_unlock(&hnd->lock);
return -EINVAL;
@@ -1171,7 +1301,7 @@
spin_unlock(&hnd->lock);
return -EINVAL;
}
- hnd->flags |= (1 << NET_HND_IN_USE);
+ hnd->flags |= NET_HND_IN_USE;
spin_unlock(&hnd->lock);
memset(hnd->buf, 0, packet_len);
@@ -1184,7 +1314,7 @@
}
spin_lock(&hnd->lock);
- hnd->flags &= ~(1 << NET_HND_IN_USE);
+ hnd->flags &= ~NET_HND_IN_USE;
spin_unlock(&hnd->lock);
return ret;
Modified: trunk/cluster/tcp.h
===================================================================
--- trunk/cluster/tcp.h 2004-12-23 21:40:45 UTC (rev 1717)
+++ trunk/cluster/tcp.h 2004-12-23 21:48:17 UTC (rev 1718)
@@ -126,18 +126,15 @@
return 0;
}
-enum {
- NET_HND_VAR_LEN = 0,
- NET_HND_IN_USE,
-};
-#define net_handler_variable_len(h) ((h)->flags & (1 << NET_HND_VAR_LEN))
-#define net_handler_in_use(h) ((h)->flags & (1 << NET_HND_IN_USE))
+#define NET_HND_VAR_LEN 0x00000001
+#define NET_HND_IN_USE 0x00000002
static inline int net_handler_msg_len_ok(net_msg_handler *handler, u32 len)
{
- return (net_handler_variable_len(handler) ?
- len > handler->max_len : len != handler->max_len);
+ return ((handler->flags & NET_HND_VAR_LEN) ?
+ len <= handler->max_len :
+ len == handler->max_len);
}
@@ -196,6 +193,11 @@
NET_DRIVER_READY,
};
+typedef struct _net_data
+{
+ int bytes;
+ void *ptr;
+} net_data;
int net_register_handler(u32 msg_type, u32 key, int flags,
u32 max_len, net_msg_handler_func *func, void *data, void *buf);
@@ -205,6 +207,7 @@
int net_send_error(struct socket *sock, u32 err_type);
int net_init_tcp_sock(struct inode *inode);
int net_send_message(u32 msg_type, u32 key, void *data, u32 len, struct inode *inode, int *status);
+int net_send_message_arr(u32 msg_type, u32 key, int arrlen, net_data *arr, u32 len, struct inode *inode, int *status);
int net_broadcast_message(u32 msg_type, u32 key, void *data, u32 len, struct inode *group);
net_msg_handler * net_lookup_handler(u32 msg_type, u32 key);
More information about the Ocfs2-commits mailing list