[Ocfs2-commits] mfasheh commits r1655 - branches/dlm-glue/src
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Fri Nov 19 17:44:30 CST 2004
Author: mfasheh
Date: 2004-11-19 17:44:28 -0600 (Fri, 19 Nov 2004)
New Revision: 1655
Added:
branches/dlm-glue/src/slot_map.c
branches/dlm-glue/src/slot_map.h
Modified:
branches/dlm-glue/src/Makefile
branches/dlm-glue/src/alloc.c
branches/dlm-glue/src/dlmglue.c
branches/dlm-glue/src/dlmglue.h
branches/dlm-glue/src/heartbeat.c
branches/dlm-glue/src/heartbeat.h
branches/dlm-glue/src/inode.c
branches/dlm-glue/src/journal.c
branches/dlm-glue/src/localalloc.c
branches/dlm-glue/src/namei.c
branches/dlm-glue/src/ocfs.h
branches/dlm-glue/src/ocfs2_fs.h
branches/dlm-glue/src/ocfs_journal.h
branches/dlm-glue/src/ocfs_log.h
branches/dlm-glue/src/proc.c
branches/dlm-glue/src/suballoc.c
branches/dlm-glue/src/super.c
branches/dlm-glue/src/sysfile.c
branches/dlm-glue/src/sysfile.h
Log:
* too many changes to list here. highlights:
- dlm-glue stuff is even more abstracted out. it could still use more
though..
- we now have a cluster wide super block lock
- we now use a disk slot map to figure out who's mounted and translate
between global node numbers and disk slots.
- first cut at getting cluster recovery going with this new stuff.
- lots more...
Modified: branches/dlm-glue/src/Makefile
===================================================================
--- branches/dlm-glue/src/Makefile 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/Makefile 2004-11-19 23:44:28 UTC (rev 1655)
@@ -79,6 +79,7 @@
localalloc.c \
namei.c \
proc.c \
+ slot_map.c \
suballoc.c \
super.c \
symlink.c \
@@ -107,6 +108,7 @@
localalloc.h \
namei.h \
proc.h \
+ slot_map.h \
suballoc.h \
super.h \
symlink.h \
Modified: branches/dlm-glue/src/alloc.c
===================================================================
--- branches/dlm-glue/src/alloc.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/alloc.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -209,7 +209,7 @@
/* we always use node zeros suballocator */
eb->h_suballoc_node = 0;
#else
- eb->h_suballoc_node = osb->node_num;
+ eb->h_suballoc_node = osb->slot_num;
#endif
eb->h_suballoc_bit = suballoc_bit_start;
eb->h_list.l_count = ocfs2_extent_recs_per_eb(osb->sb);
Modified: branches/dlm-glue/src/dlmglue.c
===================================================================
--- branches/dlm-glue/src/dlmglue.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/dlmglue.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -32,6 +32,7 @@
#include <dlmutil.h>
#include <dlmcommon.h>
+#include <dlmhb.h>
#include <dlmnm.h>
#include <dlmnet.h>
#include <dlmmod.h>
@@ -45,6 +46,7 @@
#include "extent_map.h"
#include "heartbeat.h"
#include "inode.h"
+#include "slot_map.h"
#include "util.h"
#include "ocfs_journal.h"
@@ -52,6 +54,7 @@
#define OCFS_DEBUG_CONTEXT OCFS_DEBUG_CONTEXT_DLMGLUE
+#if 0
#warning REMOVE THESE STUBS
dlm_status dlmlock(dlm_ctxt *dlm,
@@ -88,6 +91,16 @@
kfree(dlm);
}
+int hb_register_callback(int type, hb_cb_func *func, void *data, int priority)
+{
+ return 0;
+}
+int hb_unregister_callback(int type, hb_cb_func *func, void *data)
+{
+ return 0;
+}
+#endif
+
/* lock ids are made up in the following manner:
* name[0] --> type
* name[1-6] --> 6 pad characters, reserved for now
@@ -99,7 +112,8 @@
static char ocfs2_lock_type_char[OCFS_NUM_LOCK_TYPES] = {
[OCFS_TYPE_META] 'M',
- [OCFS_TYPE_DATA] 'D'
+ [OCFS_TYPE_DATA] 'D',
+ [OCFS_TYPE_SUPER] 'S'
};
static int ocfs2_build_lock_name(enum ocfs2_lock_type type,
@@ -107,18 +121,49 @@
u32 generation,
char **ret);
-static void ocfs2_ast_func(void *opaque);
-/* so far, all locks have gotten along with the same BAST. */
-static void ocfs2_bast_func(void *opaque, int level);
+static void ocfs2_inode_ast_func(void *opaque);
+static void ocfs2_inode_bast_func(void *opaque, int level);
+static void ocfs2_super_ast_func(void *opaque);
+static void ocfs2_super_bast_func(void *opaque, int level);
+/* so far, all locks have gotten along with the same unlock ast */
+static void ocfs2_unlock_ast_func(void *opaque, dlm_status status);
static dlm_astlockfunc_t *ocfs2_lock_type_asts[OCFS_NUM_LOCK_TYPES] = {
- [OCFS_TYPE_META] ocfs2_ast_func,
- [OCFS_TYPE_DATA] ocfs2_ast_func
+ [OCFS_TYPE_META] ocfs2_inode_ast_func,
+ [OCFS_TYPE_DATA] ocfs2_inode_ast_func,
+ [OCFS_TYPE_SUPER] ocfs2_super_ast_func
};
static dlm_bastlockfunc_t *ocfs2_lock_type_basts[OCFS_NUM_LOCK_TYPES] = {
- [OCFS_TYPE_META] ocfs2_bast_func,
- [OCFS_TYPE_DATA] ocfs2_bast_func
+ [OCFS_TYPE_META] ocfs2_inode_bast_func,
+ [OCFS_TYPE_DATA] ocfs2_inode_bast_func,
+ [OCFS_TYPE_SUPER] ocfs2_super_bast_func
};
+
+static inline int ocfs2_is_inode_lock(ocfs2_lock_res *lockres)
+{
+ return lockres->l_type == OCFS_TYPE_META ||
+ lockres->l_type == OCFS_TYPE_DATA;
+}
+
+static inline int ocfs2_is_super_lock(ocfs2_lock_res *lockres)
+{
+ return lockres->l_type == OCFS_TYPE_SUPER;
+}
+
+static inline ocfs_super * ocfs2_lock_res_super(ocfs2_lock_res *lockres)
+{
+ OCFS_ASSERT(ocfs2_is_super_lock(lockres));
+
+ return (ocfs_super *) lockres->l_priv;
+}
+
+static inline struct inode * ocfs2_lock_res_inode(ocfs2_lock_res *lockres)
+{
+ OCFS_ASSERT(ocfs2_is_inode_lock(lockres));
+
+ return (struct inode *) lockres->l_priv;
+}
+
static int ocfs2_lock_create(ocfs_super *osb,
ocfs2_lock_res *lockres,
int level,
@@ -127,11 +172,15 @@
int wanted);
static int ocfs2_cluster_lock(ocfs_super *osb,
ocfs2_lock_res *lockres,
- int level);
-static void ocfs2_unlock_ast_func(void *opaque, dlm_status status);
+ int level,
+ int lkm_flags);
+void ocfs2_cluster_unlock(ocfs_super *osb,
+ ocfs2_lock_res *lockres,
+ int level);
static void ocfs2_inc_inode_seq(ocfs_super *osb,
struct inode *inode);
static void ocfs2_schedule_blocked_inode(struct inode *inode);
+static void ocfs2_schedule_blocked_super(ocfs_super *osb);
static inline void ocfs2_recover_from_dlm_error(ocfs2_lock_res *lockres,
int convert);
static void ocfs2_vote_on_unlock(ocfs_super *osb,
@@ -168,7 +217,8 @@
static int ocfs2_process_blocked_data(struct inode *inode,
int *requeue);
static int ocfs2_do_request_vote(ocfs_super *osb,
- struct inode *inode,
+ u64 blkno,
+ unsigned int generation,
enum ocfs2_vote_request type);
static inline int ocfs2_lvb_is_trustable(ocfs2_lock_res *lockres)
@@ -246,22 +296,24 @@
return (len);
}
-int ocfs2_lock_res_init(ocfs2_lock_res *res,
- enum ocfs2_lock_type type,
- struct inode *inode)
+int ocfs2_inode_lock_res_init(ocfs2_lock_res *res,
+ enum ocfs2_lock_type type,
+ struct inode *inode)
{
int status;
LOG_ENTRY();
+ OCFS_ASSERT(type == OCFS_TYPE_META ||
+ type == OCFS_TYPE_DATA);
+
memset(res, 0, sizeof(ocfs2_lock_res));
-
spin_lock_init(&res->l_lock);
init_waitqueue_head(&res->l_event);
- res->l_inode = inode;
res->l_type = type;
res->l_level = LKM_IVMODE;
+ res->l_priv = inode;
status = ocfs2_build_lock_name(type,
OCFS_I(inode)->ip_blkno,
inode->i_generation,
@@ -276,6 +328,35 @@
return status;
}
+int ocfs2_super_lock_res_init(ocfs2_lock_res *res,
+ ocfs_super *osb)
+{
+ enum ocfs2_lock_type type = OCFS_TYPE_SUPER;
+ int status;
+
+ LOG_ENTRY();
+
+ memset(res, 0, sizeof(ocfs2_lock_res));
+ spin_lock_init(&res->l_lock);
+ init_waitqueue_head(&res->l_event);
+ res->l_type = type;
+ res->l_level = LKM_IVMODE;
+
+ res->l_priv = osb;
+ status = ocfs2_build_lock_name(type,
+ 0ULL,
+ 0,
+ &res->l_name);
+ if (status < 0) {
+ LOG_ERROR_STATUS(status);
+ goto bail;
+ }
+
+bail:
+ LOG_EXIT_STATUS(status);
+ return status;
+}
+
void ocfs2_lock_res_free(ocfs2_lock_res *res)
{
if (res->l_name)
@@ -318,7 +399,7 @@
}
}
-static inline void ocfs2_handle_downconvert_action(ocfs2_lock_res *lockres)
+static inline void ocfs2_generic_handle_downconvert_action(ocfs2_lock_res *lockres)
{
OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_BUSY);
OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_ATTACHED);
@@ -331,16 +412,6 @@
wake_up_all(&lockres->l_event);
}
-static inline void ocfs2_handle_data_convert_action(struct inode *inode,
- ocfs2_lock_res *lockres)
-{
- OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_BUSY);
- OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_ATTACHED);
-
- lockres->l_level = lockres->l_requested;
- lockres->l_flags &= ~OCFS2_LOCK_BUSY;
-}
-
static void ocfs2_inc_inode_seq(ocfs_super *osb,
struct inode *inode)
{
@@ -365,66 +436,63 @@
atomic_read(seq));
}
-static inline void ocfs2_handle_meta_convert_action(struct inode *inode,
- ocfs2_lock_res *lockres)
+static inline void ocfs2_generic_handle_convert_action(ocfs2_lock_res *lockres)
{
- ocfs_super *osb = OCFS2_SB(inode->i_sb);
-
OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_BUSY);
OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_ATTACHED);
/* Convert from RO to EX doesn't really need anything as our
* information is already up to data. Convert from NL to
- * *anything* however should mark the inode as needing an
- * update. */
- if (lockres->l_level == LKM_NLMODE) {
- ocfs2_inc_inode_seq(osb, inode);
+ * *anything* however should mark ourselves as needing an
+ * update */
+ if (lockres->l_level == LKM_NLMODE)
lockres->l_flags |= OCFS2_LOCK_NEEDS_REFRESH;
- }
lockres->l_level = lockres->l_requested;
lockres->l_flags &= ~OCFS2_LOCK_BUSY;
}
-static inline void ocfs2_handle_attach_action(struct inode *inode,
- ocfs2_lock_res *lockres)
+static inline void ocfs2_handle_meta_convert_action(struct inode *inode,
+ ocfs2_lock_res *lockres)
{
ocfs_super *osb = OCFS2_SB(inode->i_sb);
+ /* generic_handle_convert_action will set the refresh flag for us. */
+ if (lockres->l_level == LKM_NLMODE)
+ ocfs2_inc_inode_seq(osb, inode);
+ ocfs2_generic_handle_convert_action(lockres);
+}
+
+static inline void ocfs2_generic_handle_attach_action(ocfs2_lock_res *lockres)
+{
OCFS_ASSERT(lockres->l_flags & OCFS2_LOCK_BUSY);
OCFS_ASSERT(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
- /* skip the cache thunk for nlmode requests and local (new
- * inode) locks. */
if (lockres->l_requested > LKM_NLMODE &&
- !(lockres->l_flags & OCFS2_LOCK_LOCAL)) {
- ocfs2_inc_inode_seq(osb, inode);
+ !(lockres->l_flags & OCFS2_LOCK_LOCAL))
lockres->l_flags |= OCFS2_LOCK_NEEDS_REFRESH;
- }
lockres->l_level = lockres->l_requested;
lockres->l_flags |= OCFS2_LOCK_ATTACHED;
- /* should this part be in ocfs2_ast_func? */
lockres->l_flags &= ~OCFS2_LOCK_BUSY;
}
-/* can we get a lock type in this proto to? */
-static void ocfs2_ast_func(void *opaque)
+static void ocfs2_inode_ast_func(void *opaque)
{
ocfs2_lock_res *lockres = opaque;
- struct inode *inode = lockres->l_inode;
+ struct inode *inode = ocfs2_lock_res_inode(lockres);
+ ocfs_super *osb = OCFS2_SB(inode->i_sb);
dlm_lockstatus *lksb;
#ifdef OCFS2_VERBOSE_LOCKING_TRACE
printk("AST fired for inode %llu\n", OCFS_I(inode)->ip_blkno);
#endif
- OCFS_ASSERT(lockres->l_type == OCFS_TYPE_META ||
- lockres->l_type == OCFS_TYPE_DATA);
+ OCFS_ASSERT(ocfs2_is_inode_lock(lockres));
spin_lock(&lockres->l_lock);
lksb = &(lockres->l_lksb);
if (lksb->status != DLM_NORMAL) {
- printk("ocfs2_meta_ast_func: lksb status value of %u on "
+ printk("ocfs2_inode_ast_func: lksb status value of %u on "
"inode %llu\n", lksb->status, OCFS_I(inode)->ip_blkno);
spin_unlock(&lockres->l_lock);
return;
@@ -432,20 +500,30 @@
switch(lockres->l_action) {
case OCFS2_AST_ATTACH:
- ocfs2_handle_attach_action(inode, lockres);
+ if (lockres->l_type == OCFS_TYPE_META &&
+ lockres->l_requested > LKM_NLMODE &&
+ !(lockres->l_flags & OCFS2_LOCK_LOCAL))
+ ocfs2_inc_inode_seq(osb, inode);
+
+ ocfs2_generic_handle_attach_action(lockres);
break;
case OCFS2_AST_CONVERT:
if (lockres->l_type == OCFS_TYPE_META)
ocfs2_handle_meta_convert_action(inode, lockres);
else
- ocfs2_handle_data_convert_action(inode, lockres);
+ ocfs2_generic_handle_convert_action(lockres);
break;
case OCFS2_AST_DOWNCONVERT:
- ocfs2_handle_downconvert_action(lockres);
+ ocfs2_generic_handle_downconvert_action(lockres);
break;
default:
BUG();
}
+
+ /* data locking ignores refresh flag for now. */
+ if (lockres->l_type == OCFS_TYPE_DATA)
+ lockres->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
+
/* set it to something invalid so if we get called again we
* can catch it. */
lockres->l_action = OCFS2_AST_INVALID;
@@ -453,21 +531,80 @@
wake_up_all(&lockres->l_event);
}
-static void ocfs2_bast_func(void *opaque, int level)
+static void ocfs2_generic_handle_bast(ocfs2_lock_res *lockres, int level)
{
+ spin_lock(&lockres->l_lock);
+ lockres->l_flags |= OCFS2_LOCK_BLOCKED;
+ if (level > lockres->l_blocking)
+ lockres->l_blocking = level;
+ spin_unlock(&lockres->l_lock);
+}
+
+static void ocfs2_inode_bast_func(void *opaque, int level)
+{
ocfs2_lock_res *lockres = opaque;
- struct inode *inode = lockres->l_inode;
+ struct inode *inode = ocfs2_lock_res_inode(lockres);
ocfs_super *osb = OCFS2_SB(inode->i_sb);
#ifdef OCFS2_VERBOSE_LOCKING_TRACE
printk("BAST fired for inode %llu\n", OCFS_I(inode)->ip_blkno);
#endif
+ ocfs2_generic_handle_bast(lockres, level);
+
+ ocfs2_schedule_blocked_inode(inode);
+ ocfs2_kick_vote_thread(osb);
+}
+
+static void ocfs2_super_ast_func(void *opaque)
+{
+ ocfs2_lock_res *lockres = opaque;
+ dlm_lockstatus *lksb;
+
+#ifdef OCFS2_VERBOSE_LOCKING_TRACE
+ printk("AST fired for inode %llu\n", OCFS_I(inode)->ip_blkno);
+#endif
+ OCFS_ASSERT(ocfs2_is_super_lock(lockres));
+
spin_lock(&lockres->l_lock);
- lockres->l_flags |= OCFS2_LOCK_BLOCKED;
- if (level > lockres->l_blocking)
- lockres->l_blocking = level;
+ lksb = &(lockres->l_lksb);
+ if (lksb->status != DLM_NORMAL) {
+ printk("ocfs2_super_ast_func: lksb status value of %u!\n",
+ lksb->status);
+ spin_unlock(&lockres->l_lock);
+ return;
+ }
+
+ switch(lockres->l_action) {
+ case OCFS2_AST_ATTACH:
+ ocfs2_generic_handle_attach_action(lockres);
+ break;
+ case OCFS2_AST_CONVERT:
+ ocfs2_generic_handle_convert_action(lockres);
+ break;
+ case OCFS2_AST_DOWNCONVERT:
+ ocfs2_generic_handle_downconvert_action(lockres);
+ break;
+ default:
+ BUG();
+ }
+ /* set it to something invalid so if we get called again we
+ * can catch it. */
+ lockres->l_action = OCFS2_AST_INVALID;
spin_unlock(&lockres->l_lock);
- ocfs2_schedule_blocked_inode(inode);
+ wake_up_all(&lockres->l_event);
+}
+
+static void ocfs2_super_bast_func(void *opaque, int level)
+{
+ ocfs2_lock_res *lockres = opaque;
+ ocfs_super *osb = ocfs2_lock_res_super(lockres);
+
+#ifdef OCFS2_VERBOSE_LOCKING_TRACE
+ printk("Superblock BAST fired\n");
+#endif
+ ocfs2_generic_handle_bast(lockres, level);
+
+ ocfs2_schedule_blocked_super(osb);
ocfs2_kick_vote_thread(osb);
}
@@ -570,7 +707,8 @@
static int ocfs2_cluster_lock(ocfs_super *osb,
ocfs2_lock_res *lockres,
- int level)
+ int level,
+ int lkm_flags)
{
int ret;
enum ocfs2_lock_type type = lockres->l_type;
@@ -578,6 +716,12 @@
LOG_ENTRY();
+#warning "this is ignored for now!"
+ /* Still waiting for this to be implemented in dlmmod, for now
+ * we fake a response */
+ if (lkm_flags & LKM_NOQUEUE)
+ return -EAGAIN;
+
again:
if (signal_pending(current)) {
ret = -EINTR;
@@ -634,8 +778,13 @@
lockres,
ocfs2_lock_type_basts[type]);
if (status != DLM_NORMAL) {
- LOG_ERROR_ARGS("Dlm returns %d\n", status);
- ret = -ENOENT;
+ if ((lkm_flags & LKM_NOQUEUE) &&
+ (status == DLM_NOTQUEUED))
+ ret = -EAGAIN;
+ else {
+ LOG_ERROR_ARGS("Dlm returns %d\n", status);
+ ret = -ENOENT;
+ }
ocfs2_recover_from_dlm_error(lockres, 1);
goto bail;
}
@@ -655,6 +804,16 @@
return ret;
}
+void ocfs2_cluster_unlock(ocfs_super *osb,
+ ocfs2_lock_res *lockres,
+ int level)
+{
+ spin_lock(&lockres->l_lock);
+ ocfs2_dec_holders(lockres, level);
+ ocfs2_vote_on_unlock(osb, lockres);
+ spin_unlock(&lockres->l_lock);
+}
+
/* Grants us an EX lock on the data and metadata resources, skipping
* the normal cluster directory lookup. Use this ONLY on newly created
* inodes which other nodes can't possibly see, and which haven't been
@@ -722,7 +881,7 @@
level = write ? LKM_EXMODE : LKM_PRMODE;
- status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level);
+ status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0);
if (status < 0 && status != -EINTR)
LOG_ERROR_STATUS(status);
@@ -760,15 +919,10 @@
void ocfs2_data_unlock(struct inode *inode,
int write)
{
- int level;
+ int level = write ? LKM_EXMODE : LKM_PRMODE;
ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_data_lockres;
- level = write ? LKM_EXMODE : LKM_PRMODE;
-
- spin_lock(&lockres->l_lock);
- ocfs2_dec_holders(lockres, level);
- ocfs2_vote_on_unlock(OCFS2_SB(inode->i_sb), lockres);
- spin_unlock(&lockres->l_lock);
+ ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
}
static inline int ocfs2_wait_on_recovery(ocfs_super *osb)
@@ -860,17 +1014,18 @@
}
}
-/* may or may not return a bh if it went to disk. */
-static int ocfs2_meta_lock_update(struct inode *inode,
- struct buffer_head **bh)
+/* Determine whether a lock resource needs to be refreshed, and
+ * arbitrate who gets to refresh it.
+ *
+ * -1 means error, 0 means no refresh needed, > 0 means you need to
+ * refresh this and you MUST call ocfs2_complete_lock_res_refresh
+ * afterwards. */
+static int ocfs2_should_refresh_lock_res(ocfs2_lock_res *lockres)
{
+
int status = 0;
- u32 trustable_clusters = 0;
- ocfs2_lock_res *lockres;
- ocfs2_dinode *fe;
+ LOG_ENTRY();
- lockres = &OCFS_I(inode)->ip_meta_lockres;
-
refresh_check:
spin_lock(&lockres->l_lock);
if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
@@ -892,6 +1047,46 @@
lockres->l_flags |= OCFS2_LOCK_REFRESHING;
spin_unlock(&lockres->l_lock);
+ status = 1;
+bail:
+ LOG_EXIT_STATUS(status);
+ return status;
+}
+
+/* If status is non zero, I'll mark it as not being in refresh
+ * anymore, but I won't clear the needs refresh flag. */
+static inline void ocfs2_complete_lock_res_refresh(ocfs2_lock_res *lockres,
+ int status)
+{
+ spin_lock(&lockres->l_lock);
+ lockres->l_flags &= ~OCFS2_LOCK_REFRESHING;
+ if (!status)
+ lockres->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
+ spin_unlock(&lockres->l_lock);
+
+ wake_up_all(&lockres->l_event);
+}
+
+/* may or may not return a bh if it went to disk. */
+static int ocfs2_meta_lock_update(struct inode *inode,
+ struct buffer_head **bh)
+{
+ int status;
+ u32 trustable_clusters = 0;
+ ocfs2_lock_res *lockres;
+ ocfs2_dinode *fe;
+
+ lockres = &OCFS_I(inode)->ip_meta_lockres;
+
+ status = ocfs2_should_refresh_lock_res(lockres);
+ if (!status)
+ goto bail;
+ if (status < 0) {
+ if (status != -EINTR)
+ LOG_ERROR_STATUS(status);
+ goto bail;
+ }
+
/* we don't want to use the LVB for bitmap files as the
* used/set bit union is not currently sent over the wire. */
if (!(OCFS_I(inode)->ip_flags & OCFS_INODE_BITMAP) &&
@@ -932,22 +1127,18 @@
ocfs2_set_local_seq_from_lvb(lockres);
ocfs2_reset_meta_lvb_values(inode);
- spin_lock(&lockres->l_lock);
- lockres->l_flags &= ~OCFS2_LOCK_REFRESHING;
- lockres->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
- spin_unlock(&lockres->l_lock);
-
- wake_up_all(&lockres->l_event);
+ ocfs2_complete_lock_res_refresh(lockres, 0);
bail:
return status;
}
-int ocfs2_meta_lock(struct inode *inode,
- ocfs_journal_handle *handle,
- struct buffer_head **ret_bh,
- int ex)
+int ocfs2_meta_lock_flags(struct inode *inode,
+ ocfs_journal_handle *handle,
+ struct buffer_head **ret_bh,
+ int ex,
+ int flags)
{
- int status, level;
+ int status, level, dlm_flags;
ocfs2_lock_res *lockres;
ocfs_super *osb = OCFS2_SB(inode->i_sb);
struct buffer_head *bh = NULL;
@@ -964,9 +1155,7 @@
ex ? "EXMODE" : "PRMODE", OCFS_I(inode)->ip_blkno);
#endif
- /* we skip recovery wait on journal inodes as those can be
- * locked from ocfs_recover_node. */
- if (!INODE_JOURNAL(inode)) {
+ if (!(flags & OCFS2_META_LOCK_RECOVERY)) {
status = ocfs2_wait_on_recovery(osb);
if (status < 0)
goto bail;
@@ -974,15 +1163,18 @@
lockres = &OCFS_I(inode)->ip_meta_lockres;
level = ex ? LKM_EXMODE : LKM_PRMODE;
+ dlm_flags = 0;
+ if (flags & OCFS2_META_LOCK_NOQUEUE)
+ dlm_flags |= LKM_NOQUEUE;
- status = ocfs2_cluster_lock(osb, lockres, level);
+ status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags);
if (status < 0) {
- if (status != -EINTR)
+ if (status != -EINTR && status != -EAGAIN)
LOG_ERROR_STATUS(status);
goto bail;
}
- if (!INODE_JOURNAL(inode)) {
+ if (!(flags & OCFS2_META_LOCK_RECOVERY)) {
status = ocfs2_wait_on_recovery(osb);
if (status < 0)
goto bail;
@@ -1023,17 +1215,63 @@
void ocfs2_meta_unlock(struct inode *inode,
int ex)
{
- int level;
+ int level = ex ? LKM_EXMODE : LKM_PRMODE;
ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_meta_lockres;
- level = ex ? LKM_EXMODE : LKM_PRMODE;
+ ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
+}
- spin_lock(&lockres->l_lock);
- ocfs2_dec_holders(lockres, level);
- ocfs2_vote_on_unlock(OCFS2_SB(inode->i_sb), lockres);
- spin_unlock(&lockres->l_lock);
+int ocfs2_super_lock(ocfs_super *osb,
+ int ex)
+{
+ int status;
+ int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ ocfs2_lock_res *lockres = &osb->super_lockres;
+ struct buffer_head *bh;
+ ocfs2_slot_info *si = osb->slot_info;
+
+ LOG_ENTRY();
+
+ status = ocfs2_cluster_lock(osb, lockres, level, 0);
+ if (status < 0) {
+ if (status != -EINTR)
+ LOG_ERROR_STATUS(status);
+ goto bail;
+ }
+
+ /* The super block lock path is really in the best position to
+ * know when resources covered by the lock need to be
+ * refreshed, so we do it here. Of course, making sense of
+ * everything is up to the caller :) */
+ status = ocfs2_should_refresh_lock_res(lockres);
+ if (status < 0) {
+ if (status != -EINTR)
+ LOG_ERROR_STATUS(status);
+ goto bail;
+ }
+ if (status) {
+ bh = si->si_bh;
+ status = ocfs_read_block(osb, bh->b_blocknr, &bh, 0,
+ si->si_inode);
+ if (status < 0)
+ LOG_ERROR_STATUS(status);
+
+ ocfs2_complete_lock_res_refresh(lockres, status);
+ }
+bail:
+ LOG_EXIT_STATUS(status);
+ return status;
}
+void ocfs2_super_unlock(ocfs_super *osb,
+ int ex)
+{
+ int level = ex ? LKM_EXMODE : LKM_PRMODE;
+ ocfs2_lock_res *lockres = &osb->super_lockres;
+
+ ocfs2_cluster_unlock(osb, lockres, level);
+}
+
int ocfs2_dlm_init(ocfs_super *osb)
{
int status, pid;
@@ -1071,8 +1309,10 @@
}
osb->dlm = dlm;
- /* sets osb->dlm */
- status = 0;
+
+ status = ocfs2_super_lock_res_init(&osb->super_lockres, osb);
+ if (status < 0)
+ LOG_ERROR_STATUS(status);
bail:
LOG_EXIT_STATUS(status);
@@ -1096,21 +1336,10 @@
if (wait_on_vote_task)
wait_for_completion(&osb->vote_event_complete);
+ ocfs2_lock_res_free(&osb->super_lockres);
dlm_unregister_domain(osb->dlm);
}
-int ocfs2_find_slot(ocfs_super *osb)
-{
-#warning "finish this"
-
- /* TODO: We take a lock on the super block, read in our node
- * map and find ourselves a slot. Right now hard code things
- * such that slot_num == global_node_num. */
-
- osb->slot_num = osb->global_node_num;
- return 0;
-}
-
static void ocfs2_unlock_ast_func(void *opaque, dlm_status status)
{
ocfs2_lock_res *lockres = opaque;
@@ -1449,6 +1678,69 @@
new_level, 0);
}
+/* TODO: This is very generic, and looks much like
+ * ocfs2_process_blocked_data. Let's try to find a way to combine these
+ * two. */
+static void ocfs2_process_blocked_super(ocfs_super *osb)
+{
+ int status = 0;
+ int new_level;
+ ocfs2_lock_res *lockres = &osb->super_lockres;
+
+ spin_lock(&lockres->l_lock);
+ if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
+ spin_unlock(&lockres->l_lock);
+ return;
+ }
+
+ if (lockres->l_flags & OCFS2_LOCK_BUSY) {
+ if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
+ /* If we're already trying to cancel a lock conversion
+ * then just drop the spinlock and requeue ourselves
+ * to check again later. */
+ spin_unlock(&lockres->l_lock);
+ } else
+ status = __ocfs2_cancel_convert(osb,
+ lockres);
+ if (status < 0)
+ LOG_ERROR_STATUS(status);
+ goto requeue;
+ }
+
+ /* if we're blocking an exclusive and we have *any* holders,
+ * then requeue. */
+ if ((lockres->l_blocking == LKM_EXMODE)
+ && (lockres->l_ex_holders || lockres->l_ro_holders)) {
+ spin_unlock(&lockres->l_lock);
+ goto requeue;
+ }
+
+ /* If it's a PR we're blocking, then only
+ * requeue if we've got anyone holding an EX */
+ if (lockres->l_blocking == LKM_PRMODE &&
+ lockres->l_ex_holders) {
+ spin_unlock(&lockres->l_lock);
+ goto requeue;
+ }
+
+ /* if we get here, then we know that we have no incompatible
+ * holders, and since we're marked, anyone asking for an
+ * incompatible lock will block. We can safely downconvert
+ * now. */
+ new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
+ spin_unlock(&lockres->l_lock);
+
+ status = __ocfs2_downconvert_lock(osb, lockres, new_level, 0);
+ if (status < 0) {
+ LOG_ERROR_STATUS(status);
+ goto requeue;
+ }
+
+ return;
+requeue:
+ ocfs2_schedule_blocked_super(osb);
+}
+
static void ocfs2_process_blocked_inode(struct inode *inode)
{
int status;
@@ -1467,6 +1759,13 @@
ocfs2_schedule_blocked_inode(inode);
}
+static void ocfs2_schedule_blocked_super(ocfs_super *osb)
+{
+ spin_lock(&osb->vote_task_lock);
+ osb->blocked_super_count++;
+ spin_unlock(&osb->vote_task_lock);
+}
+
static void ocfs2_schedule_blocked_inode(struct inode *inode)
{
ocfs_super *osb = OCFS2_SB(inode->i_sb);
@@ -1485,6 +1784,23 @@
spin_unlock(&osb->vote_task_lock);
}
+static void ocfs2_process_mount_request(ocfs_super *osb,
+ unsigned int node_num)
+{
+ printk("MOUNT vote from node %u\n", node_num);
+ /* The other node only sends us this message when he has an EX
+ * on the superblock, so our recovery threads (if any have been
+ * launched) are waiting on it. */
+ ocfs_recovery_map_clear(osb, node_num);
+}
+
+static void ocfs2_process_umount_request(ocfs_super *osb,
+ unsigned int node_num)
+{
+ printk("UMOUNT vote from node %u\n", node_num);
+ ocfs_node_map_set_bit(osb, &osb->umount_map, node_num);
+}
+
static int ocfs2_process_delete_request(struct inode *inode)
{
int response = -EBUSY;
@@ -1537,8 +1853,8 @@
return response;
}
-static int ocfs2_process_dentry_request(struct inode *inode,
- int rename)
+static void ocfs2_process_dentry_request(struct inode *inode,
+ int rename)
{
d_prune_aliases (inode);
@@ -1549,9 +1865,6 @@
else
inode->i_nlink--;
}
-
- /* we always vote yes on this request type. */
- return 0;
}
static void ocfs2_process_vote(ocfs_super *osb,
@@ -1563,6 +1876,18 @@
OCFS_ASSERT(!memcmp(msg->m_hdr.h_uuid, osb->uuid, MAX_VOL_ID_LENGTH));
+ switch (msg->m_request) {
+ case OCFS2_VOTE_REQ_UMOUNT:
+ ocfs2_process_umount_request(osb, msg->m_req_node);
+ goto respond;
+ break;
+ case OCFS2_VOTE_REQ_MOUNT:
+ ocfs2_process_mount_request(osb, msg->m_req_node);
+ goto respond;
+ break;
+ }
+
+ /* If we get here, then the request is against an inode. */
inode = ocfs_ilookup(osb, msg->m_blkno);
if (!inode)
goto respond;
@@ -1576,7 +1901,7 @@
case OCFS2_VOTE_REQ_RENAME:
rename = 1;
case OCFS2_VOTE_REQ_UNLINK:
- vote_response = ocfs2_process_dentry_request(inode, rename);
+ ocfs2_process_dentry_request(inode, rename);
break;
default:
printk("ocfs2_process_vote: node %u, invalid request: %u\n",
@@ -1598,6 +1923,16 @@
ocfs2_vote_work *work;
spin_lock(&osb->vote_task_lock);
+ if (osb->blocked_super_count) {
+ OCFS_ASSERT(osb->blocked_super_count == 1);
+ osb->blocked_super_count = 0;
+ spin_unlock(&osb->vote_task_lock);
+
+ ocfs2_process_blocked_super(osb);
+
+ spin_lock(&osb->vote_task_lock);
+ }
+
processed = osb->blocked_inode_count;
while (processed) {
OCFS_ASSERT(!list_empty(&osb->blocked_inode_list));
@@ -1634,6 +1969,16 @@
spin_unlock(&osb->vote_task_lock);
}
+static inline int ocfs2_vote_thread_has_work(ocfs_super *osb)
+{
+ if (list_empty(&osb->blocked_inode_list) &&
+ list_empty(&osb->vote_list) &&
+ !osb->blocked_super_count)
+ return 0;
+
+ return 1;
+}
+
static int ocfs2_vote_thread(void *arg)
{
int status = 0;
@@ -1651,8 +1996,7 @@
while (1) {
if (osb->vote_exit) {
- if (list_empty(&osb->blocked_inode_list) &&
- list_empty(&osb->vote_list))
+ if (!ocfs2_vote_thread_has_work(osb))
break;
/* don't want to sleep if we're supposed to quit. */
atomic_set(&osb->wake_vote_task, 1);
@@ -1679,7 +2023,8 @@
}
static int ocfs2_do_request_vote(ocfs_super *osb,
- struct inode *inode,
+ u64 blkno,
+ unsigned int generation,
enum ocfs2_vote_request type)
{
int status;
@@ -1688,7 +2033,8 @@
OCFS_ASSERT(type == OCFS2_VOTE_REQ_DELETE ||
type == OCFS2_VOTE_REQ_UNLINK ||
- type == OCFS2_VOTE_REQ_RENAME);
+ type == OCFS2_VOTE_REQ_RENAME ||
+ type == OCFS2_VOTE_REQ_UMOUNT);
request = kmalloc(sizeof(*request), GFP_KERNEL);
if (!request) {
@@ -1709,8 +2055,8 @@
request->m_hdr.h_type = OCFS2_MESSAGE_TYPE_VOTE;
request->m_req_node = osb->node_num;
request->m_request = type;
- request->m_blkno = OCFS_I(inode)->ip_blkno;
- request->m_generation = inode->i_generation;
+ request->m_blkno = blkno;
+ request->m_generation = generation;
/* register for the response here */
/* send the broadcast request here */
@@ -1750,7 +2096,17 @@
if (ocfs_node_map_is_only(osb, &osb->node_map, osb->node_num))
return 0;
- status = ocfs2_do_request_vote(osb, inode, type);
+ status = ocfs2_super_lock(osb, 0);
+ if (status < 0) {
+ if (status != -EINTR)
+ LOG_ERROR_STATUS(status);
+ break;
+ }
+
+ status = ocfs2_do_request_vote(osb, OCFS_I(inode)->ip_blkno,
+ inode->i_generation, type);
+
+ ocfs2_super_unlock(osb, 0);
}
return status;
}
@@ -1769,3 +2125,39 @@
{
return ocfs2_request_vote(inode, OCFS2_VOTE_REQ_RENAME);
}
+
+int ocfs2_request_mount_vote(ocfs_super *osb)
+{
+ int status;
+
+ status = -EAGAIN;
+ while (status == -EAGAIN) {
+ if (signal_pending(current))
+ return -EINTR;
+
+ if (ocfs_node_map_is_only(osb, &osb->node_map, osb->node_num))
+ return 0;
+
+ status = ocfs2_do_request_vote(osb, 0ULL, 0,
+ OCFS2_VOTE_REQ_MOUNT);
+ }
+ return status;
+}
+
+int ocfs2_request_umount_vote(ocfs_super *osb)
+{
+ int status;
+
+ status = -EAGAIN;
+ while (status == -EAGAIN) {
+ if (signal_pending(current))
+ return -EINTR;
+
+ if (ocfs_node_map_is_only(osb, &osb->node_map, osb->node_num))
+ return 0;
+
+ status = ocfs2_do_request_vote(osb, 0ULL, 0,
+ OCFS2_VOTE_REQ_UMOUNT);
+ }
+ return status;
+}
Modified: branches/dlm-glue/src/dlmglue.h
===================================================================
--- branches/dlm-glue/src/dlmglue.h 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/dlmglue.h 2004-11-19 23:44:28 UTC (rev 1655)
@@ -1,7 +1,7 @@
/* -*- mode: c; c-basic-offset: 8; -*-
* vim: noexpandtab sw=8 ts=8 sts=0:
*
- * middle.h
+ * dlmglue.h
*
* description here
*
@@ -79,10 +79,11 @@
int ocfs2_dlm_init(ocfs_super *osb);
void ocfs2_dlm_shutdown(ocfs_super *osb);
-int ocfs2_find_slot(ocfs_super *osb);
-int ocfs2_lock_res_init(ocfs2_lock_res *res,
- enum ocfs2_lock_type type,
- struct inode *inode);
+int ocfs2_inode_lock_res_init(ocfs2_lock_res *res,
+ enum ocfs2_lock_type type,
+ struct inode *inode);
+int ocfs2_super_lock_res_init(ocfs2_lock_res *res,
+ ocfs_super *osb);
void ocfs2_lock_res_free(ocfs2_lock_res *res);
int ocfs2_create_new_inode_locks(struct inode *inode);
int ocfs2_drop_inode_locks(struct inode *inode);
@@ -90,14 +91,24 @@
int write);
void ocfs2_data_unlock(struct inode *inode,
int write);
-int ocfs2_meta_lock(struct inode *inode,
- ocfs_journal_handle *handle,
- struct buffer_head **ret_bh,
- int ex);
+/* don't wait on recovery. */
+#define OCFS2_META_LOCK_RECOVERY (0x01)
+/* Instruct the dlm not to queue ourselves on the other node. */
+#define OCFS2_META_LOCK_NOQUEUE (0x02)
+/* 99% of the time we don't want to supply any additional flags --
+ * those are for very specific cases only. */
+#define ocfs2_meta_lock(i, h, b, e) ocfs2_meta_lock_flags(i, h, b, e, 0)
+int ocfs2_meta_lock_flags(struct inode *inode,
+ ocfs_journal_handle *handle,
+ struct buffer_head **ret_bh,
+ int ex,
+ int flags);
void ocfs2_meta_unlock(struct inode *inode,
int ex);
-int ocfs2_super_lock(ocfs_super *osb, int ex);
-void ocfs2_super_unlock(ocfs_super *osb);
+int ocfs2_super_lock(ocfs_super *osb,
+ int ex);
+void ocfs2_super_unlock(ocfs_super *osb,
+ int ex);
static inline void ocfs2_kick_vote_thread(ocfs_super *osb)
{
@@ -107,6 +118,8 @@
int ocfs2_request_delete_vote(struct inode *inode);
int ocfs2_request_unlink_vote(struct inode *inode);
int ocfs2_request_rename_vote(struct inode *inode);
+int ocfs2_request_mount_vote(ocfs_super *osb);
+int ocfs2_request_umount_vote(ocfs_super *osb);
static inline void ocfs2_lvb_set_trunc_clusters(struct inode *inode,
unsigned int trunc_clusters)
@@ -158,7 +171,9 @@
OCFS2_VOTE_REQ_INVALID = 0,
OCFS2_VOTE_REQ_DELETE,
OCFS2_VOTE_REQ_UNLINK,
- OCFS2_VOTE_REQ_RENAME
+ OCFS2_VOTE_REQ_RENAME,
+ OCFS2_VOTE_REQ_MOUNT,
+ OCFS2_VOTE_REQ_UMOUNT
};
#endif
Modified: branches/dlm-glue/src/heartbeat.c
===================================================================
--- branches/dlm-glue/src/heartbeat.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/heartbeat.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -50,11 +50,16 @@
#define OCFS_DEBUG_CONTEXT OCFS_DEBUG_CONTEXT_HEARTBEAT
#define OCFS2_HB_NODE_DOWN_PRI (0x0000001)
+#define OCFS2_HB_NODE_UP_PRI OCFS2_HB_NODE_DOWN_PRI
-static void ocfs2_hb_node_down_cb(void *ptr1,
- void *ptr2,
+static void ocfs2_hb_node_down_cb(struct inode *group,
+ struct inode *node,
int node_num,
void *data);
+static void ocfs2_hb_node_up_cb(struct inode *group,
+ struct inode *node,
+ int node_num,
+ void *data);
static void ocfs_node_map_init(ocfs_super *osb, ocfs_node_map *map);
static void __ocfs_node_map_dup(ocfs_super *osb,
@@ -75,40 +80,79 @@
spin_lock_init(&osb->node_map_lock);
ocfs_node_map_init(osb, &osb->node_map);
ocfs_node_map_init(osb, &osb->recovery_map);
+ ocfs_node_map_init(osb, &osb->umount_map);
}
-static void ocfs2_hb_node_down_cb(void *ptr1,
- void *ptr2,
+static void ocfs2_hb_node_down_cb(struct inode *group,
+ struct inode *node,
int node_num,
void *data)
{
ocfs_super *osb = data;
- if (atomic_read(&osb->vol_state) != VOLUME_MOUNTED) {
- printk("ocfs2: Ignoring node down callback for node %d\n",
- node_num);
+ ocfs_node_map_clear_bit(osb, &osb->node_map, node_num);
+ if (osb->group_inode != group)
return;
- }
- OCFS_ASSERT(osb->global_node_num != node_num);
+ OCFS_ASSERT(osb->node_num != node_num);
printk("ocfs2: node down event for %d\n", node_num);
- /* uhm, do some recovery stuff.. */
+
+ if (ocfs_node_map_test_bit(osb, &osb->umount_map, node_num)) {
+ /* If a node is in the umount map, then we've been
+ * expecting him to go down and we know ahead of time
+ * that recovery is not necessary. */
+ ocfs_node_map_clear_bit(osb, &osb->umount_map, node_num);
+ return;
+ }
+
+ ocfs_recovery_thread(osb, node_num);
}
+static void ocfs2_hb_node_up_cb(struct inode *group,
+ struct inode *node,
+ int node_num,
+ void *data)
+{ /* Heartbeat "node up" callback. 'data' is the ocfs_super that was
+ * handed to hb_register_callback(); 'node' is not used here. */
+ ocfs_super *osb = data;
+
+ ocfs_node_map_set_bit(osb, &osb->node_map, node_num); /* NOTE(review): done before the group check below -- confirm intended */
+ if (osb->group_inode != group)
+ return;
+
+ OCFS_ASSERT(osb->node_num != node_num); /* the event must not name our own node */
+
+ printk("ocfs2: node up event for %d\n", node_num);
+ ocfs_node_map_clear_bit(osb, &osb->umount_map, node_num); /* presumably drops a stale "expected to go down" mark -- confirm */
+}
+
/* Most functions here are just stubs for now... */
int ocfs2_register_hb_callbacks(ocfs_super *osb)
{
int status;
- ocfs_node_map_set_bit(osb, &osb->node_map, osb->node_num);
status = hb_register_callback(HB_NODE_DOWN_CB,
ocfs2_hb_node_down_cb,
osb,
OCFS2_HB_NODE_DOWN_PRI);
+ if (status < 0) {
+ LOG_ERROR_STATUS(status);
+ goto bail;
+ }
+
+ status = hb_register_callback(HB_NODE_UP_CB,
+ ocfs2_hb_node_up_cb,
+ osb,
+ OCFS2_HB_NODE_UP_PRI);
if (status < 0)
LOG_ERROR_STATUS(status);
+ status = hb_fill_node_map(osb->group_inode, &osb->node_map.map,
+ sizeof(osb->node_map.map));
+ if (status < 0)
+ LOG_ERROR_STATUS(status);
+
+bail:
return status;
}
@@ -121,7 +165,11 @@
if (status < 0)
LOG_ERROR_STATUS(status);
- ocfs_node_map_clear_bit(osb, &osb->node_map, osb->node_num);
+ status = hb_unregister_callback(HB_NODE_UP_CB,
+ ocfs2_hb_node_up_cb, osb);
+ if (status < 0)
+ LOG_ERROR_STATUS(status);
+
}
/* special case -1 for now
@@ -259,10 +307,47 @@
ocfs_node_map_clear_bit(osb, &osb->recovery_map, num);
}
+int ocfs_node_map_first_set_bit(ocfs_super *osb,
+ ocfs_node_map *map)
+{ /* Returns the lowest bit index in [0, map->num_nodes) that is set
+ * in map->map, or -1 when no bit in that range is set. */
+ int i, ret = -1;
+ spin_lock(&osb->node_map_lock); /* the whole scan runs under node_map_lock */
+ for(i = 0; i < map->num_nodes; i++)
+ if (test_bit(i, map->map)) {
+ ret = i;
+ break;
+ }
+ spin_unlock(&osb->node_map_lock);
+ return ret;
+}
+
#if 0
/* unused (for now) node map functions. */
+/* Uses the heartbeat api to test whether a given global node num is
+ * heartbeating. Returns the result of ocfs_node_map_test_bit() on a
+ * freshly filled temporary map, or the negative status from
+ * hb_fill_node_map() on failure. Warning: this function can sleep in
+ * hb_fill_node_map() */
+int ocfs2_is_node_alive(ocfs_super *osb,
+ unsigned int node_num)
+{
+ int ret;
+ ocfs_node_map tmpmap;
+
+ ocfs_node_map_init(osb, &tmpmap);
+
+ /* Hand over the bitmap member itself, so that the pointer agrees
+ * with the sizeof() argument and with the hb_fill_node_map() call
+ * in ocfs2_register_hb_callbacks(). */
+ ret = hb_fill_node_map(osb->group_inode, &tmpmap.map,
+ sizeof(tmpmap.map));
+ if (ret < 0) {
+ LOG_ERROR_STATUS(ret);
+ goto bail;
+ }
+
+ ret = ocfs_node_map_test_bit(osb, &tmpmap, node_num);
+
+bail:
+ return ret;
+}
+
static int ocfs_node_map_stringify(ocfs_node_map *map, char **str)
{
int i, n;
@@ -347,12 +432,3 @@
}
#endif
-#if 0
- if (node_map[i].miss_cnt >= MISS_COUNT_NODE_DEAD) {
- ocfs_recovery_map_set(osb, i);
- ocfs_publish_map_clear(&osb->publ_map, i);
-
- /* Ok, we'd better recover him now...*/
- ocfs_recovery_thread(osb, i);
- }
-#endif
Modified: branches/dlm-glue/src/heartbeat.h
===================================================================
--- branches/dlm-glue/src/heartbeat.h 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/heartbeat.h 2004-11-19 23:44:28 UTC (rev 1655)
@@ -44,6 +44,8 @@
int ocfs_node_map_test_bit(ocfs_super *osb,
ocfs_node_map *map,
int bit);
+int ocfs_node_map_first_set_bit(ocfs_super *osb,
+ ocfs_node_map *map);
void ocfs_recovery_map_set(ocfs_super *osb,
int num);
void ocfs_recovery_map_clear(ocfs_super *osb,
Modified: branches/dlm-glue/src/inode.c
===================================================================
--- branches/dlm-glue/src/inode.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/inode.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -426,14 +426,14 @@
break;
}
- status = ocfs2_lock_res_init(&OCFS_I(inode)->ip_meta_lockres,
- OCFS_TYPE_META, inode);
+ status = ocfs2_inode_lock_res_init(&OCFS_I(inode)->ip_meta_lockres,
+ OCFS_TYPE_META, inode);
if (status < 0) {
LOG_ERROR_STATUS(status);
goto bail;
}
- status = ocfs2_lock_res_init(&OCFS_I(inode)->ip_data_lockres,
- OCFS_TYPE_DATA, inode);
+ status = ocfs2_inode_lock_res_init(&OCFS_I(inode)->ip_data_lockres,
+ OCFS_TYPE_DATA, inode);
if (status < 0)
LOG_ERROR_STATUS(status);
bail:
Modified: branches/dlm-glue/src/journal.c
===================================================================
--- branches/dlm-glue/src/journal.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/journal.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -42,6 +42,7 @@
#include "journal.h"
#include "localalloc.h"
#include "namei.h"
+#include "slot_map.h"
#include "super.h"
#include "util.h"
#include "sysfile.h"
@@ -64,6 +65,8 @@
static void ocfs_commit_unstarted_handle(ocfs_journal_handle *handle);
static int ocfs_journal_toggle_dirty(ocfs_super *osb,
int dirty);
+static int ocfs2_trylock_journal(ocfs_super *osb,
+ int slot_num);
/*
* ocfs_commit_cache()
@@ -619,7 +622,8 @@
BUG();
/* already have the inode for our journal */
- inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, osb->node_num);
+ inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
+ osb->slot_num);
if (inode == NULL) {
LOG_ERROR_STR("access error");
status = -EACCES;
@@ -680,8 +684,6 @@
/* yay, pass the proper info back to our journal structure. */
osb->journal->osb = osb;
- /* eventually this will be a value passed into us */
- osb->journal->node_num = osb->node_num;
osb->journal->k_journal = k_journal;
osb->journal->k_inode = inode;
osb->journal->version = OCFS_JOURNAL_CURRENT_VERSION;
@@ -962,75 +964,93 @@
return(status);
}
-struct ocfs_recover_arg {
- ocfs_super *osb;
- int node_num;
-};
-
static int __ocfs_recovery_thread(void *arg)
{
- struct ocfs_recover_arg *recover_arg = arg;
- ocfs_super *osb = recover_arg->osb;
- int node_num = recover_arg->node_num;
+ ocfs_super *osb = arg;
int status = 0;
+ int node_num;
char proc[16];
- LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n", node_num,
- osb->node_num);
+ LOG_ENTRY();
sprintf (proc, "ocfs2rec-%d", osb->osb_id);
ocfs_daemonize (proc, strlen(proc), 0);
-#ifdef HAVE_NPTL
- spin_lock_irq (&current->sighand->siglock);
- sigfillset(&current->blocked);
- recalc_sigpending();
- spin_unlock_irq (&current->sighand->siglock);
-#else
- spin_lock_irq(&current->sigmask_lock);
- sigfillset(&current->blocked);
- recalc_sigpending(current);
- spin_unlock_irq(&current->sigmask_lock);
-#endif
+ status = ocfs_wait_on_mount(osb);
+ if (status < 0) {
+ if (status == -EBUSY)
+ status = 0;
+ goto bail;
+ }
- status = ocfs_recover_node(osb, node_num);
- if (status < 0)
+restart:
+ status = ocfs2_super_lock(osb, 1);
+ if (status < 0) {
LOG_ERROR_STATUS(status);
+ goto bail;
+ }
+ while(!ocfs_node_map_is_empty(osb, &osb->recovery_map)) {
+ node_num = ocfs_node_map_first_set_bit(osb,
+ &osb->recovery_map);
+ if (node_num < 0) {
+ LOG_TRACE_ARGS("Out of nodes to recover.\n");
+ break;
+ }
+
+ ocfs_recovery_map_clear(osb, node_num);
+ /* TODO: Figure out how we're going to save all the
+ * local alloc stuff for after recovery on all nodes
+ * is complete? */
+ status = ocfs_recover_node(osb, node_num);
+ if (status < 0) {
+ printk("ocfs2: Error %d recovering node %d on device "
+ "(%u,%u)!\n", status, node_num,
+ MAJOR(osb->sb->s_dev),MINOR(osb->sb->s_dev));
+ printk("ocfs2: Volume requires unmount.\n");
+ continue;
+ }
+ atomic_dec(&osb->num_recovery_threads);
+ }
+ ocfs2_super_unlock(osb, 1);
+
+bail:
+ down(&osb->recovery_lock);
+ if (!ocfs_node_map_is_empty(osb, &osb->recovery_map)) {
+ up(&osb->recovery_lock);
+ goto restart;
+ }
+ osb->recovery_launched = 0;
+ up(&osb->recovery_lock);
+
+ wake_up_all(&osb->recovery_event);
+
LOG_EXIT_STATUS(status);
-
- kfree(arg);
return status;
}
-void ocfs_recovery_thread(ocfs_super *osb, int node_num)
+void ocfs_recovery_thread(ocfs_super *osb, int node_num)
{
- struct ocfs_recover_arg *arg;
+ LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n",
+ node_num, osb->node_num);
- LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n", node_num,
- osb->node_num);
-
+ down(&osb->recovery_lock);
/* atomic_inc this here and let recover_vol dec it when
* done. We do it this way to avoid races with umount. People
* waiting on recovery will wait on this value to drop back
* down to zero. */
atomic_inc(&osb->num_recovery_threads);
+ ocfs_recovery_map_set(osb, node_num);
- arg = kmalloc(sizeof(struct ocfs_recover_arg), GFP_KERNEL);
- if (arg == NULL) {
- LOG_ERROR_STATUS(-ENOMEM);
- goto done;
- }
-
- arg->osb = osb;
- arg->node_num = node_num;
-
LOG_TRACE_STR("starting recovery thread...");
- kernel_thread(__ocfs_recovery_thread, arg,
- CLONE_VM | CLONE_FS | CLONE_FILES);
+ if (!osb->recovery_launched) {
+ kernel_thread(__ocfs_recovery_thread, osb,
+ CLONE_VM | CLONE_FS | CLONE_FILES);
+ osb->recovery_launched = 1;
+ }
-done:
+ up(&osb->recovery_lock);
wake_up_all(&osb->recovery_event);
LOG_EXIT();
@@ -1039,43 +1059,44 @@
static int ocfs_recover_node(ocfs_super *osb, int node_num)
{
- int status = -1;
- int tmpstat;
+ int status = 0;
+// int tmpstat;
+ int slot_num;
ocfs2_dinode *fe;
ocfs2_dinode *local_alloc = NULL;
struct inode *inode = NULL;
journal_t *k_journal = NULL;
struct buffer_head *bh = NULL;
ocfs_journal * journal = NULL;
- int recovery_lock = 0, got_lock = 0, clean_orphans = 0;
+ int got_lock = 0, clean_orphans = 0;
+ ocfs2_slot_info *si = osb->slot_info;
- LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n", node_num,
- osb->node_num);
+ LOG_ENTRY_ARGS("(node_num=%d, osb->node_num = %d)\n",
+ node_num, osb->node_num);
- if (!osb || (node_num >= osb->max_nodes)) {
- LOG_ERROR_STATUS (status = -EINVAL);
+ printk("ocfs2_recover_node: checking node %d\n", node_num);
+
+ /* Should not ever be called to recover ourselves -- in that
+ * case we should've called ocfs_journal_load instead. */
+ if (osb->node_num == node_num)
+ BUG();
+
+ ocfs2_update_slot_info(si);
+ slot_num = ocfs2_node_num_to_slot(si, node_num);
+ if (slot_num == OCFS_INVALID_NODE_NUM) {
+ printk("ocfs2_recover_node: no slot for this node, so no "
+ "recovery required.\n");
goto done;
}
- status = ocfs_wait_on_mount(osb);
- if (status < 0) {
- if (status == -EBUSY)
- status = 0;
- goto done;
- }
+ printk("ocfs2_recover_node: node %d was using slot %d\n", node_num,
+ slot_num);
+
journal = osb->journal;
- /* Grab the local recovery resource to ensure no other thread
- * comes in from this node for recovery */
- down(&(osb->recovery_lock));
- recovery_lock = 1;
- if (osb->disable_recovery) {
- LOG_TRACE_STR("Shutting down so skipping reovery.");
- goto done;
- }
-
/* Ok, look up the inode for our journal */
- inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE, node_num);
+ inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
+ slot_num);
if (inode == NULL) {
LOG_ERROR_STR("access error");
status = -EACCES;
@@ -1091,12 +1112,8 @@
SET_INODE_JOURNAL(inode);
- /* Should not ever be called to recover ourselves -- in that
- * case we should've called ocfs_journal_load instead. */
- if (osb->node_num == node_num)
- BUG();
-
- status = ocfs2_meta_lock(inode, NULL, &bh, 1);
+ status = ocfs2_meta_lock_flags(inode, NULL, &bh, 1,
+ OCFS2_META_LOCK_RECOVERY);
if (status < 0) {
LOG_TRACE_ARGS("status returned from ocfs2_meta_lock=%d\n",
status);
@@ -1105,17 +1122,19 @@
goto done;
}
got_lock = 1;
-
+
fe = (ocfs2_dinode *) bh->b_data;
if (!(fe->id1.journal1.i_flags & OCFS2_JOURNAL_DIRTY_FL)) {
LOG_TRACE_ARGS("No recovery required for node %d\n", node_num);
- status = 0;
+ printk("ocfs2_recover_node: No recovery required for node "
+ "%d\n", node_num);
goto clear_node;
}
- printk("ocfs2: Recovering node %d from device (%u,%u)\n", node_num,
- MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
+ printk("ocfs2: Recovering node %d from slot %d on device (%u,%u)\n",
+ node_num, slot_num, MAJOR(osb->sb->s_dev),
+ MINOR(osb->sb->s_dev));
clean_orphans = 1;
OCFS_I(inode)->ip_clusters = fe->i_clusters;
@@ -1164,8 +1183,9 @@
/* shutdown the journal */
journal_destroy(k_journal);
+#warning "we can't complete local alloc recovery in this function!"
/* recover his local alloc file, AFTER recovering his journal... */
- status = ocfs_begin_local_alloc_recovery(osb, node_num, &local_alloc);
+ status = ocfs_begin_local_alloc_recovery(osb, slot_num, &local_alloc);
if (status < 0) {
LOG_ERROR_STATUS(status);
goto done;
@@ -1174,11 +1194,11 @@
status = 0;
clear_node:
- ocfs_recovery_map_clear(osb, node_num);
+ ocfs2_clear_slot(si, slot_num);
+ status = ocfs2_update_disk_slots(osb, si);
+ if (status < 0)
+ LOG_ERROR_STATUS(status);
done:
- if (recovery_lock)
- up(&(osb->recovery_lock));
-
/* drop the lock on this nodes journal */
if (got_lock)
ocfs2_meta_unlock(inode, 1);
@@ -1188,28 +1208,114 @@
if (bh)
brelse(bh);
-
+#if 0
if (local_alloc && !status) {
tmpstat = ocfs_complete_local_alloc_recovery(osb, local_alloc);
if (tmpstat < 0)
LOG_ERROR_STATUS(tmpstat);
}
-
+#endif
if (local_alloc)
kfree(local_alloc);
-
+#if 0
if (clean_orphans && !status) {
tmpstat = ocfs_recover_orphans(osb);
if (tmpstat < 0)
LOG_ERROR_STATUS(tmpstat);
}
+#endif
- atomic_dec(&osb->num_recovery_threads);
-
LOG_EXIT_STATUS(status);
return(status);
}
+/* Test node liveness by trylocking his journal. If we get the lock,
+ * we drop it here. Return 0 if we got the lock, -EAGAIN if node is
+ * still alive (we couldn't get the lock) and < 0 on error.
+ *
+ * The journal inode for 'slot_num' is looked up here and released
+ * again before returning. -EAGAIN and -EINTR from the lock call are
+ * passed back to the caller without being logged. */
+static int ocfs2_trylock_journal(ocfs_super *osb,
+ int slot_num)
+{
+ int status, flags;
+ struct inode *inode = NULL;
+
+ inode = ocfs_get_system_file_inode(osb, JOURNAL_SYSTEM_INODE,
+ slot_num);
+ if (inode == NULL) {
+ LOG_ERROR_STR("access error");
+ status = -EACCES;
+ goto bail;
+ }
+ if (is_bad_inode (inode)) {
+ LOG_ERROR_STR("access error (bad inode)");
+ iput (inode);
+ inode = NULL;
+ status = -EACCES;
+ goto bail;
+ }
+ SET_INODE_JOURNAL(inode);
+
+ flags = OCFS2_META_LOCK_RECOVERY|OCFS2_META_LOCK_NOQUEUE;
+ status = ocfs2_meta_lock_flags(inode, NULL, NULL, 1, flags);
+ if (status < 0) {
+ /* -EAGAIN (lock is busy, so the node is alive) and
+ * -EINTR are expected outcomes; only log anything
+ * else. Both tests must hold, hence '&&'. */
+ if (status != -EAGAIN && status != -EINTR)
+ LOG_ERROR_STATUS(status);
+ goto bail;
+ }
+
+ ocfs2_meta_unlock(inode, 1);
+bail:
+ if (inode)
+ iput(inode);
+
+ return status;
+}
+
+/* Call this underneath ocfs2_super_lock. It also assumes that the
+ * slot info struct has been updated from disk.
+ *
+ * Walks every slot other than our own that holds a valid node number
+ * and is not already in the recovery map, and trylocks that slot's
+ * journal. If the trylock succeeds, ocfs_recovery_thread() is called
+ * for that node. Returns 0 when the walk completes, or the first
+ * trylock status that is negative and not -EAGAIN. */
+int ocfs2_mark_dead_nodes(ocfs_super *osb)
+{
+ int status, i, node_num;
+ ocfs2_slot_info *si = osb->slot_info;
+
+ /* This is called with the super block cluster lock, so we
+ * know that the slot map can't change underneath us. */
+
+ spin_lock(&si->si_lock);
+ for(i = 0; i < si->si_num_slots; i++) {
+ node_num = si->si_global_node_nums[i];
+ if (i == osb->slot_num)
+ continue;
+ if (node_num == OCFS_INVALID_NODE_NUM)
+ continue;
+ if (ocfs_node_map_test_bit(osb, &osb->recovery_map, node_num))
+ continue;
+ spin_unlock(&si->si_lock);
+
+ /* Ok, we have a slot occupied by another node which
+ * is not in the recovery map. We trylock his journal
+ * file here to test if he's alive.
+ *
+ * si_lock is dropped around this call and retaken at
+ * the bottom of the loop body; the error exit below
+ * therefore leaves with si_lock not held. */
+ status = ocfs2_trylock_journal(osb, i);
+ if (!status) {
+ /* Since we're called from mount, we know that
+ * the recovery thread can't race us on
+ * setting / checking the recovery bits. */
+ ocfs_recovery_thread(osb, node_num);
+ } else if ((status < 0) && (status != -EAGAIN)) {
+ if (status != -EINTR)
+ LOG_ERROR_STATUS(status);
+ goto bail;
+ }
+
+ spin_lock(&si->si_lock);
+ }
+ spin_unlock(&si->si_lock);
+
+ status = 0;
+bail:
+ LOG_EXIT_STATUS(status);
+ return status;
+}
+
int ocfs_recover_orphans(ocfs_super *osb)
{
int status = 0;
@@ -1222,8 +1328,6 @@
struct ocfs2_dir_entry *de;
struct super_block *sb = osb->sb;
- down(&osb->orphan_recovery_lock);
-
orphan_dir_inode = ocfs_get_system_file_inode(osb,
ORPHAN_DIR_SYSTEM_INODE,
-1);
@@ -1321,8 +1425,6 @@
}
bail:
- up(&osb->orphan_recovery_lock);
-
if (have_disk_lock)
ocfs2_meta_unlock(orphan_dir_inode, 0);
Modified: branches/dlm-glue/src/localalloc.c
===================================================================
--- branches/dlm-glue/src/localalloc.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/localalloc.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -147,7 +147,7 @@
/* read the alloc off disk */
inode = ocfs_get_system_file_inode(osb, LOCAL_ALLOC_SYSTEM_INODE,
- osb->node_num);
+ osb->slot_num);
if (!inode) {
LOG_ERROR_STATUS(status=-EINVAL);
goto bail;
@@ -236,7 +236,7 @@
local_alloc_inode =
ocfs_get_system_file_inode(osb,
LOCAL_ALLOC_SYSTEM_INODE,
- osb->node_num);
+ osb->slot_num);
if (!local_alloc_inode) {
status = -ENOENT;
LOG_ERROR_STATUS(status);
@@ -348,7 +348,7 @@
* caller to process with ocfs_complete_local_alloc_recovery
*/
int ocfs_begin_local_alloc_recovery(ocfs_super *osb,
- int node_num,
+ int slot_num,
ocfs2_dinode **alloc_copy)
{
int status = 0;
@@ -356,13 +356,13 @@
struct inode *inode = NULL;
ocfs2_dinode *alloc;
- LOG_ENTRY_ARGS("(node_num = %d)\n", node_num);
+ LOG_ENTRY_ARGS("(slot_num = %d)\n", slot_num);
*alloc_copy = NULL;
inode = ocfs_get_system_file_inode(osb,
LOCAL_ALLOC_SYSTEM_INODE,
- node_num);
+ slot_num);
if (!inode) {
LOG_ERROR_STATUS(status=-EINVAL);
goto bail;
@@ -501,7 +501,7 @@
local_alloc_inode =
ocfs_get_system_file_inode(osb,
LOCAL_ALLOC_SYSTEM_INODE,
- osb->node_num);
+ osb->slot_num);
if (!local_alloc_inode) {
status = -ENOENT;
LOG_ERROR_STATUS(status);
Modified: branches/dlm-glue/src/namei.c
===================================================================
--- branches/dlm-glue/src/namei.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/namei.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -523,7 +523,7 @@
fe->i_generation = cpu_to_le32(inode->i_generation);
fe->i_blkno = fe_blkno;
fe->i_suballoc_bit = suballoc_bit;
- fe->i_suballoc_node = osb->node_num;
+ fe->i_suballoc_node = osb->slot_num;
fe->i_uid = current->fsuid;
if (dir->i_mode & S_ISGID) {
fe->i_gid = dir->i_gid;
Modified: branches/dlm-glue/src/ocfs.h
===================================================================
--- branches/dlm-glue/src/ocfs.h 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/ocfs.h 2004-11-19 23:44:28 UTC (rev 1655)
@@ -189,6 +189,7 @@
enum ocfs2_lock_type {
OCFS_TYPE_META = 0,
OCFS_TYPE_DATA,
+ OCFS_TYPE_SUPER,
OCFS_NUM_LOCK_TYPES
};
@@ -204,8 +205,7 @@
#define OCFS2_LOCK_REFRESHING (0x00000020)
typedef struct _ocfs2_lock_res {
- /* do i need this or can't i just use container_of? */
- struct inode *l_inode;
+ void *l_priv;
spinlock_t l_lock;
enum ocfs2_lock_type l_type;
@@ -343,6 +343,7 @@
} ocfs_alloc_stats;
struct _ocfs_journal;
+struct _ocfs2_slot_info;
/*
* ocfs_super
@@ -360,9 +361,12 @@
struct inode *sys_root_inode;
struct inode *system_inodes[NUM_SYSTEM_INODES];
+ struct _ocfs2_slot_info *slot_info;
+
spinlock_t node_map_lock;
ocfs_node_map node_map;
ocfs_node_map recovery_map;
+ ocfs_node_map umount_map;
/* new */
u32 num_clusters;
@@ -384,8 +388,7 @@
u16 max_nodes;
u16 num_nodes;
s16 node_num;
- s16 global_node_num;
- unsigned int slot_num;
+ s16 slot_num;
int reclaim_id; /* reclaim the original node number*/
int s_sectsize_bits;
int s_clustersize;
@@ -393,8 +396,8 @@
struct proc_dir_entry *proc_sub_dir; /* points to /proc/fs/ocfs2/<maj_min> */
atomic_t vol_state;
- struct semaphore orphan_recovery_lock;
struct semaphore recovery_lock;
+ int recovery_launched;
int disable_recovery;
atomic_t num_recovery_threads;
wait_queue_head_t flush_event;
@@ -402,15 +405,24 @@
struct _ocfs_journal *journal;
atomic_t clean_buffer_seq;
spinlock_t clean_buffer_lock;
+
int have_local_alloc;
struct buffer_head *local_alloc_bh;
+
+ /* Next two fields are for local node slot recovery during
+ * mount. */
+ int dirty;
+ ocfs2_dinode *local_alloc_copy;
+
ocfs_dlm_stats net_reqst_stats; /* stats of netdlm vote requests */
ocfs_dlm_stats net_reply_stats; /* stats of netdlm vote reponses */
ocfs_alloc_stats alloc_stats;
char dev_str[20]; /* "major,minor" of the device */
char *group_name;
+ struct inode *group_inode;
dlm_ctxt *dlm;
+ ocfs2_lock_res super_lockres;
wait_queue_head_t recovery_event;
@@ -419,6 +431,7 @@
wait_queue_head_t vote_event;
atomic_t wake_vote_task;
int vote_exit;
+ int blocked_super_count;
struct list_head blocked_inode_list;
int blocked_inode_count;
struct list_head vote_list;
@@ -434,7 +447,6 @@
kmem_cache_t *inode_cache;
kmem_cache_t *lock_cache;
__u32 flags;
- __s16 pref_node_num; /* preferred... osb has the real one */
char *node_name; /* human readable node identification */
char *cluster_name; /* unused */
int comm_info_read; /* ipc info loaded from config file */
Modified: branches/dlm-glue/src/ocfs2_fs.h
===================================================================
--- branches/dlm-glue/src/ocfs2_fs.h 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/ocfs2_fs.h 2004-11-19 23:44:28 UTC (rev 1655)
@@ -116,13 +116,13 @@
#define OCFS2_DEFAULT_JOURNAL_SIZE (8 * ONE_MEGA_BYTE)
#define OCFS2_MIN_JOURNAL_SIZE (4 * ONE_MEGA_BYTE)
-
/* System file index */
enum {
BAD_BLOCK_SYSTEM_INODE = 0,
GLOBAL_INODE_ALLOC_SYSTEM_INODE,
+ SLOT_MAP_SYSTEM_INODE,
+#define OCFS2_FIRST_ONLINE_SYSTEM_INODE SLOT_MAP_SYSTEM_INODE
DLM_SYSTEM_INODE,
-#define OCFS2_FIRST_ONLINE_SYSTEM_INODE DLM_SYSTEM_INODE
GLOBAL_BITMAP_SYSTEM_INODE,
ORPHAN_DIR_SYSTEM_INODE,
#define OCFS2_LAST_GLOBAL_SYSTEM_INODE ORPHAN_DIR_SYSTEM_INODE
@@ -140,6 +140,7 @@
[GLOBAL_INODE_ALLOC_SYSTEM_INODE] "global_inode_alloc",
/* These are used by the running filesystem */
+ [SLOT_MAP_SYSTEM_INODE] "slot_map",
[DLM_SYSTEM_INODE] "dlm",
[GLOBAL_BITMAP_SYSTEM_INODE] "global_bitmap",
[ORPHAN_DIR_SYSTEM_INODE] "orphan_dir",
@@ -174,9 +175,11 @@
*/
#define OCFS2_DIR_PAD 4
#define OCFS2_DIR_ROUND (OCFS2_DIR_PAD - 1)
-#define OCFS2_DIR_REC_LEN(name_len) (((name_len) + 12 + \
+#define OCFS2_DIR_MEMBER_LEN offsetof(struct ocfs2_dir_entry, name)
+#define OCFS2_DIR_REC_LEN(name_len) (((name_len) + OCFS2_DIR_MEMBER_LEN + \
OCFS2_DIR_ROUND) & \
~OCFS2_DIR_ROUND)
+
#define OCFS2_LINK_MAX 32000
#define S_SHIFT 12
Modified: branches/dlm-glue/src/ocfs_journal.h
===================================================================
--- branches/dlm-glue/src/ocfs_journal.h 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/ocfs_journal.h 2004-11-19 23:44:28 UTC (rev 1655)
@@ -59,7 +59,6 @@
* which we usually run
* from (recovery,
* etc) */
- __u32 node_num; /* Whose journal are we? */
struct buffer_head *lockbh; /* Journal disk lock, used
to access file entry */
atomic_t num_trans; /* Number of transactions
@@ -179,7 +178,7 @@
/*
* Journal Control:
- * Initialize, Load, Shutdown, Wipe, Create a journal.
+ * Initialize, Load, Shutdown, Wipe a journal.
*
* ocfs_journal_init - Initialize journal structures in the OSB.
* ocfs_journal_load - Load the given journal off disk. Replay it if
@@ -189,6 +188,8 @@
* ocfs_journal_wipe - Wipe transactions from a journal. Optionally
* zero out each block.
* ocfs_recovery_thread - Perform recovery on a node. osb is our own osb.
+ * ocfs2_mark_dead_nodes - Start recovery on nodes we won't get a heartbeat
+ * event on.
* ocfs_start_checkpoint - Kick the commit thread to do a checkpoint.
*/
int ocfs_journal_init(struct _ocfs_super *osb, int *dirty);
@@ -196,6 +197,7 @@
int ocfs_journal_wipe(ocfs_journal *journal, int full);
int ocfs_journal_load(ocfs_journal *journal);
void ocfs_recovery_thread(struct _ocfs_super *osb, int node_num);
+int ocfs2_mark_dead_nodes(ocfs_super *osb);
static inline void ocfs_start_checkpoint(struct _ocfs_super *osb)
{
atomic_set(&osb->flush_event_woken, 1);
Modified: branches/dlm-glue/src/ocfs_log.h
===================================================================
--- branches/dlm-glue/src/ocfs_log.h 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/ocfs_log.h 2004-11-19 23:44:28 UTC (rev 1655)
@@ -100,7 +100,7 @@
#define OCFS_DEBUG_CONTEXT_ALLOC 0x00000001 /* alloc.c */
#define OCFS_DEBUG_CONTEXT_DIR 0x00000002 /* dir.c */
#define OCFS_DEBUG_CONTEXT_EXTMAP 0x00000004 /* extmap.c */
-#define OCFS_DEBUG_CONTEXT_UNUSED1 0x00000008 /* */
+#define OCFS_DEBUG_CONTEXT_SLOTMAP 0x00000008 /* */
#define OCFS_DEBUG_CONTEXT_IOCTL 0x00000010 /* ioctl.c */
#define OCFS_DEBUG_CONTEXT_UNUSED2 0x00000020 /* */
#define OCFS_DEBUG_CONTEXT_PROC 0x00000040 /* proc.c */
Modified: branches/dlm-glue/src/proc.c
===================================================================
--- branches/dlm-glue/src/proc.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/proc.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -51,6 +51,7 @@
static int ocfs_proc_dlm_stats(char *page, char **start, off_t off, int count, int *eof, void *data);
static int ocfs_proc_version (char *page, char **start, off_t off, int count, int *eof, void *data);
static int ocfs_proc_nodenum (char *page, char **start, off_t off, int count, int *eof, void *data);
+static int ocfs_proc_slotnum (char *page, char **start, off_t off, int count, int *eof, void *data);
static int ocfs_proc_nodename (char *page, char **start, off_t off, int count, int *eof, void *data);
static int ocfs_proc_mountpoint (char *page, char **start, off_t off, int count, int *eof, void *data);
static int ocfs_proc_statistics (char *page, char **start, off_t off, int count, int *eof, void *data);
@@ -75,6 +76,7 @@
ocfs_proc_list sub_dir[] = {
{ "nodenum", NULL, ocfs_proc_nodenum },
{ "mountpoint", NULL, ocfs_proc_mountpoint },
+ { "slotnum", NULL, ocfs_proc_slotnum },
{ "statistics", NULL, ocfs_proc_statistics },
{ "lockstat", NULL, ocfs_proc_dlm_stats },
{ "device", NULL, ocfs_proc_device },
@@ -297,6 +299,29 @@
} /* ocfs_proc_nodenum */
/*
+ * ocfs_proc_slotnum()
+ *
+ * Handler listed for the "slotnum" entry of sub_dir[]. 'data' is the
+ * ocfs_super; its slot_num is formatted as "%d\n" into 'page', and
+ * the return value is whatever ocfs_proc_calc_metrics() computes for
+ * that length together with start/off/count/eof.
+ */
+static int ocfs_proc_slotnum (char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ int len;
+ int ret;
+ ocfs_super *osb;
+
+ LOG_ENTRY ();
+
+ osb = data;
+ sprintf (page, "%d\n", osb->slot_num);
+ len = strlen (page);
+
+ ret = ocfs_proc_calc_metrics (page, start, off, count, eof, len);
+
+ LOG_EXIT_INT (ret);
+ return ret;
+} /* ocfs_proc_slotnum */
+
+/*
* ocfs_proc_nodename()
*
*/
@@ -397,46 +422,25 @@
int count, int *eof, void *data)
{
int len;
- char *pubmap = NULL;
ocfs_super *osb;
- int ret = 0, i;
- char *ptr;
+ int ret = 0;
LOG_ENTRY ();
osb = data;
- pubmap = ocfs_malloc (100);
- if (!pubmap) {
- LOG_ERROR_STATUS (-ENOMEM);
- goto bail;
- }
-
- ptr = pubmap;
- for (i = 0; i < osb->max_nodes; i++) {
- if (ocfs_node_map_test_bit(osb, &osb->node_map, i))
- ptr += sprintf (ptr, "%d ", i);
- }
- if (pubmap != ptr)
- *(ptr - 1) = '\0';
-
#define PROC_STATS \
- "Publish map : %s\n" \
"Number of nodes : %u\n" \
"Cluster size : %d\n" \
"Volume size : %llu\n" \
"Open Transactions: : %u\n"
- len = sprintf (page, PROC_STATS, pubmap,
- osb->num_nodes, osb->s_clustersize,
+ len = sprintf (page, PROC_STATS, osb->num_nodes, osb->s_clustersize,
ocfs2_clusters_to_bytes(osb->sb, osb->num_clusters),
atomic_read(&osb->journal->num_trans));
ret = ocfs_proc_calc_metrics (page, start, off, count, eof, len);
-bail:
- if (pubmap)
- kfree(pubmap);
LOG_EXIT_INT (ret);
return ret;
} /* ocfs_proc_statistics */
Added: branches/dlm-glue/src/slot_map.c
===================================================================
--- branches/dlm-glue/src/slot_map.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/slot_map.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -0,0 +1,280 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * slot_map.c
+ *
+ *
+ *
+ * Copyright (C) 2002, 2004 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "ocfs_compat.h"
+
+#include <linux/types.h>
+#include <linux/slab.h>
+#include <linux/highmem.h>
+#include <linux/smp_lock.h>
+
+#include "ocfs_log.h"
+#include "ocfs.h"
+#include "ocfs2.h"
+
+#include "dlmglue.h"
+#include "extent_map.h"
+#include "slot_map.h"
+#include "sysfile.h"
+
+#include "buffer_head_io.h"
+
+#define OCFS_DEBUG_CONTEXT OCFS_DEBUG_CONTEXT_SLOTMAP
+
+/* ocfs2_update_disk_slots() is prototyped in slot_map.h and defined
+ * further down without 'static', so it must not be forward-declared
+ * 'static' here: giving one identifier both internal and external
+ * linkage in a translation unit is invalid C. */
+int ocfs2_update_disk_slots(ocfs_super *osb,
+			    ocfs2_slot_info *si);
+/* The '__' helpers take no locks themselves; every caller in this
+ * file holds si->si_lock around them. */
+static s16 __ocfs2_node_num_to_slot(ocfs2_slot_info *si,
+				    s16 global);
+static void __ocfs2_fill_slot(ocfs2_slot_info *si,
+			      s16 slot_num,
+			      s16 node_num);
+
+/* Refresh the in-memory node numbers from the slot map block that is
+ * held in si->si_bh. */
+void ocfs2_update_slot_info(ocfs2_slot_info *si)
+{
+	int slot;
+	s16 *on_disk;
+
+	/* No read is issued here: ocfs2_super_lock should already have
+	 * made sure the buffer holds the most recent copy. */
+	spin_lock(&si->si_lock);
+
+	on_disk = (s16 *) si->si_bh->b_data;
+	for (slot = 0; slot < si->si_size; slot++) {
+		/* entries are stored little-endian in the buffer */
+		si->si_global_node_nums[slot] = le16_to_cpu(on_disk[slot]);
+	}
+
+	spin_unlock(&si->si_lock);
+}
+
+/* Copy our in-memory slot info into its destination buffer head and
+ * write that block out. Returns the status of ocfs_write_block(). */
+int ocfs2_update_disk_slots(ocfs_super *osb,
+			    ocfs2_slot_info *si)
+{
+	int status;
+	int slot;
+	s16 *on_disk;
+
+	on_disk = (s16 *) si->si_bh->b_data;
+
+	spin_lock(&si->si_lock);
+	for (slot = 0; slot < si->si_size; slot++) {
+		/* convert each entry to little-endian for the buffer */
+		on_disk[slot] = cpu_to_le16(si->si_global_node_nums[slot]);
+	}
+	spin_unlock(&si->si_lock);
+
+	/* the block is written only after the spinlock is dropped */
+	status = ocfs_write_block(osb, si->si_bh, si->si_inode);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+	return status;
+}
+
+/* Scan the first si_num_slots entries for the given global node
+ * number. Returns the index of the first matching slot, or
+ * OCFS_INVALID_NODE_NUM if no entry matches. Takes no locks. */
+static s16 __ocfs2_node_num_to_slot(ocfs2_slot_info *si,
+				    s16 global)
+{
+	int slot;
+
+	for (slot = 0; slot < si->si_num_slots; slot++) {
+		if (si->si_global_node_nums[slot] == global)
+			return (s16) slot;
+	}
+
+	return OCFS_INVALID_NODE_NUM;
+}
+
+/* Locked wrapper around __ocfs2_node_num_to_slot(): looks up the slot
+ * holding global node number 'global' while holding si->si_lock.
+ * Returns the slot index, or OCFS_INVALID_NODE_NUM if the node has no
+ * slot. */
+s16 ocfs2_node_num_to_slot(ocfs2_slot_info *si,
+			   s16 global)
+{
+	s16 ret;
+
+	spin_lock(&si->si_lock);
+	ret = __ocfs2_node_num_to_slot(si, global);
+	spin_unlock(&si->si_lock);
+
+	/* The lookup result was previously computed but never returned,
+	 * leaving callers with an indeterminate value. */
+	return ret;
+}
+
+/* Record 'node_num' as the occupant of 'slot_num'. Storing
+ * OCFS_INVALID_NODE_NUM marks the slot as unused. Takes no locks. */
+static void __ocfs2_fill_slot(ocfs2_slot_info *si,
+			      s16 slot_num,
+			      s16 node_num)
+{
+	/* the slot must be a real index into the searched range */
+	OCFS_ASSERT(slot_num != OCFS_INVALID_NODE_NUM);
+	OCFS_ASSERT(slot_num < si->si_num_slots);
+
+	/* the node is either the "empty" marker or a valid node number */
+	OCFS_ASSERT((node_num == OCFS_INVALID_NODE_NUM) ||
+		    (node_num < OCFS2_MAX_NODES));
+
+	si->si_global_node_nums[slot_num] = node_num;
+}
+
+/* Mark 'slot_num' as unused in the in-memory slot info. Only the
+ * in-memory array is touched; nothing is written out here. */
+void ocfs2_clear_slot(ocfs2_slot_info *si, s16 slot_num)
+{
+	spin_lock(&si->si_lock);
+
+	__ocfs2_fill_slot(si, slot_num, OCFS_INVALID_NODE_NUM);
+
+	spin_unlock(&si->si_lock);
+}
+
+/*
+ * Allocate and initialise the slot info for this volume.
+ *
+ * Looks up the slot map system inode, maps and reads its first block,
+ * and on success publishes the result in osb->slot_info. Returns the
+ * status of the last operation performed, negative on failure. On
+ * failure everything acquired so far is released and osb->slot_info
+ * is left untouched.
+ */
+int ocfs2_init_slot_info(ocfs_super *osb)
+{
+	int status, i;
+	u64 blkno;
+	struct inode *inode = NULL;
+	struct buffer_head *bh = NULL;
+	ocfs2_slot_info *si;
+
+	si = kmalloc(sizeof(ocfs2_slot_info), GFP_KERNEL);
+	if (!si) {
+		status = -ENOMEM;
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+	memset(si, 0, sizeof(ocfs2_slot_info));
+	spin_lock_init(&si->si_lock);
+	si->si_num_slots = osb->max_nodes;
+	si->si_size = OCFS2_MAX_NODES;
+
+	/* every searchable slot starts out empty */
+	for(i = 0; i < si->si_num_slots; i++)
+		si->si_global_node_nums[i] = OCFS_INVALID_NODE_NUM;
+
+	inode = ocfs_get_system_file_inode(osb, SLOT_MAP_SYSTEM_INODE, -1);
+	if (!inode) {
+		LOG_ERROR_STATUS(status = -EINVAL);
+		goto bail;
+	}
+	/* Hand the inode reference to 'si' immediately. The error path
+	 * below releases resources through ocfs2_free_slot_info(), which
+	 * only drops what is recorded in 'si'; assigning this later
+	 * leaked the inode whenever mapping or reading the block failed. */
+	si->si_inode = inode;
+
+	status = ocfs2_extent_map_get_blocks(inode, 0ULL, 1, &blkno, NULL);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	status = ocfs_read_block(osb, blkno, &bh, 0, inode);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	si->si_bh = bh;
+	osb->slot_info = si;
+bail:
+	if (status < 0 && si)
+		ocfs2_free_slot_info(si);
+
+	return status;
+}
+
+/* Release everything recorded in 'si' -- the inode reference and the
+ * buffer head, when present -- and then free 'si' itself. */
+void ocfs2_free_slot_info(ocfs2_slot_info *si)
+{
+	if (si->si_inode) {
+		iput(si->si_inode);
+	}
+
+	if (si->si_bh) {
+		brelse(si->si_bh);
+	}
+
+	kfree(si);
+}
+
+/*
+ * Pick a slot for this node and record the choice.
+ *
+ * A slot already holding our node number is reused; otherwise the
+ * first empty slot is taken. The choice is stored in osb->slot_num
+ * and then pushed out through ocfs2_update_disk_slots(). Returns
+ * -EINVAL when no slot is free, else the status of the update.
+ */
+int ocfs2_find_slot(ocfs_super *osb)
+{
+	int status;
+	s16 slot;
+	ocfs2_slot_info *si = osb->slot_info;
+
+	LOG_ENTRY();
+
+	ocfs2_update_slot_info(si);
+
+	spin_lock(&si->si_lock);
+
+	/* Look for ourselves first and take that slot if it exists.
+	 * Perhaps this needs to be remembered for our own journal
+	 * recovery? Possibly not, though the user certainly needs to
+	 * be warned. */
+	slot = __ocfs2_node_num_to_slot(si, osb->node_num);
+	if (slot != OCFS_INVALID_NODE_NUM) {
+		printk("ocfs2: slot %d is already allocated to this node!\n",
+		       slot);
+	} else {
+		/* no slot of our own yet: take the first empty one */
+		slot = __ocfs2_node_num_to_slot(si, OCFS_INVALID_NODE_NUM);
+		if (slot == OCFS_INVALID_NODE_NUM) {
+			spin_unlock(&si->si_lock);
+			printk("ocfs2: no free slots available!\n");
+			status = -EINVAL;
+			goto bail;
+		}
+	}
+
+	__ocfs2_fill_slot(si, slot, osb->node_num);
+	osb->slot_num = slot;
+
+	spin_unlock(&si->si_lock);
+
+	printk("ocfs2: taking node slot %d\n", osb->slot_num);
+
+	status = ocfs2_update_disk_slots(osb, si);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+bail:
+	LOG_EXIT_STATUS(status);
+	return status;
+}
+
+/*
+ * Give up this node's slot and tear down osb->slot_info.
+ *
+ * Does nothing if there is no slot info. Otherwise requests the
+ * umount vote, clears our slot in memory, writes the slot map out and
+ * finally releases the slot info. osb->slot_info is NULL on return,
+ * whether or not the vote or the write succeeded.
+ */
+void ocfs2_put_slot(ocfs_super *osb)
+{
+	int status;
+	ocfs2_slot_info *si = osb->slot_info;
+
+	if (!si)
+		return;
+
+	status = ocfs2_request_umount_vote(osb);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	/* so what happens if someone does recovery while we're
+	 * waiting for the ex? */
+
+	/* cluster lock */
+
+	ocfs2_update_slot_info(si);
+
+	spin_lock(&si->si_lock);
+	__ocfs2_fill_slot(si, osb->slot_num, OCFS_INVALID_NODE_NUM);
+	osb->slot_num = OCFS_INVALID_NODE_NUM;
+	spin_unlock(&si->si_lock);
+
+	/* Capture the result: it used to be dropped, so the check below
+	 * re-tested the (already successful) vote status instead. */
+	status = ocfs2_update_disk_slots(osb, si);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+bail:
+	osb->slot_info = NULL;
+	/* A bare kfree() would leak the inode reference and buffer head
+	 * held in 'si'; nothing else can release them once
+	 * osb->slot_info has been cleared. */
+	ocfs2_free_slot_info(si);
+}
+
Added: branches/dlm-glue/src/slot_map.h
===================================================================
--- branches/dlm-glue/src/slot_map.h 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/slot_map.h 2004-11-19 23:44:28 UTC (rev 1655)
@@ -0,0 +1,55 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * slot_map.h
+ *
+ * In-memory slot map structure and function prototypes for slot_map.c.
+ *
+ * Copyright (C) 2002, 2004 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#ifndef SLOTMAP_H
+#define SLOTMAP_H
+
+typedef struct _ocfs2_slot_info { /* in-memory view of the slot map block */
+ spinlock_t si_lock; /* held around accesses to si_global_node_nums */
+
+ struct inode *si_inode; /* slot map system inode; dropped with iput() on free */
+ struct buffer_head *si_bh; /* buffer for the slot map block; dropped with brelse() on free */
+ unsigned int si_num_slots; /* set from osb->max_nodes; bounds slot searches */
+ unsigned int si_size; /* set to OCFS2_MAX_NODES; number of entries copied to/from si_bh */
+ s16 si_global_node_nums[OCFS2_MAX_NODES]; /* node number per slot, or OCFS_INVALID_NODE_NUM if empty */
+} ocfs2_slot_info;
+
+int ocfs2_init_slot_info(ocfs_super *osb);
+void ocfs2_free_slot_info(ocfs2_slot_info *si);
+
+int ocfs2_find_slot(ocfs_super *osb);
+void ocfs2_put_slot(ocfs_super *osb);
+
+void ocfs2_update_slot_info(ocfs2_slot_info *si);
+int ocfs2_update_disk_slots(ocfs_super *osb,
+ ocfs2_slot_info *si);
+
+s16 ocfs2_node_num_to_slot(ocfs2_slot_info *si,
+ s16 global);
+void ocfs2_clear_slot(ocfs2_slot_info *si,
+ s16 slot_num);
+
+#endif
Modified: branches/dlm-glue/src/suballoc.c
===================================================================
--- branches/dlm-glue/src/suballoc.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/suballoc.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -491,7 +491,7 @@
#ifndef OCFS_USE_ALL_METADATA_SUBALLOCATORS
alloc_inode = ocfs_get_system_file_inode(osb, EXTENT_ALLOC_SYSTEM_INODE, 0);
#else
- alloc_inode = ocfs_get_system_file_inode(osb, EXTENT_ALLOC_SYSTEM_INODE, osb->node_num);
+ alloc_inode = ocfs_get_system_file_inode(osb, EXTENT_ALLOC_SYSTEM_INODE, osb->slot_num);
#endif
if (!alloc_inode) {
status = -ENOMEM;
@@ -540,7 +540,7 @@
(*ac)->ac_handle = handle;
(*ac)->ac_which = OCFS_AC_USE_INODE;
- alloc_inode = ocfs_get_system_file_inode(osb, INODE_ALLOC_SYSTEM_INODE, osb->node_num);
+ alloc_inode = ocfs_get_system_file_inode(osb, INODE_ALLOC_SYSTEM_INODE, osb->slot_num);
if (!alloc_inode) {
status = -ENOMEM;
LOG_ERROR_STATUS(status);
Modified: branches/dlm-glue/src/super.c
===================================================================
--- branches/dlm-glue/src/super.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/super.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -61,6 +61,7 @@
#include "journal.h"
#include "localalloc.h"
#include "proc.h"
+#include "slot_map.h"
#include "super.h"
#include "sysfile.h"
#include "util.h"
@@ -156,6 +157,7 @@
static int ocfs_init_local_system_inodes(ocfs_super *osb);
static int ocfs_release_system_inodes(ocfs_super *osb);
static int ocfs2_fill_node_info(ocfs_super *osb);
+static int ocfs2_complete_mount_recovery(ocfs_super *osb);
static int ocfs_check_volume(ocfs_super * osb);
static int ocfs_verify_volume(ocfs2_dinode *di, struct buffer_head *bh,
__u32 sectsize);
@@ -239,7 +241,7 @@
for (i = OCFS2_FIRST_ONLINE_SYSTEM_INODE;
i <= OCFS2_LAST_GLOBAL_SYSTEM_INODE; i++) {
- new = ocfs_get_system_file_inode(osb, i, osb->node_num);
+ new = ocfs_get_system_file_inode(osb, i, osb->slot_num);
if (!new) {
ocfs_release_system_inodes(osb);
LOG_ERROR_STATUS(status = -EINVAL);
@@ -268,7 +270,7 @@
LOG_ENTRY();
for (i = OCFS2_LAST_GLOBAL_SYSTEM_INODE + 1; i < NUM_SYSTEM_INODES ; i++) {
- new = ocfs_get_system_file_inode(osb, i, osb->node_num);
+ new = ocfs_get_system_file_inode(osb, i, osb->slot_num);
if (!new) {
ocfs_release_system_inodes(osb);
LOG_ERROR_STATUS(status = -EINVAL);
@@ -364,11 +366,20 @@
sb->s_root = root;
- printk ("ocfs2: Mounting device (%u,%u) on %s (node %d)\n",
+ printk ("ocfs2: Mounting device (%u,%u) on %s (node %d, slot %d)\n",
MAJOR(sb->s_dev), MINOR(sb->s_dev),
- OcfsGlobalCtxt.node_name, osb->node_num);
+ OcfsGlobalCtxt.node_name, osb->node_num, osb->slot_num);
atomic_set(&osb->vol_state, VOLUME_MOUNTED);
+
+ if (osb->dirty) {
+ /* This must happen *after* setting the volume to
+ * MOUNTED as we may sleep on any recovery threads. */
+ status = ocfs2_complete_mount_recovery(osb);
+ if (status < 0)
+ LOG_EXIT_STATUS(status);
+ }
+
LOG_EXIT_STATUS(status);
return status;
@@ -845,15 +856,14 @@
goto bail;
}
- osb->global_node_num = osb->node_num = nm_this_node(group);
+ osb->group_inode = group;
+ osb->node_num = nm_this_node(group);
- printk("ocfs2: I am node %u, a member of group %s\n", osb->node_num,
+ printk("ocfs2: I am node %d, a member of group %s\n", osb->node_num,
osb->group_name);
status = 0;
bail:
- if (group)
- iput(group);
return status;
}
@@ -864,10 +874,10 @@
*/
static int ocfs_mount_volume (struct super_block *sb, int reclaim_id, struct inode *root)
{
- int status;
- ocfs_super *osb;
+ int status, sector_size;
+ int unlock_super = 0;
+ ocfs_super *osb = NULL;
struct buffer_head *bh = NULL;
- int sector_size;
LOG_ENTRY ();
@@ -920,6 +930,13 @@
goto leave;
}
+ status = ocfs2_super_lock(osb, 1);
+ if (status < 0) {
+ LOG_ERROR_STATUS (status);
+ goto leave;
+ }
+ unlock_super = 1;
+
/* This will load up the node map and add ourselves to it. */
status = ocfs2_find_slot(osb);
if (status < 0) {
@@ -946,7 +963,19 @@
goto leave;
}
+ /* This should be sent *after* we recovered our journal as it
+ * will cause other nodes to unmark us as needing
+ * recovery. However, we need to send it *before* dropping the
+ * super block lock as otherwise their recovery threads might
+ * try to clean us up while we're live! */
+ status = ocfs2_request_mount_vote(osb);
+ if (status < 0)
+ LOG_ERROR_STATUS (status);
+
leave:
+ if (unlock_super)
+ ocfs2_super_unlock(osb, 1);
+
if (bh != NULL)
brelse(bh);
LOG_EXIT_STATUS (status);
@@ -989,6 +1018,8 @@
/* Dismount */
OCFS_SET_FLAG (osb->osb_flags, OCFS_OSB_FLAGS_BEING_DISMOUNTED);
+ ocfs2_put_slot(osb);
+
ocfs2_dlm_shutdown(osb);
ocfs2_clear_hb_callbacks(osb);
@@ -1103,7 +1134,6 @@
MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
init_MUTEX (&(osb->recovery_lock));
- init_MUTEX (&(osb->orphan_recovery_lock));
osb->disable_recovery = 0;
@@ -1115,6 +1145,7 @@
spin_lock_init (&osb->clean_buffer_lock);
osb->node_num = OCFS_INVALID_NODE_NUM;
+ osb->slot_num = OCFS_INVALID_NODE_NUM;
osb->have_local_alloc = 0;
osb->local_alloc_bh = NULL;
@@ -1236,8 +1267,11 @@
printk("cluster bitmap inode: %llu, clusters per group: %u\n",
osb->bitmap_blkno, osb->bitmap_cpg);
- /* We might need to add a variable in Global List of osb to */
- /* delay any creation, if any other node is already creating a file */
+ status = ocfs2_init_slot_info(osb);
+ if (status < 0) {
+ LOG_ERROR_STATUS(status);
+ goto bail;
+ }
/* Link this osb onto the global linked list of all osb structures. */
/* The Global Link List is mainted for the whole driver . */
@@ -1324,6 +1358,35 @@
return status;
} /* ocfs_verify_volume */
+/* Second stage of local node recovery. It has to run after all other
+ * nodes needing recovery have been discovered and recovered. Takes
+ * over osb->local_alloc_copy (clearing the field) and frees it before
+ * returning. osb->dirty is cleared unless completing the local alloc
+ * recovery fails. */
+static int ocfs2_complete_mount_recovery(ocfs_super *osb)
+{
+	int status = 0;
+	ocfs2_dinode *la_copy;
+
+	/* detach the saved copy from the osb; we free it below */
+	la_copy = osb->local_alloc_copy;
+	osb->local_alloc_copy = NULL;
+
+	if (osb->dirty) {
+		status = ocfs_complete_local_alloc_recovery(osb, la_copy);
+		if (status < 0) {
+			LOG_ERROR_STATUS(status);
+			/* osb->dirty stays set on this path */
+			goto out_free;
+		}
+
+		status = ocfs_recover_orphans(osb);
+		if (status < 0) {
+			LOG_ERROR_STATUS(status);
+		}
+	}
+	osb->dirty = 0;
+
+out_free:
+	if (la_copy)
+		kfree(la_copy);
+	return status;
+}
+
/*
* ocfs_check_volume()
*
@@ -1331,7 +1394,6 @@
static int ocfs_check_volume (ocfs_super * osb)
{
int status = 0;
- int node_num = osb->node_num;
int dirty;
ocfs2_dinode *local_alloc = NULL; /* only used if we
* recover
@@ -1366,7 +1428,7 @@
if (dirty) {
/* recover my local alloc if we didn't unmount cleanly. */
status = ocfs_begin_local_alloc_recovery(osb,
- node_num,
+ osb->slot_num,
&local_alloc);
if (status < 0) {
LOG_ERROR_STATUS(status);
@@ -1379,21 +1441,26 @@
LOG_TRACE_STR("Journal loaded.");
status = ocfs_load_local_alloc(osb);
- if (status < 0)
+ if (status < 0) {
LOG_ERROR_STATUS(status);
+ goto finally;
+ }
if (dirty) {
- status = ocfs_complete_local_alloc_recovery(osb, local_alloc);
- if (status < 0) {
- LOG_ERROR_STATUS(status);
- goto finally;
- }
-
- status = ocfs_recover_orphans(osb);
- if (status < 0)
- LOG_ERROR_STATUS(status);
+ /* Recovery will be completed after we've mounted the
+ * rest of the volume. */
+ osb->dirty = 1;
+ osb->local_alloc_copy = local_alloc;
+ local_alloc = NULL;
}
+ /* go through each journal, trylock it and if you get the
+ * lock, and it's marked as dirty, set the bit in the recover
+ * map and launch a recovery thread for it. */
+ status = ocfs2_mark_dead_nodes(osb);
+ if (status < 0)
+ LOG_ERROR_STATUS(status);
+
finally:
if (local_alloc)
kfree(local_alloc);
@@ -1424,6 +1491,12 @@
list_del (&(osb->osb_next));
up (&(OcfsGlobalCtxt.global_res));
+ if (osb->slot_info)
+ ocfs2_free_slot_info(osb->slot_info);
+
+ if (osb->group_inode)
+ iput(osb->group_inode);
+
/* FIXME
* This belongs in journal shutdown, but because we have to
* allocate osb->journal at the start of ocfs_initalize_osb(),
@@ -1432,7 +1505,8 @@
kfree(osb->journal);
if (osb->group_name)
kfree(osb->group_name);
-
+ if (osb->local_alloc_copy)
+ kfree(osb->local_alloc_copy);
memset (osb, 0, sizeof (ocfs_super));
LOG_EXIT ();
Modified: branches/dlm-glue/src/sysfile.c
===================================================================
--- branches/dlm-glue/src/sysfile.c 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/sysfile.c 2004-11-19 23:44:28 UTC (rev 1655)
@@ -46,10 +46,10 @@
/* Tracing */
#define OCFS_DEBUG_CONTEXT OCFS_DEBUG_CONTEXT_SYSFILE
-static struct inode * _ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 node);
+static struct inode * _ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 slot);
static inline int is_global_system_inode(int type);
-static inline int is_in_system_inode_array(ocfs_super *osb, int type, __u32 node);
+static inline int is_in_system_inode_array(ocfs_super *osb, int type, __u32 slot);
static inline int is_global_system_inode(int type)
{
@@ -57,19 +57,19 @@
type <= OCFS2_LAST_GLOBAL_SYSTEM_INODE);
}
-static inline int is_in_system_inode_array(ocfs_super *osb, int type, __u32 node)
+static inline int is_in_system_inode_array(ocfs_super *osb, int type, __u32 slot)
{
- return (node == osb->node_num || is_global_system_inode(type));
+ return (slot == osb->slot_num || is_global_system_inode(type));
}
struct inode *ocfs_get_system_file_inode(ocfs_super *osb, int type,
- __u32 node)
+ __u32 slot)
{
struct inode *inode = NULL;
struct inode **arr = NULL;
/* avoid the lookup if cached in local system file array */
- if (is_in_system_inode_array(osb, type, node))
+ if (is_in_system_inode_array(osb, type, slot))
arr = &(osb->system_inodes[type]);
if (arr && ((inode = *arr) != NULL)) {
@@ -82,7 +82,7 @@
}
/* this gets one ref thru iget */
- inode = _ocfs_get_system_file_inode(osb, type, node);
+ inode = _ocfs_get_system_file_inode(osb, type, slot);
/* add one more if putting into array for first time */
if (arr && inode) {
@@ -93,7 +93,7 @@
return inode;
}
-static struct inode * _ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 node)
+static struct inode * _ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 slot)
{
char namebuf[40];
struct inode *inode = NULL;
@@ -104,7 +104,7 @@
ocfs2_sprintf_system_inode_name(namebuf,
sizeof(namebuf),
- type, node);
+ type, slot);
status = ocfs_find_files_on_disk(osb, namebuf, strlen(namebuf),
&blkno, osb->sys_root_inode,
Modified: branches/dlm-glue/src/sysfile.h
===================================================================
--- branches/dlm-glue/src/sysfile.h 2004-11-19 22:58:39 UTC (rev 1654)
+++ branches/dlm-glue/src/sysfile.h 2004-11-19 23:44:28 UTC (rev 1655)
@@ -26,6 +26,6 @@
#ifndef OCFS2_SYSFILE_H
#define OCFS2_SYSFILE_H
-struct inode * ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 node);
+struct inode * ocfs_get_system_file_inode(ocfs_super *osb, int type, __u32 slot);
#endif /* OCFS2_SYSFILE_H */
More information about the Ocfs2-commits
mailing list