[Ocfs2-commits] khackel commits r1751 - trunk/cluster
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Mon Jan 10 16:08:59 CST 2005
Author: khackel
Date: 2005-01-10 16:08:58 -0600 (Mon, 10 Jan 2005)
New Revision: 1751
Modified:
trunk/cluster/dlmmaster.c
trunk/cluster/dlmmod.c
trunk/cluster/dlmmod.h
trunk/cluster/dlmrecovery.c
trunk/cluster/tcp.c
trunk/cluster/tcp.h
Log:
* removed useless net_package_message function
* changed net_dispatch_message to take a length instead of net_msg*
* proper handling of network byteorder in dlm and tcp messages
Modified: trunk/cluster/dlmmaster.c
===================================================================
--- trunk/cluster/dlmmaster.c 2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/dlmmaster.c 2005-01-10 22:08:58 UTC (rev 1751)
@@ -453,10 +453,13 @@
dlm_lock_resource *res;
dlm_master_request *request = (dlm_master_request *) msg->buf;
dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
- struct qstr lockname = { .name=request->name, .len=request->namelen };
+ struct qstr lockname;
int found;
struct list_head *iter;
+ dlm_master_request_to_host(request);
+ lockname.name = request->name;
+ lockname.len = request->namelen;
lockname.hash = full_name_hash(lockname.name, lockname.len);
way_up_top:
@@ -606,9 +609,11 @@
dlm_master_request_resp *resp = (dlm_master_request_resp *) msg->buf;
int found = 0, wake = 0;
struct list_head *iter;
- struct qstr lockname = { .name=resp->name, .len=resp->namelen };
-
+ struct qstr lockname;
+ dlm_master_request_resp_to_host(resp);
+ lockname.name = resp->name;
+ lockname.len = resp->namelen;
lockname.hash = full_name_hash(lockname.name, lockname.len);
spin_lock(&dlm_master_lock);
@@ -689,8 +694,11 @@
dlm_lock_resource *res;
int bit;
struct list_head *iter;
- struct qstr lockname = { .name=assert->name, .len=assert->namelen };
+ struct qstr lockname;
+ dlm_assert_master_to_host(assert);
+ lockname.name = assert->name;
+ lockname.len = assert->namelen;
lockname.hash = full_name_hash(lockname.name, lockname.len);
spin_lock(&dlm->spinlock);
@@ -787,6 +795,7 @@
ret = -EINVAL;
inode = nm_get_group_node_by_index(dlm->group, to);
if (inode) {
+ dlm_master_request_to_net(&request);
ret = net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request, sizeof(request), inode, &response);
iput(inode);
if (ret >= 0) {
@@ -840,6 +849,8 @@
inode = nm_get_group_node_by_index(dlm->group, to);
if (!inode)
return -EINVAL;
+
+ dlm_master_request_resp_to_net(&resp);
ret = net_send_message(DLM_MASTER_REQUEST_RESP_MSG, dlm->key, &resp, sizeof(resp), inode, NULL);
iput(inode);
return ret;
@@ -882,6 +893,8 @@
ret = tmpret;
break;
}
+
+ dlm_assert_master_to_net(&assert);
tmpret = net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &assert, sizeof(assert), inode, NULL);
iput(inode);
Modified: trunk/cluster/dlmmod.c
===================================================================
--- trunk/cluster/dlmmod.c 2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/dlmmod.c 2005-01-10 22:08:58 UTC (rev 1751)
@@ -95,62 +95,6 @@
extern spinlock_t dlm_master_lock;
extern struct list_head dlm_master_list;
-
-typedef struct _dlm_create_lock
-{
- u32 flags; // TODO: reduce the size of this
- u16 node_idx;
- s8 requested_type;
- u8 namelen;
- u8 name[NM_MAX_NAME_LEN];
- u64 cookie;
-} dlm_create_lock;
-
-typedef struct _dlm_convert_lock
-{
- u32 flags; // TODO: reduce the size of this
- u16 node_idx;
- s8 requested_type;
- u8 namelen;
- u8 name[NM_MAX_NAME_LEN];
- u64 cookie;
- s8 lvb[0];
-} dlm_convert_lock;
-#define DLM_CONVERT_LOCK_MAX_LEN (sizeof(dlm_convert_lock) + DLM_LVB_LEN)
-
-typedef struct _dlm_unlock_lock
-{
- u32 flags; // TODO: reduce the size of this
- u16 node_idx;
- u8 namelen;
- u8 name[NM_MAX_NAME_LEN];
- u64 cookie;
- s8 lvb[0];
-} dlm_unlock_lock;
-#define DLM_UNLOCK_LOCK_MAX_LEN (sizeof(dlm_unlock_lock) + DLM_LVB_LEN)
-
-
-typedef struct _dlm_proxy_ast
-{
- u32 flags; // TODO: reduce the size of this
- u16 node_idx;
- u8 type;
- u8 blocked_type;
- u8 namelen;
- u8 name[NM_MAX_NAME_LEN];
- u64 cookie;
- s8 lvb[0];
-} dlm_proxy_ast;
-#define DLM_PROXY_AST_MAX_LEN (sizeof(dlm_proxy_ast) + DLM_LVB_LEN)
-
-
-int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
-int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
-int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
-
-int dlm_unlock_lock_handler(net_msg *msg, u32 len, void *data);
-dlm_status dlm_send_remote_unlock_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, dlm_lockstatus *lksb, int flags);
-
/* ----------------------------------------------------------------- */
@@ -927,6 +871,7 @@
lksb->status = DLM_NOLOCKMGR;
inode = nm_get_group_node_by_index(dlm->group, res->owner);
if (inode) {
+ dlm_unlock_lock_to_net(&unlock);
tmpret = net_send_message_arr(DLM_UNLOCK_LOCK_MSG, dlm->key, arrsz, nd, msgsz, inode, &status);
if (tmpret >= 0) {
// successfully sent and received
@@ -957,9 +902,14 @@
int found = 0;
dlm_lockstatus *lksb = NULL;
int ignore;
- struct qstr lockname = { .name=unlock->name, .len=unlock->namelen };
- u32 flags = unlock->flags;
+ struct qstr lockname;
+ u32 flags;
+ dlm_unlock_lock_to_host(unlock);
+ lockname.name = unlock->name;
+ lockname.len = unlock->namelen;
+ flags = unlock->flags;
+
if (flags & LKM_GET_LVB) {
dlmprintk0("bad args! GET_LVB specified on unlock!\n");
return DLM_BADARGS;
@@ -1479,6 +1429,7 @@
ret = -EINVAL;
inode = nm_get_group_node_by_index(dlm->group, lock->node);
if (inode) {
+ dlm_proxy_ast_to_net(&past);
ret = net_send_message_arr(DLM_PROXY_AST_MSG, dlm->key, arrsz, nd, msgsz, inode, NULL);
iput(inode);
}
@@ -1495,11 +1446,17 @@
dlm_lock_resource *res;
dlm_lock *lock = NULL;
dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
- struct qstr lockname = { .name=past->name, .len=past->namelen };
+ struct qstr lockname;
struct list_head *iter, *head=NULL;
- u64 cookie = past->cookie;
- u32 flags = past->flags;
+ u64 cookie;
+ u32 flags;
+ dlm_proxy_ast_to_host(past);
+ lockname.name = past->name;
+ lockname.len = past->namelen;
+ cookie = past->cookie;
+ flags = past->flags;
+
if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
(LKM_PUT_LVB|LKM_GET_LVB)) {
dlmprintk("both PUT and GET lvb specified\n");
@@ -1633,6 +1590,7 @@
ret = DLM_NOLOCKMGR;
inode = nm_get_group_node_by_index(dlm->group, res->owner);
if (inode) {
+ dlm_create_lock_to_net(&create);
tmpret = net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create, sizeof(create), inode, &status);
if (tmpret >= 0) {
// successfully sent and received
@@ -1655,7 +1613,11 @@
dlm_lock *newlock;
dlm_lockstatus *lksb;
dlm_status status = DLM_NORMAL;
- struct qstr lockname = { .name=create->name, .len=create->namelen };
+ struct qstr lockname;
+
+ dlm_create_lock_to_host(create);
+ lockname.name = create->name;
+ lockname.len = create->namelen;
dlmprintk0("\n");
@@ -1744,6 +1706,7 @@
ret = DLM_NOLOCKMGR;
inode = nm_get_group_node_by_index(dlm->group, res->owner);
if (inode) {
+ dlm_convert_lock_to_net(&convert);
tmpret = net_send_message_arr(DLM_CONVERT_LOCK_MSG, dlm->key, arrsz, nd, msgsz, inode, &status);
if (tmpret >= 0) {
// successfully sent and received
@@ -1770,9 +1733,14 @@
dlm_lockstatus *lksb;
dlm_status status = DLM_NORMAL;
int found = 0;
- struct qstr lockname = { .name=convert->name, .len=convert->namelen };
- u32 flags = convert->flags;
+ struct qstr lockname;
+ u32 flags;
+ dlm_convert_lock_to_host(convert);
+ lockname.name = convert->name;
+ lockname.len = convert->namelen;
+ flags = convert->flags;
+
if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
(LKM_PUT_LVB|LKM_GET_LVB)) {
dlmprintk("both PUT and GET lvb specified\n");
Modified: trunk/cluster/dlmmod.h
===================================================================
--- trunk/cluster/dlmmod.h 2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/dlmmod.h 2005-01-10 22:08:58 UTC (rev 1751)
@@ -182,6 +182,7 @@
u16 sending_node;
u32 next_seq;
util_thread_info thread;
+ unsigned long node_map[BITS_TO_LONGS(NM_MAX_NODES)];
} dlm_recovery_ctxt;
@@ -317,18 +318,43 @@
#define DLM_PROXY_AST_MSG 505
#define DLM_UNLOCK_LOCK_MSG 506
+#define DLM_RECO_NODE_DATA_MSG 507
+
+typedef struct _dlm_reco_node_data
+{
+ int state;
+ u16 node_num;
+ struct list_head list;
+ struct list_head granted;
+ struct list_head converting;
+ struct list_head blocked;
+} dlm_reco_node_data;
+
enum {
+ DLM_RECO_NODE_DATA_DEAD = -1,
+ DLM_RECO_NODE_DATA_INIT = 0,
+ DLM_RECO_NODE_DATA_REQUESTING,
+ DLM_RECO_NODE_DATA_REQUESTED,
+ DLM_RECO_NODE_DATA_RECEIVING,
+ DLM_RECO_NODE_DATA_DONE,
+ DLM_RECO_NODE_DATA_FINALIZE_SENT,
+};
+
+
+enum {
DLM_MASTER_RESP_NO,
DLM_MASTER_RESP_YES,
DLM_MASTER_RESP_MAYBE,
DLM_MASTER_RESP_ERROR
};
+
typedef struct _dlm_master_request
{
u16 node_idx;
u8 namelen;
+ u8 pad1;
u8 name[NM_MAX_NAME_LEN];
} dlm_master_request;
@@ -344,13 +370,151 @@
{
u16 node_idx;
u8 namelen;
+ u8 pad1;
u8 name[NM_MAX_NAME_LEN];
} dlm_assert_master;
+typedef struct _dlm_create_lock
+{
+ u64 cookie;
+ u32 flags;
+ u16 node_idx;
+ s8 requested_type;
+ u8 namelen;
+ u8 name[NM_MAX_NAME_LEN];
+} dlm_create_lock;
+typedef struct _dlm_convert_lock
+{
+ u64 cookie;
+ u32 flags;
+ u16 node_idx;
+ s8 requested_type;
+ u8 namelen;
+ u8 name[NM_MAX_NAME_LEN];
+ s8 lvb[0];
+} dlm_convert_lock;
+#define DLM_CONVERT_LOCK_MAX_LEN (sizeof(dlm_convert_lock) + DLM_LVB_LEN)
+typedef struct _dlm_unlock_lock
+{
+ u64 cookie;
+ u32 flags;
+ u16 node_idx;
+ u8 namelen;
+ u8 pad1;
+ u8 name[NM_MAX_NAME_LEN];
+ s8 lvb[0];
+} dlm_unlock_lock;
+#define DLM_UNLOCK_LOCK_MAX_LEN (sizeof(dlm_unlock_lock) + DLM_LVB_LEN)
+typedef struct _dlm_proxy_ast
+{
+ u64 cookie;
+ u32 flags;
+ u16 node_idx;
+ u16 pad1;
+ u8 type;
+ u8 blocked_type;
+ u8 namelen;
+ u8 pad2;
+ u8 name[NM_MAX_NAME_LEN];
+ s8 lvb[0];
+} dlm_proxy_ast;
+#define DLM_PROXY_AST_MAX_LEN (sizeof(dlm_proxy_ast) + DLM_LVB_LEN)
+
+static inline void dlm_master_request_to_net(dlm_master_request *m)
+{
+ m->node_idx = htons(m->node_idx);
+}
+static inline void dlm_master_request_to_host(dlm_master_request *m)
+{
+ m->node_idx = ntohs(m->node_idx);
+}
+
+static inline void dlm_master_request_resp_to_net(dlm_master_request_resp *m)
+{
+ m->node_idx = htons(m->node_idx);
+}
+static inline void dlm_master_request_resp_to_host(dlm_master_request_resp *m)
+{
+ m->node_idx = ntohs(m->node_idx);
+}
+
+static inline void dlm_assert_master_to_net(dlm_assert_master *m)
+{
+ m->node_idx = htons(m->node_idx);
+}
+static inline void dlm_assert_master_to_host(dlm_assert_master *m)
+{
+ m->node_idx = ntohs(m->node_idx);
+}
+
+static inline void dlm_create_lock_to_net(dlm_create_lock *c)
+{
+ c->cookie = cpu_to_be64(c->cookie);
+ c->flags = htonl(c->flags);
+ c->node_idx = htons(c->node_idx);
+}
+static inline void dlm_create_lock_to_host(dlm_create_lock *c)
+{
+ c->cookie = be64_to_cpu(c->cookie);
+ c->flags = ntohl(c->flags);
+ c->node_idx = ntohs(c->node_idx);
+}
+
+static inline void dlm_convert_lock_to_net(dlm_convert_lock *c)
+{
+ c->cookie = cpu_to_be64(c->cookie);
+ c->flags = htonl(c->flags);
+ c->node_idx = htons(c->node_idx);
+}
+static inline void dlm_convert_lock_to_host(dlm_convert_lock *c)
+{
+ c->cookie = be64_to_cpu(c->cookie);
+ c->flags = ntohl(c->flags);
+ c->node_idx = ntohs(c->node_idx);
+}
+
+static inline void dlm_unlock_lock_to_net(dlm_unlock_lock *u)
+{
+ u->cookie = cpu_to_be64(u->cookie);
+ u->flags = htonl(u->flags);
+ u->node_idx = htons(u->node_idx);
+}
+static inline void dlm_unlock_lock_to_host(dlm_unlock_lock *u)
+{
+ u->cookie = be64_to_cpu(u->cookie);
+ u->flags = ntohl(u->flags);
+ u->node_idx = ntohs(u->node_idx);
+}
+
+static inline void dlm_proxy_ast_to_net(dlm_proxy_ast *a)
+{
+ a->cookie = cpu_to_be64(a->cookie);
+ a->flags = htonl(a->flags);
+ a->node_idx = htons(a->node_idx);
+}
+static inline void dlm_proxy_ast_to_host(dlm_proxy_ast *a)
+{
+ a->cookie = be64_to_cpu(a->cookie);
+ a->flags = ntohl(a->flags);
+ a->node_idx = ntohs(a->node_idx);
+}
+
+
+int dlm_create_lock_handler(net_msg *msg, u32 len, void *data);
+int dlm_convert_lock_handler(net_msg *msg, u32 len, void *data);
+int dlm_proxy_ast_handler(net_msg *msg, u32 len, void *data);
+
+int dlm_unlock_lock_handler(net_msg *msg, u32 len, void *data);
+dlm_status dlm_send_remote_unlock_request(dlm_ctxt *dlm, dlm_lock_resource *res, dlm_lock *lock, dlm_lockstatus *lksb, int flags);
+
+
+
+
+
void dlm_shuffle_lists(dlm_ctxt *dlm, dlm_lock_resource *res);
void dlm_thread_run_lock_resources(dlm_ctxt *dlm);
int dlm_thread(void *data);
Modified: trunk/cluster/dlmrecovery.c
===================================================================
--- trunk/cluster/dlmrecovery.c 2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/dlmrecovery.c 2005-01-10 22:08:58 UTC (rev 1751)
@@ -63,20 +63,26 @@
u16 dlm_pick_recovery_master(dlm_ctxt *dlm, u16 *new_dead_node);
static int dlm_remaster_locks_local(dlm_ctxt *dlm);
-int dlm_init_recovery_area(dlm_ctxt *dlm, u16 dead_node, u16 num_nodes);
+int dlm_init_recovery_area(dlm_ctxt *dlm);
int dlm_request_all_locks(dlm_ctxt *dlm, u16 request_from, u16 dead_node);
void dlm_destroy_recovery_area(dlm_ctxt *dlm, u16 dead_node);
#define DLM_RECOVERY_THREAD_MS 2000
-#if 0
+
+#ifdef LOUSY_RECOVERY
+
/*
* RECOVERY THREAD
*/
void dlm_kick_recovery_thread(dlm_ctxt *dlm)
{
- /* wake the recovery thread */
+ /* wake the recovery thread
+ * this will wake the reco thread in one of three places
+ * 1) sleeping with no recovery happening
+ * 2) sleeping with recovery mastered elsewhere
+ * 3) recovery mastered here, waiting on reco data */
atomic_set(&dlm->reco.thread.woken, 1);
wake_up(&dlm->reco.thread.thread_wq);
}
@@ -168,12 +174,12 @@
dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
}
- spin_unlock(&dlm->spinlock);
-
if (dlm->reco.dead_node == NM_INVALID_SLOT_NUM) {
dlmprintk0("nothing to recover! sleeping now!\n");
+ spin_unlock(&dlm->spinlock);
goto sleep;
}
+ spin_unlock(&dlm->spinlock);
/* take write barrier */
/* (stops the list reshuffling thread, proxy ast handling) */
@@ -198,49 +204,53 @@
spin_unlock(&dlm->spinlock);
}
}
-
+
+ dlmprintk("RECOVERY! new_master=%u, this node=%u, dead_node=%u\n",
+ dlm->reco.new_master, dlm->group_index, dlm->reco.dead_node);
- if (dlm->reco.new_master == dlm->group_index) {
- status = dlm_remaster_locks_local(dlm);
- if (status < 0) {
- dlmprintk("error remastering locks for node %u!!!! retrying!\n",
- dlm->reco.dead_node);
- } else {
- // success! see if any other nodes need recovery
- spin_lock(&dlm->spinlock);
- clear_bit(dlm->reco.dead_node, dlm->recovery_map);
- spin_unlock(&dlm->spinlock);
- dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
- dlm->reco.new_master = NM_INVALID_SLOT_NUM;
- dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
- dlm->reco.next_seq = 0;
- }
+ if (dlm->reco.new_master != dlm->group_index) {
+ /* it is safe to start everything back up here
+ * because all of the dead node's lock resources
+ * have been marked as in-recovery */
up_write(&dlm->recovery_sem);
- // pick another dead node
- continue;
- } else {
+
// sit around until new_master is dead or done
// we will get signalled by the waitqueue either way
dlmprintk("new_master %u is recovering dead_node %u... waiting...\n",
- dlm->reco.new_master, dlm->reco.dead_node);
+ dlm->reco.new_master, dlm->reco.dead_node);
+sleep:
+ atomic_set(&dlm->reco.thread.woken, 0);
+ status = util_wait_atomic_eq(&dlm->reco.thread.thread_wq,
+ &dlm->reco.thread.woken,
+ 1, DLM_RECOVERY_THREAD_MS);
+ if (status == 0 || status == -ETIMEDOUT) {
+ if (atomic_read(&dlm->reco.thread.woken))
+ dlmprintk0("aha!!! recovery thread woken!\n");
+ else
+ dlmprintk0("timed out waiting, running again\n");
+ continue;
+ }
+ dlmprintk("recovery thread got %d while waiting\n", status);
+ break;
}
+ /* new_master == dlm->group_index */
+ status = dlm_remaster_locks_local(dlm);
+ if (status < 0) {
+ dlmprintk("error remastering locks for node %u!!!! retrying!\n",
+ dlm->reco.dead_node);
+ } else {
+ // success! see if any other nodes need recovery
+ spin_lock(&dlm->spinlock);
+ clear_bit(dlm->reco.dead_node, dlm->recovery_map);
+ spin_unlock(&dlm->spinlock);
+ dlm->reco.dead_node = NM_INVALID_SLOT_NUM;
+ dlm->reco.new_master = NM_INVALID_SLOT_NUM;
+ dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
+ dlm->reco.next_seq = 0;
+ }
up_write(&dlm->recovery_sem);
-
-sleep:
- atomic_set(&dlm->reco.thread.woken, 0);
- status = util_wait_atomic_eq(&dlm->reco.thread.thread_wq,
- &dlm->reco.thread.woken,
- 1, DLM_RECOVERY_THREAD_MS);
- if (status == 0 || status == -ETIMEDOUT) {
- if (atomic_read(&dlm->reco.thread.woken))
- dlmprintk0("aha!!! recovery thread woken!\n");
- else
- dlmprintk0("timed out waiting, running again\n");
- continue;
- }
- dlmprintk("recovery thread got %d while waiting\n", status);
- break;
+ // continue and look for another dead node
}
flush_scheduled_work();
@@ -262,11 +272,15 @@
/* +--- remove dead_node from recovery map */
/* +--- unset new_master and dead_node and start all over */
+static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED;
+
static int dlm_remaster_locks_local(dlm_ctxt *dlm)
{
- int num_nodes = 255, i, status = 0;
- u32 node_map[8];
+ int num_nodes = NM_MAX_NODES, next=0, status = 0, ret = 0;
+ dlm_reco_node_data *ndata;
+ struct list_head *iter;
+ int all_nodes_done;
/* +- if this node is the new master, init the temp recovery area */
@@ -276,36 +290,173 @@
/* |- apply all temp area changes to real lock */
/* +- send ALL DONE message to each node */
-
- status = dlm_init_recovery_area(dlm, dlm->reco.dead_node, num_nodes);
+ status = dlm_init_recovery_area(dlm);
if (status < 0)
return status;
- spin_lock(&dlm->spinlock);
- num_nodes = nm_get_group_max_slots(dlm->group);
- memcpy(node_map, dlm->node_map, sizeof(node_map));
- spin_unlock(&dlm->spinlock);
+ spin_lock(&dlm_reco_state_lock);
+ list_for_each(iter, &dlm->reco.node_data) {
+ ndata = list_entry (iter, dlm_reco_node_data, list);
+ DLM_ASSERT(ndata->state == DLM_RECO_NODE_DATA_INIT);
+ ndata->state = DLM_RECO_NODE_DATA_REQUESTING;
- for (i=0; i<num_nodes; i++) {
- if (test_bit(i, node_map)) {
- spin_lock(&dlm->spinlock);
- dlm->reco.sending_node = i;
- dlm->reco.next_seq = 0;
- spin_unlock(&dlm->spinlock);
- status = dlm_request_all_locks(dlm, i, dlm->reco.dead_node);
- if (status < 0) {
- spin_lock(&dlm->spinlock);
- dlm->reco.sending_node = NM_INVALID_SLOT_NUM;
- dlm->reco.next_seq = 0;
- spin_unlock(&dlm->spinlock);
+ status = dlm_request_all_locks(dlm, ndata->node_num, dlm->reco.dead_node);
+ if (status < 0) {
+ dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
+ return status;
+ }
+
+ switch (ndata->state) {
+ case DLM_RECO_NODE_DATA_INIT:
+ case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+ case DLM_RECO_NODE_DATA_REQUESTED:
+ DLM_ASSERT(0);
+ break;
+ case DLM_RECO_NODE_DATA_DEAD:
+ dlmprintk("eek. node %u died after requesting recovery info for node %u\n",
+ ndata->node_num, dlm->reco.dead_node);
+ spin_unlock(&dlm_reco_state_lock);
+ // start all over
dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
- return status;
+ return -EAGAIN;
+ case DLM_RECO_NODE_DATA_REQUESTING:
+ ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
+ dlmprintk("now receiving recovery data from node %u for dead node %u\n",
+ ndata->node_num, dlm->reco.dead_node);
+ break;
+ case DLM_RECO_NODE_DATA_RECEIVING:
+ dlmprintk("already receiving recovery data from node %u for dead node %u\n",
+ ndata->node_num, dlm->reco.dead_node);
+ break;
+ case DLM_RECO_NODE_DATA_DONE:
+ dlmprintk("already DONE receiving recovery data from node %u for dead node %u\n",
+ ndata->node_num, dlm->reco.dead_node);
+ break;
+ }
+ }
+ spin_unlock(&dlm_reco_state_lock);
+
+ /* nodes should be sending reco data now
+ * just need to wait */
+
+ while (1) {
+ /* wait to be signalled, with periodic timeout
+ * to check for node death */
+ atomic_set(&dlm->reco.thread.woken, 0);
+ ret = util_wait_atomic_eq(&dlm->reco.thread.thread_wq,
+ &dlm->reco.thread.woken, 1,
+ DLM_RECOVERY_THREAD_MS);
+ if (ret == 0 || ret == -ETIMEDOUT) {
+ if (atomic_read(&dlm->reco.thread.woken))
+ dlmprintk0("waiting on reco data... aha!!! recovery thread woken!\n");
+ else
+ dlmprintk0("waiting on reco data... timed out waiting\n");
+ }
+
+ /* either way, recheck all the nodes now to see if we are
+ * done, or if anyone died */
+ all_nodes_done = 1;
+ spin_lock(&dlm_reco_state_lock);
+ list_for_each(iter, &dlm->reco.node_data) {
+ ndata = list_entry (iter, dlm_reco_node_data, list);
+
+ switch (ndata->state) {
+ case DLM_RECO_NODE_DATA_INIT:
+ case DLM_RECO_NODE_DATA_REQUESTING:
+ DLM_ASSERT(0);
+ break;
+ case DLM_RECO_NODE_DATA_DEAD:
+ dlmprintk("eek. node %u died after requesting recovery info for node %u\n",
+ ndata->node_num, dlm->reco.dead_node);
+ spin_unlock(&dlm_reco_state_lock);
+ // start all over
+ dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
+ return -EAGAIN;
+ case DLM_RECO_NODE_DATA_RECEIVING:
+ case DLM_RECO_NODE_DATA_REQUESTED:
+ all_nodes_done = 0;
+ break;
+ case DLM_RECO_NODE_DATA_DONE:
+ break;
+ case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+ break;
}
}
+ spin_unlock(&dlm_reco_state_lock);
+
+ if (all_nodes_done) {
+ /* all nodes are now in DLM_RECO_NODE_DATA_DONE state
+ * just send a finalize message to everyone and
+ * clean up */
+ ret = dlm_finalize_recovery(dlm);
+ if (ret < 0) {
+ dlmprintk("dlm_finalize_recovery returned %d\n", ret);
+ }
+ dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
+ status = ret;
+ break;
+ }
}
+
return status;
}
+int dlm_init_recovery_area(dlm_ctxt *dlm)
+{
+ int num=0, ret;
+ dlm_reco_node_data *ndata;
+ LIST_HEAD(tmplist);
+
+ spin_lock(&dlm->spinlock);
+ memcpy(dlm->reco.node_map, dlm->node_map, sizeof(dlm->node_map));
+ /* nodes can only be removed (by dying) after dropping
+ * this lock, and death will be trapped later, so this should do */
+ spin_unlock(&dlm->spinlock);
+
+ while (1) {
+ num = find_next_bit (dlm->reco.node_map, NM_MAX_NODES, num);
+ if (num >= NM_MAX_NODES) {
+ break;
+ }
+ DLM_ASSERT(num != dlm->reco.dead_node);
+
+ ndata = kmalloc(sizeof(dlm_reco_node_data), GFP_KERNEL);
+ if (!ndata) {
+ dlm_destroy_recovery_area(dlm, dlm->reco.dead_node);
+ return -ENOMEM;
+ }
+ memset(ndata, 0, sizeof(dlm_reco_node_data));
+ ndata->node_num = num;
+ ndata->state = DLM_RECO_NODE_DATA_INIT;
+ INIT_LIST_HEAD(&ndata->granted);
+ INIT_LIST_HEAD(&ndata->converting);
+ INIT_LIST_HEAD(&ndata->blocked);
+ spin_lock(&dlm_reco_state_lock);
+ list_add_tail(&ndata->list, &dlm->reco.node_data);
+ spin_unlock(&dlm_reco_state_lock);
+ num++;
+ }
+
+ return 0;
+}
+
+void dlm_destroy_recovery_area(dlm_ctxt *dlm, u16 dead_node)
+{
+ struct list_head *iter, *iter2;
+ dlm_reco_node_data *ndata;
+ LIST_HEAD(tmplist);
+
+ spin_lock(&dlm_reco_state_lock);
+ list_splice_init(&dlm->reco.node_data, &tmplist);
+ spin_unlock(&dlm_reco_state_lock);
+
+#warning this probably needs to be smarter
+ list_for_each_safe(iter, iter2, &tmplist) {
+ ndata = list_entry (iter, dlm_reco_node_data, list);
+ kfree(ndata);
+ }
+}
+
int dlm_request_all_locks(dlm_ctxt *dlm, u16 request_from, u16 dead_node)
{
dlmprintk("dlm_request_all_locks: dead node is %u, sending request to %u\n",
@@ -315,10 +466,102 @@
return 0;
}
+typedef struct _dlm_reco_request_locks
+{
+ u16 dead_node;
+} dlm_reco_request_locks;
+
+typedef struct _dlm_reco_node_data
+{
+} dlm_reco_node_data;
+
+int dlm_request_all_locks_handler(net_msg *msg, u32 len, void *data)
+{
+#if 0
+ int status;
+ dlm_ctxt *dlm = data;
+ dlm_lock_resource *res;
+ dlm_lock *lock = NULL;
+ dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
+ struct qstr lockname = { .name=past->name, .len=past->namelen };
+ struct list_head *iter, *head=NULL;
+ u64 cookie = past->cookie;
+ u32 flags = past->flags;
+
+ if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
+ (LKM_PUT_LVB|LKM_GET_LVB)) {
+ dlmprintk("both PUT and GET lvb specified\n");
+ return DLM_BADARGS;
+ }
+
+ dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
+ (flags & LKM_GET_LVB ? "get lvb" : "none"));
+
+ lockname.hash = full_name_hash(lockname.name, lockname.len);
+
+ dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
+
+ if (past->type != DLM_AST &&
+ past->type != DLM_BAST) {
+ dlmprintk("Eeeek unknown ast type! %d, cookie=%llu, name=%*s\n",
+ past->type, cookie, lockname.len, lockname.name);
+ return 0;
+ }
+
+ res = dlm_lookup_lock(dlm, &lockname);
+ if (!res) {
+ dlmprintk("eek! got %sast for unknown lockres! cookie=%llu, name=%*s, namelen=%d\n",
+ past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
+ return 0;
+ }
#endif
+}
+
+int dlm_reco_node_data_handler(net_msg *msg, u32 len, void *data)
+{
#if 0
+ int status;
+ dlm_ctxt *dlm = data;
+ dlm_lock_resource *res;
+ dlm_lock *lock = NULL;
+ dlm_proxy_ast *past = (dlm_proxy_ast *) msg->buf;
+ struct qstr lockname = { .name=past->name, .len=past->namelen };
+ struct list_head *iter, *head=NULL;
+ u64 cookie = past->cookie;
+ u32 flags = past->flags;
+ if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
+ (LKM_PUT_LVB|LKM_GET_LVB)) {
+ dlmprintk("both PUT and GET lvb specified\n");
+ return DLM_BADARGS;
+ }
+
+ dlmprintk("lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
+ (flags & LKM_GET_LVB ? "get lvb" : "none"));
+
+ lockname.hash = full_name_hash(lockname.name, lockname.len);
+
+ dlmprintk("type=%d, blocked_type=%d\n", past->type, past->blocked_type);
+
+ if (past->type != DLM_AST &&
+ past->type != DLM_BAST) {
+ dlmprintk("Eeeek unknown ast type! %d, cookie=%llu, name=%*s\n",
+ past->type, cookie, lockname.len, lockname.name);
+ return 0;
+ }
+
+ res = dlm_lookup_lock(dlm, &lockname);
+ if (!res) {
+ dlmprintk("eek! got %sast for unknown lockres! cookie=%llu, name=%*s, namelen=%d\n",
+ past->type == DLM_AST ? "" : "b", cookie, lockname.len, lockname.name, lockname.len);
+ return 0;
+ }
+#endif
+}
+
+
+
int dlm_recovery_request_handler(net_msg *msg, u32 len, void *data);
int dlm_recovery_response_handler(net_msg *msg, u32 len, void *data);
int dlm_recovery_lock_arr_req_handler(net_msg *msg, u32 len, void *data);
@@ -421,9 +664,6 @@
}
-/*
- * gawd i hate udp
- */
int dlm_recovery_lock_arr_req_handler(net_msg *msg, u32 len, void *data)
{
dlm_ctxt *dlm = data;
@@ -434,8 +674,6 @@
dlm_lock *lock = NULL;
int ret, i, out_of_order = 0;
- // TODO: ntoh(req)
-
ret = 0;
if (req->num_locks == 0)
goto send_response;
@@ -541,7 +779,7 @@
}
-#endif
+#endif /* LOUSY_RECOVERY */
#warning may need to change kfree to put_lock and refcounting here
static void dlm_do_local_recovery_cleanup(dlm_ctxt *dlm, u16 dead_node, int locked)
Modified: trunk/cluster/tcp.c
===================================================================
--- trunk/cluster/tcp.c 2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/tcp.c 2005-01-10 22:08:58 UTC (rev 1751)
@@ -149,7 +149,7 @@
static int net_receive(void);
static int net_accept_tcp_connections(void);
static void net_release_tcp_sock(void);
-static int net_dispatch_message(struct inode *inode, struct socket *sock, net_msg *hdr, net_msg_handler *hnd);
+static int net_dispatch_message(struct inode *inode, struct socket *sock, int len, net_msg_handler *hnd);
static int net_ioctl (struct inode *inode, struct file *filp, unsigned int cmd, unsigned long arg);
int gsd_message_action(gsd_message *g);
@@ -287,6 +287,7 @@
ret = gsd_message_action(&g);
} else {
/* create the group on remote node */
+ gsd_message_to_net(&g);
ret = net_send_message(GSD_MESSAGE, 0, &g, sizeof(g), to, &response);
if (ret == 0)
ret = response;
@@ -318,6 +319,7 @@
ret = gsd_message_action(&g);
} else {
/* create the group on remote node */
+ gsd_message_to_net(&g);
ret = net_send_message(GSD_MESSAGE, 0, &g, sizeof(g), to, &response);
if (ret == 0)
ret = response;
@@ -522,7 +524,9 @@
int gsd_message_handler(net_msg *msg, u32 len, void *data)
{
- return gsd_message_action((gsd_message *)msg->buf);
+ gsd_message *g = (gsd_message *)msg->buf;
+ gsd_message_to_host(g);
+ return gsd_message_action(g);
}
int gsd_message_action(gsd_message *g)
@@ -680,45 +684,7 @@
-net_msg * net_package_message(u32 msg_type, u32 key, void *data, u32 len)
-{
- net_msg *ret = NULL;
- net_msg_handler *handler = NULL;
- u32 packet_len;
- spin_lock(&net_handler_lock);
- handler = net_lookup_handler(msg_type, key);
- spin_unlock(&net_handler_lock);
-
- if (!handler) {
- netprintk("no such message type: %u/%u\n", msg_type, key);
- return NULL;
- }
- if (!net_handler_msg_len_ok(handler, len)) {
- netprintk("len for message type %u incorrect: %u, should be %u\n",
- msg_type, len, handler->max_len);
- goto done;
- }
- packet_len = len + sizeof(net_msg);
- ret = kmalloc(packet_len, GFP_KERNEL);
- if (!ret) {
- netprintk("failed to allocate %u bytes for message!\n", packet_len);
- goto done;
- }
- memset(ret, 0, packet_len);
- ret->magic = NET_MSG_MAGIC;
- ret->data_len = len;
- ret->msg_type = msg_type;
- ret->key = key;
- if (len > 0)
- memcpy(&(ret->buf[0]), data, len);
-
-done:
- if (handler)
- net_put_handler(handler);
- return ret;
-}
-
/* TODO Fix */
static void net_remove_handlers(void)
{
@@ -888,6 +854,8 @@
* - all you need prior to this call is to have inited the
* net stuff, to have a valid inode for the node to contact
* in nm, and to have registered the message handler
+ * - if status was requested, it will be returned to the caller
+ * already converted to host byteorder
*/
int net_send_message(u32 msg_type, u32 key, void *data, u32 len, struct inode *inode, int *status)
{
@@ -980,6 +948,8 @@
spin_unlock(&net->sock_lock);
}
+ /* finally, convert the message header to network byte-order and send */
+ net_msg_to_net(msg);
ret = net_send_tcp_msg(inode, NULL, msg, packet_len);
if (status) {
@@ -1124,6 +1094,8 @@
spin_unlock(&net->sock_lock);
}
+ /* finally, convert the message header to network byte-order and send */
+ net_msg_to_net(msg);
ret = net_send_tcp_msg(inode, NULL, msg, packet_len);
if (status) {
@@ -1216,6 +1188,9 @@
netprintk0("failed to receive message!\n");
goto error;
}
+
+ /* convert the header to host byteorder */
+ net_msg_to_host(&hdr);
netprintk("received message header... magic=%u type=%u key=%u\n",
hdr.magic, hdr.msg_type, hdr.key);
@@ -1253,7 +1228,7 @@
netprintk0("no handler for message.\n");
goto error;
}
- err = net_dispatch_message(inode, sock, &hdr, hnd);
+ err = net_dispatch_message(inode, sock, hdr.data_len, hnd);
/* if node has requested status return, do it now */
if (hdr.status) {
@@ -1266,6 +1241,9 @@
hdr.magic = NET_MSG_STATUS_MAGIC; // twiddle the magic
hdr.data_len = 0;
netprintk("about to send status %d\n", err);
+
+ /* hdr has been in host byteorder this whole time */
+ net_msg_to_net(&hdr);
tmperr = net_send_tcp_msg(inode, sock, &hdr, sizeof(net_msg));
netprintk0("yay, sent!\n");
} else if (err < 0) {
@@ -1332,12 +1310,11 @@
spin_unlock(&net_status_lock);
}
-static int net_dispatch_message(struct inode *inode, struct socket *sock, net_msg *hdr, net_msg_handler *hnd)
+static int net_dispatch_message(struct inode *inode, struct socket *sock, int len, net_msg_handler *hnd)
{
int ret = -EINVAL;
- int len, packet_len;
+ int packet_len;
- len = hdr->data_len;
packet_len = len + sizeof(net_msg);
spin_lock(&hnd->lock);
@@ -1360,6 +1337,10 @@
if (ret < 0) {
netprintk("net_recv_tcp_msg returned: %d\n", ret);
} else {
+ /* convert just the header to host byteorder
+ * handler func is responsible for the data */
+ net_msg_to_host((net_msg *)hnd->buf);
+
net_num_dispatched++;
ret = (hnd->func)((net_msg *)hnd->buf, packet_len, hnd->data);
}
@@ -1492,6 +1473,7 @@
err.magic = NET_MSG_MAGIC;
err.msg_type = err_type;
err.data_len = 0;
+ net_msg_to_net(&err);
msg.msg_name = 0;
msg.msg_namelen = 0;
Modified: trunk/cluster/tcp.h
===================================================================
--- trunk/cluster/tcp.h 2005-01-08 02:58:35 UTC (rev 1750)
+++ trunk/cluster/tcp.h 2005-01-10 22:08:58 UTC (rev 1751)
@@ -61,6 +61,29 @@
__u64 msg_num;
__u8 buf[0];
} net_msg;
+
+static inline void net_msg_to_net(net_msg *m)
+{
+ m->magic = htonl(m->magic);
+ m->data_len = htonl(m->data_len);
+ m->src_node = htons(m->src_node);
+ m->dst_node = htons(m->dst_node);
+ m->msg_type = htonl(m->msg_type);
+ m->key = htonl(m->key);
+ m->status = htonl(m->status);
+ m->msg_num = cpu_to_be64(m->msg_num);
+}
+static inline void net_msg_to_host(net_msg *m)
+{
+ m->magic = ntohl(m->magic);
+ m->data_len = ntohl(m->data_len);
+ m->src_node = ntohs(m->src_node);
+ m->dst_node = ntohs(m->dst_node);
+ m->msg_type = ntohl(m->msg_type);
+ m->key = ntohl(m->key);
+ m->status = ntohl(m->status);
+ m->msg_num = be64_to_cpu(m->msg_num);
+}
#else
#define NET_MSG_MAGIC ((u16)0xfa55)
@@ -77,6 +100,24 @@
__u8 buf[0];
} net_msg;
+static inline void net_msg_to_net(net_msg *m)
+{
+ m->magic = htons(m->magic);
+ m->data_len = htons(m->data_len);
+ m->msg_type = htons(m->msg_type);
+ m->status = htons(m->status);
+ m->key = htonl(m->key);
+ m->msg_num = htonl(m->msg_num);
+}
+static inline void net_msg_to_host(net_msg *m)
+{
+ m->magic = ntohs(m->magic);
+ m->data_len = ntohs(m->data_len);
+ m->msg_type = ntohs(m->msg_type);
+ m->status = ntohs(m->status);
+ m->key = ntohl(m->key);
+ m->msg_num = ntohl(m->msg_num);
+}
#endif
typedef int (net_msg_handler_func)(net_msg *msg, u32 len, void *data);
@@ -200,7 +241,6 @@
int net_register_handler(u32 msg_type, u32 key, int flags,
u32 max_len, net_msg_handler_func *func, void *data, void *buf);
-net_msg * net_package_message(u32 msg_type, u32 key, void *data, u32 len);
int net_recv_tcp_msg (struct inode *inode, struct socket *sock, void *data, u32 *packet_len);
int net_send_tcp_msg (struct inode *inode, struct socket *sock, void *data, u32 packet_len);
int net_send_error(struct socket *sock, u32 err_type);
@@ -222,4 +262,13 @@
u8 name[NM_MAX_NAME_LEN];
} gsd_message;
+static inline void gsd_message_to_net(gsd_message *g)
+{
+ g->from = htons(g->from);
+}
+static inline void gsd_message_to_host(gsd_message *g)
+{
+ g->from = ntohs(g->from);
+}
+
#endif /* CLUSTER_TCP_H */
More information about the Ocfs2-commits
mailing list