[Ocfs2-commits] khackel commits r791 - in trunk/src: . inc
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Fri Mar 19 01:37:44 CST 2004
Author: khackel
Date: 2004-03-19 01:37:42 -0600 (Fri, 19 Mar 2004)
New Revision: 791
Modified:
trunk/src/dir.c
trunk/src/dlm.c
trunk/src/file.c
trunk/src/inc/ocfs.h
trunk/src/inc/proto.h
trunk/src/namei.c
trunk/src/nm.c
trunk/src/oin.c
Log:
big change to add support for readonly cache locks
Modified: trunk/src/dir.c
===================================================================
--- trunk/src/dir.c 2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/dir.c 2004-03-19 07:37:42 UTC (rev 791)
@@ -157,7 +157,7 @@
} /* ocfs_readdir */
/* ocfs_find_files_on_disk()
- *
+ * NOTE: this should always be called with inode->i_sem taken!
*/
int ocfs_find_files_on_disk (ocfs_super * osb, __u64 parent_off, struct qstr * file_name, struct buffer_head ** fe_bh, ocfs_file * ofile, struct inode *inode, bool take_lock)
{
@@ -171,10 +171,8 @@
struct buffer_head *bh = NULL;
struct buffer_head **bhs = NULL;
int bufsz, nbhs, i;
- __u32 lock_type = OCFS_DLM_SHARED_LOCK;
+ __u32 lock_type = OCFS_DLM_ENABLE_CACHE_LOCK;
- /* TODO: change this to take a buffer head instead of fe */
-
LOG_ENTRY_ARGS ("(osb=%p, parent=%u.%u, fname=%p, fe_bh=%p, ofile=%p, inode=%p)\n", osb, parent_off, file_name, fe_bh, ofile, inode);
nbhs = osb->vol_layout.dir_node_size >> 9;
@@ -197,12 +195,11 @@
}
OCFS_ASSERT(bhs);
+ sync = false;
if (take_lock) {
- /* Get a shared lock on the directory... */
- // temp change... try this out
- lock_type = OCFS_DLM_ENABLE_CACHE_LOCK;
- status = ocfs_acquire_lock (osb, parent_off, lock_type, FLAG_DIR,
- &lockres, &bh, inode);
+ /* Get a lock on the directory... */
+ status = ocfs_acquire_lock (osb, parent_off, lock_type, FLAG_DIR|FLAG_READDIR,
+ &lockres, &bh, inode);
if (status < 0) {
/* Volume should be disabled in this case */
if (status != -EINTR)
@@ -210,14 +207,9 @@
goto leave;
}
lock_acq = true;
- if (lockres->master_node_num == osb->node_num &&
- lockres->lock_type > OCFS_DLM_SHARED_LOCK)
- sync = false;
- else
+ if (lockres->master_node_num != osb->node_num ||
+ lockres->lock_type < OCFS_DLM_EXCLUSIVE_LOCK)
sync = true;
- } else {
- /* calling function has already taken a cache or exclusive lock */
- sync = false;
}
if (bhs[0]==NULL || bhs[0]->b_blocknr != (thisDirNode >> 9)) {
@@ -247,7 +239,7 @@
if (take_lock && lock_acq)
{
tmpstat = ocfs_release_lock (osb, parent_off, lock_type,
- FLAG_DIR, lockres, bh, inode);
+ FLAG_DIR|FLAG_READDIR, lockres, bh, inode);
if (tmpstat < 0) {
LOG_ERROR_STATUS (tmpstat);
/* Volume should be disabled in this case */
Modified: trunk/src/dlm.c
===================================================================
--- trunk/src/dlm.c 2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/dlm.c 2004-03-19 07:37:42 UTC (rev 791)
@@ -33,17 +33,19 @@
/* Tracing */
#define OCFS_DEBUG_CONTEXT OCFS_DEBUG_CONTEXT_DLM
+int new_lock_function(ocfs_super * osb, __u32 requested_lock, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, bool *disk_vote, struct inode *inode);
+
+static inline int ocfs_wait_for_readonly_drop(ocfs_super *osb, ocfs_lock_res *lockres);
+
static int ocfs_insert_cache_link (ocfs_super * osb, ocfs_lock_res * lockres);
-static int ocfs_update_lock_state (ocfs_super * osb, ocfs_lock_res * lockres, __u32 flags, bool *disk_vote, struct inode *inode);
static int ocfs_send_dlm_request_msg (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, __u64 vote_map);
static int ocfs_request_vote (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, __u64 vote_map, __u64 * lock_seq_num, struct inode *inode);
static int ocfs_wait_for_vote (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, __u64 vote_map, __u32 time_to_wait, __u64 lock_seq_num, ocfs_lock_res * lockres);
-static int ocfs_reset_voting (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u64 vote_map);
+static int ocfs_reset_voting (ocfs_super * osb);
static int ocfs_wait_for_lock_release (ocfs_super * osb, __u64 offset, __u32 time_to_wait, ocfs_lock_res * lockres, __u32 lock_type, struct inode *inode);
static int ocfs_get_vote_on_disk (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, __u64 * got_vote_map, __u64 vote_map, __u64 lock_seq_num, __u64 * oin_open_map);
static int ocfs_break_cache_lock (ocfs_super * osb, ocfs_lock_res * lockres, struct inode *inode);
static int ocfs_disk_request_vote (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, __u64 vote_map, __u64 * lock_seq_num);
-int ocfs_make_lock_master (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, bool *disk_vote, struct inode *inode);
int ocfs_update_disk_lock (ocfs_super * osb, ocfs_lock_res * lockres, __u32 flags, struct buffer_head **bh, struct inode *inode);
static int ocfs_update_master_on_open (ocfs_super * osb, ocfs_lock_res * lockres, struct inode *inode);
int ocfs_disk_release_lock (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, struct inode *inode);
@@ -67,88 +69,8 @@
return status;
} /* ocfs_insert_cache_link */
-/*
- * ocfs_update_lock_state()
- *
- */
-static int ocfs_update_lock_state (ocfs_super * osb, ocfs_lock_res * lockres, __u32 flags, bool *disk_vote, struct inode *inode)
-{
- __u32 votemap;
- int status = 0;
- int tmpstat;
- __u64 lockseqno = 0;
- unsigned long jif = 0;
- LOG_ENTRY_ARGS ("(0x%08x, 0x%08x, %u)\n", osb, lockres, flags);
- ocfs_acquire_lockres (lockres);
- votemap = (1 << lockres->master_node_num);
-
- if (votemap == (1 << osb->node_num)) {
- status = 0;
- goto vote_success;
- }
-
- if (comm_voting && !*disk_vote) {
- LOG_TRACE_STR ("Network vote");
- jif = jiffies;
- status = ocfs_send_dlm_request_msg (osb, lockres->sector_num,
- lockres->lock_type, flags, lockres, votemap);
- if (status >= 0) {
- status = lockres->vote_status;
- if (status >= 0)
- goto vote_success;
- else
- goto finito;
- } else if (status == -ETIMEDOUT) {
- LOG_TRACE_STR ("Network voting timed out");
- }
- else
- LOG_ERROR_STATUS (status);
- lockres->vote_state = 0;
- }
-
- LOG_TRACE_STR ("Disk vote");
- *disk_vote = true;
- jif = jiffies;
- status = ocfs_request_vote (osb, lockres->sector_num,
- lockres->lock_type, flags, votemap, &lockseqno, inode);
- if (status < 0) {
- if (status != -EAGAIN)
- LOG_ERROR_STATUS (status);
- goto finito;
- }
-
- status = ocfs_wait_for_vote (osb, lockres->sector_num,
- lockres->lock_type, flags, votemap, 5000,
- lockseqno, lockres);
- if (status < 0) {
- if (status != -EAGAIN)
- LOG_ERROR_STATUS (status);
- goto finito;
- }
-
-vote_success:
- ocfs_break_cache_lock_zap_buffers(osb, inode);
-
- jif = jiffies - jif;
- LOG_TRACE_ARGS ("Lock time:%u\n", jif);
-
- if (flags & FLAG_CHANGE_MASTER)
- lockres->master_node_num = osb->node_num;
-finito:
- if (*disk_vote) {
- tmpstat = ocfs_reset_voting (osb, lockres->sector_num,
- lockres->lock_type, votemap);
- if (tmpstat < 0)
- LOG_ERROR_STATUS (tmpstat);
- }
- ocfs_release_lockres (lockres);
-
- LOG_EXIT_STATUS (status);
- return status;
-} /* ocfs_update_lock_state */
-
/*
* ocfs_disk_request_vote()
*
@@ -360,6 +282,7 @@
struct buffer_head *bh = NULL;
__u32 curr_master;
__u8 lock_level;
+ bool is_dir = false, disk_vote = false;
LOG_ENTRY_ARGS ("(0x%08x, %u.%u, %u, 0x%08x, %u)\n", osb,
HI (offset), LO (offset), time_to_wait,
@@ -377,6 +300,7 @@
fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(bh); /* read */
curr_master = DISK_LOCK_CURRENT_MASTER (fe);
lock_level = DISK_LOCK_FILE_LOCK (fe);
+ is_dir = (fe->attribs & OCFS_ATTRIB_DIRECTORY);
OCFS_BH_PUT_DATA(bh);
if ((curr_master == OCFS_INVALID_NODE_NUM) ||
@@ -384,7 +308,8 @@
goto got_it;
}
- if ((!IS_NODE_ALIVE (osb->publ_map, curr_master, OCFS_MAXIMUM_NODES)) && (!TEST_NODE_IN_RECOVERY(osb, curr_master))) {
+ if ((!IS_NODE_ALIVE (osb->publ_map, curr_master, OCFS_MAXIMUM_NODES)) &&
+ (!TEST_NODE_IN_RECOVERY(osb, curr_master))) {
/* Reset the lock as not owned and return success?? */
/* This needs to be under some sort of cluster wide lock, */
fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(bh); /* write */
@@ -400,38 +325,81 @@
/* The local node is not the master */
if (lock_level == OCFS_DLM_ENABLE_CACHE_LOCK) {
- int tmpstat;
-
+ ocfs_acquire_lockres(lockres);
lockres->lock_type = lock_level;
lockres->master_node_num = curr_master;
- status = ocfs_break_cache_lock (osb, lockres, inode);
- if (status < 0) {
- if (status != -EINTR)
- LOG_ERROR_STATUS (status);
- goto finally;
+
+ if (is_dir) {
+ if (lockres->readonly_node != OCFS_INVALID_NODE_NUM) {
+ if (lockres->readonly_node == curr_master) {
+ // readonly cachelock already on this dir
+ printk("ocfs_wait_for_lock_release: ronode=master=%d\n", curr_master);
+ ocfs_release_lockres(lockres);
+ goto got_it;
+ } else {
+ LOG_ERROR_ARGS("(1) readonly node changed! was %d, now master is %d\n",
+ lockres->readonly_node, curr_master);
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+ }
+ }
+
+ // no readonly node, need to alert owner to get readonly access
+ status = new_lock_function(osb, lockres->lock_type,
+ FLAG_DIR | FLAG_READDIR | FLAG_ACQUIRE_LOCK,
+ lockres, bh, &disk_vote, inode);
+ if (status < 0) {
+ ocfs_release_lockres(lockres);
+ if (status == -EAGAIN) {
+ if (ocfs_task_interruptible (osb)) {
+ LOG_TRACE_ARGS("interrupted... lockid=%u.%u\n",
+ HILO(lockres->sector_num));
+ status = -EINTR;
+ goto finally;
+ }
+ goto again;
+ }
+ goto finally;
+ }
+
+ printk("waitforlockrelease: setting ronode, was=%d, now=%d\n", lockres->readonly_node, lockres->master_node_num);
+ lockres->readonly_node = lockres->master_node_num;
+ printk("ocfs_wait_for_lock_release: cache->readonly ronode=master=%d\n", curr_master);
+ ocfs_release_lockres(lockres);
+ goto got_it;
+ } else {
+ ocfs_release_lockres(lockres);
+#warning need to deal with this
+ status = ocfs_break_cache_lock (osb, lockres, inode);
+ if (status < 0) {
+ if (status != -EINTR)
+ LOG_ERROR_STATUS (status);
+ goto finally;
+ }
+ tmpstat = ocfs_read_bh (osb, offset, &bh, 0, inode);
+ if (tmpstat < 0) {
+ LOG_ERROR_STATUS (tmpstat);
+ status = tmpstat;
+ goto finally;
+ }
+ LOG_TRACE_ARGS("broke cache lock, setting to NO_LOCK\n");
+ fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(bh); /* write */
+ DISK_LOCK_FILE_LOCK (fe) = OCFS_DLM_NO_LOCK;
+ lock_level = OCFS_DLM_NO_LOCK;
+ OCFS_BH_PUT_DATA(bh);
+ tmpstat = ocfs_write_bh (osb, bh, 0, inode);
+ if (tmpstat < 0) {
+ LOG_ERROR_STATUS (tmpstat);
+ status = tmpstat;
+ goto finally;
+ }
}
- tmpstat = ocfs_read_bh (osb, offset, &bh, 0, inode);
- if (tmpstat < 0) {
- LOG_ERROR_STATUS (tmpstat);
- status = tmpstat;
- goto finally;
- }
- LOG_TRACE_ARGS("broke cache lock, setting to NO_LOCK\n");
- fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(bh); /* write */
- DISK_LOCK_FILE_LOCK (fe) = OCFS_DLM_NO_LOCK;
- lock_level = OCFS_DLM_NO_LOCK;
- OCFS_BH_PUT_DATA(bh);
- tmpstat = ocfs_write_bh (osb, bh, 0, inode);
- if (tmpstat < 0) {
- LOG_ERROR_STATUS (tmpstat);
- status = tmpstat;
- goto finally;
- }
}
+
if (lock_level <= lock_type)
goto got_it;
-
+
+again:
brelse(bh);
ocfs_sleep (WAIT_FOR_VOTE_INCREMENT);
timewaited += WAIT_FOR_VOTE_INCREMENT;
@@ -449,6 +417,16 @@
lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
lockres->oin_openmap = DISK_LOCK_OIN_MAP (fe);
lockres->last_lock_upd = DISK_LOCK_LAST_WRITE (fe);
+ if (lockres->readonly_node != OCFS_INVALID_NODE_NUM) {
+ if (lockres->readonly_node != lockres->master_node_num) {
+ LOG_ERROR_ARGS("(2) readonly node changed! was %d, now master is %d\n",
+ lockres->readonly_node, lockres->master_node_num);
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+ } else if (lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK) {
+ LOG_ERROR_ARGS("readonly lock is not a cachelock any more!\n");
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+ }
+ }
ocfs_release_lockres (lockres);
OCFS_BH_PUT_DATA(bh);
}
@@ -581,63 +559,7 @@
return (status);
} /* ocfs_get_vote_on_disk */
-/*
- * ocfs_disk_reset_voting()
- *
- */
-int ocfs_disk_reset_voting (ocfs_super * osb, __u64 lock_id, __u32 lock_type)
-{
- int status = 0;
- ocfs_publish *pubsect = NULL;
- __u64 offset = 0;
- struct buffer_head *bh = NULL;
- LOG_ENTRY_ARGS ("(0x%08x, %u.%u, %u)\n", osb, HI (lock_id),
- LO (lock_id), lock_type);
-
- LOG_TRACE_ARGS ("0x%08x, %u.%u, %u\n", osb, HI (lock_id),
- LO (lock_id), lock_type);
-
- /* take lock to prevent publish overwrites by vote_req and nm thread */
- down (&(osb->publish_lock));
-
- /* Read node's publish sector */
- offset = osb->vol_layout.publ_sect_off + (osb->node_num * osb->sect_size);
-
- status = ocfs_read_bh (osb, offset, &bh, 0, NULL);
- if (status < 0) {
- LOG_ERROR_STATUS (status);
- goto finally;
- }
- pubsect = (ocfs_publish *)OCFS_BH_GET_DATA_WRITE(bh); /* write */
-
- pubsect->dirty = false;
- pubsect->vote = 0;
- pubsect->vote_type = 0;
- pubsect->vote_map = 0;
- pubsect->dir_ent = 0;
-
- /* Write it back */
- OCFS_BH_PUT_DATA(bh);
- status = ocfs_write_bh (osb, bh, 0, NULL);
- if (status < 0) {
- LOG_ERROR_STATUS (status);
- goto finally;
- }
-
-
- osb->publish_dirty = false;
-
- atomic_set (&osb->node_req_vote, 0);
-
-finally:
- if (bh != NULL)
- brelse(bh);
- up (&(osb->publish_lock));
- LOG_EXIT_STATUS (status);
- return (status);
-} /* ocfs_disk_reset_voting */
-
/*
* ocfs_wait_for_vote()
*
@@ -709,16 +631,53 @@
* ocfs_reset_voting()
*
*/
-static int ocfs_reset_voting (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u64 vote_map)
+static int ocfs_reset_voting (ocfs_super * osb)
{
- int status;
+ int status = 0;
+ ocfs_publish *pubsect = NULL;
+ __u64 offset = 0;
+ struct buffer_head *bh = NULL;
LOG_ENTRY ();
- status = ocfs_disk_reset_voting (osb, lock_id, lock_type);
+ /* take lock to prevent publish overwrites by vote_req and nm thread */
+ down (&(osb->publish_lock));
+ /* Read node's publish sector */
+ offset = osb->vol_layout.publ_sect_off + (osb->node_num * osb->sect_size);
+
+ status = ocfs_read_bh (osb, offset, &bh, 0, NULL);
+ if (status < 0) {
+ LOG_ERROR_STATUS (status);
+ goto finally;
+ }
+ pubsect = (ocfs_publish *)OCFS_BH_GET_DATA_WRITE(bh); /* write */
+
+ pubsect->dirty = false;
+ pubsect->vote = 0;
+ pubsect->vote_type = 0;
+ pubsect->vote_map = 0;
+ pubsect->dir_ent = 0;
+
+ /* Write it back */
+ OCFS_BH_PUT_DATA(bh);
+ status = ocfs_write_bh (osb, bh, 0, NULL);
+ if (status < 0) {
+ LOG_ERROR_STATUS (status);
+ goto finally;
+ }
+
+
+ osb->publish_dirty = false;
+
+ atomic_set (&osb->node_req_vote, 0);
+
+finally:
+ if (bh != NULL)
+ brelse(bh);
+ up (&(osb->publish_lock));
LOG_EXIT_STATUS (status);
- return status;
+ return (status);
} /* ocfs_reset_voting */
/*
@@ -822,119 +781,7 @@
return status;
} /* ocfs_send_dlm_request_msg */
-/*
- * ocfs_make_lock_master()
- *
- */
-int ocfs_make_lock_master (ocfs_super * osb, __u64 lock_id, __u32 lock_type, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, bool *disk_vote, struct inode *inode)
-{
- __u64 vote_map = 0;
- __u64 lockseqnum = 0;
- int status = 0;
- int tmpstat;
- unsigned long jif;
- ocfs_file_entry *fe = NULL;
- LOG_ENTRY ();
-
- ocfs_acquire_lockres (lockres);
- fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(bh); /* read */
-
- vote_map = osb->publ_map;
- if (((flags & FLAG_FILE_DELETE) || (flags & FLAG_FILE_RENAME)) &&
- (!(flags & FLAG_DIR)) &&
- (DISK_LOCK_CURRENT_MASTER (fe) == osb->node_num)) {
- vote_map = DISK_LOCK_OIN_MAP (fe);
- vote_map &= osb->publ_map; /* remove all dead nodes */
- }
- vote_map &= ~(1 << osb->node_num);
- OCFS_BH_PUT_DATA(bh);
-
- if (vote_map == 0) {
- /* As this is the only node alive, make it master of the lock */
- if (lockres->lock_type <= lock_type)
- lockres->lock_type = (__u8) lock_type;
- lockres->master_node_num = osb->node_num;
-
- status = ocfs_update_disk_lock (osb, lockres,
- DLOCK_FLAG_MASTER | DLOCK_FLAG_LOCK, &bh, inode);
- if (status < 0) {
- LOG_ERROR_STATUS (status);
- goto bail;
- }
- goto bail;
- }
-
-
- if (comm_voting && !*disk_vote) {
- LOG_TRACE_STR ("Network vote");
- jif = jiffies;
- status = ocfs_send_dlm_request_msg (osb, lock_id, lock_type,
- flags, lockres, vote_map);
- if (status >= 0) {
- status = lockres->vote_status;
- if (status >= 0)
- goto vote_success;
- else
- goto bail;
- } else if (status == -ETIMEDOUT) {
- LOG_TRACE_STR ("Network voting timed out");
- }
- else
- LOG_ERROR_STATUS (status);
- lockres->vote_state = 0;
- }
-
- LOG_TRACE_STR ("Disk vote");
- *disk_vote = true;
- jif = jiffies;
- status = ocfs_request_vote (osb, lock_id, lock_type, flags, vote_map,
- &lockseqnum, inode);
- if (status < 0) {
- if (status != -EAGAIN)
- LOG_ERROR_STATUS (status);
- goto bail;
- }
-
- status = ocfs_wait_for_vote (osb, lock_id, lock_type, flags, vote_map,
- 5000, lockseqnum, lockres);
- if (status < 0) {
- if (status != -EAGAIN)
- LOG_ERROR_STATUS (status);
- goto bail;
- }
-
-vote_success:
- jif = jiffies - jif;
- LOG_TRACE_ARGS ("Lock time: %u\n", jif);
-
- /* Make this node the master of this lock */
- if (lockres->lock_type <= lock_type)
- lockres->lock_type = (__u8) lock_type;
-
- lockres->master_node_num = osb->node_num;
-
- /* Write that we now are the master to the disk */
- status = ocfs_update_disk_lock (osb, lockres,
- DLOCK_FLAG_MASTER | DLOCK_FLAG_LOCK | DLOCK_FLAG_OPEN_MAP, &bh, inode);
- if (status < 0) {
- LOG_ERROR_STATUS (status);
- goto bail;
- }
-
-bail:
-
- if (*disk_vote) {
- tmpstat = ocfs_reset_voting (osb, lock_id, lock_type, vote_map);
- if (tmpstat < 0)
- LOG_ERROR_STATUS (tmpstat);
- }
- ocfs_release_lockres (lockres);
-
- LOG_EXIT_STATUS (status);
- return status;
-} /* ocfs_make_lock_master */
-
/*
* ocfs_acquire_lockres_ex()
*
@@ -1136,8 +983,8 @@
}
ocfs_release_lockres (lockres);
} else {
- status = ocfs_update_lock_state (osb, lockres,
- FLAG_ADD_OIN_MAP, &disk_vote, inode);
+ status = new_lock_function(osb, lockres->lock_type, FLAG_ADD_OIN_MAP, lockres,
+ NULL, &disk_vote, inode);
if (status < 0) {
if (status != -EAGAIN)
LOG_ERROR_STATUS (status);
@@ -1198,6 +1045,10 @@
lockres->writer_node_num = OCFS_INVALID_NODE_NUM;
lockres->reader_node_num = OCFS_INVALID_NODE_NUM;
+ lockres->readonly_map = 0ULL;
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+ lockres->readonly_dropping = false;
+
lockres->lock_holders = 0;
LOG_TRACE_ARGS("lockres->lock_holders = %u\n", lockres->lock_holders);
@@ -1385,13 +1236,17 @@
int lockflags = (lock_id >= osb->vol_layout.bitmap_off ? OCFS_BH_CACHED : 0);
/* TODO: 40 bytes of "bool" sitting on the stack for now. move */
/* mutually exclusive flags into an enum and switch on them */
- bool disk_vote = false, keep_exclusive = false, local_lock = false;
+ bool disk_vote = false;
bool no_owner = false, owner_dead = false, wait_on_recovery = false;
- bool truncate_extend = false, have_cache_already = false;
int lock_path = invalid_path;
+ __u32 extra_lock_flags = 0;
LOG_ENTRY_ARGS ("(0x%08x, %u.%u, %u, %u, 0x%08x, 0x%08x)\n", osb,
HI (lock_id), LO (lock_id), lock_type, flags, lr, bh);
+
+
+ OCFS_ASSERT(lock_type != OCFS_DLM_NO_LOCK);
+ OCFS_ASSERT(lock_type != OCFS_DLM_SHARED_LOCK);
if (bh != NULL)
b = bh;
@@ -1407,44 +1262,12 @@
LOG_ERROR_STATUS (status);
goto bail;
}
-
- /* NO_LOCK */
- if (lock_type == OCFS_DLM_NO_LOCK)
- goto bail;
-
- /* SHARED */
- if (lock_type == OCFS_DLM_SHARED_LOCK) {
- if (!(flags & FLAG_DIR))
- goto bail;
- ocfs_acquire_lockres (lockres);
- if (lockres->lock_type == OCFS_DLM_NO_LOCK)
- lockres->lock_type = OCFS_DLM_SHARED_LOCK;
- else if ((lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK) &&
- (lockres->master_node_num != osb->node_num))
- status = ocfs_break_cache_lock (osb, lockres, inode);
- if (status < 0) {
- if (status != -EINTR)
- LOG_ERROR_STATUS (status);
- ocfs_release_lockres (lockres);
- goto bail;
- }
- lockres->lock_holders++;
- LOG_TRACE_ARGS("lockres->lock_holders = %u\n",
- lockres->lock_holders);
- atomic_inc (&(lockres->lr_share_cnt));
- ocfs_release_lockres (lockres);
- goto bail;
- }
-
- /* EXCLUSIVE or CACHE */
- status = 0;
ocfs_get_lockres (lockres);
again:
ocfs_acquire_lockres (lockres);
- k++;
- LOG_TRACE_ARGS("attempting to get lock, pass: %d\n", k);
+ LOG_TRACE_ARGS("attempting to get lock, pass: %d\n", ++k);
if (lockres->master_node_num == osb->node_num)
updated = true;
@@ -1458,18 +1281,6 @@
}
disklock = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(*b); /* read */
-#ifdef SUSPICIOUS_CODE
- // This code is added to avoid the case when fileentry is not yet updated
- // but the lockresource is updated by NMthread and needsflush is set to FALSE.
- if (lockres->master_node_num != osb->node_num &&
- DISK_LOCK_CURRENT_MASTER (disklock) == osb->node_num) {
- OCFS_BH_PUT_DATA(*b);
- ocfs_release_lockres (lockres);
- ocfs_sleep (1000);
- goto again;
- }
-#endif
-
if (lockres->master_node_num != osb->node_num ||
lockres->master_node_num != DISK_LOCK_CURRENT_MASTER (disklock)) {
lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (disklock);
@@ -1480,6 +1291,7 @@
OCFS_BH_PUT_DATA(*b);
}
+reevaluate:
no_owner = (lockres->master_node_num == OCFS_INVALID_NODE_NUM);
/* master node is an invalid node */
@@ -1489,181 +1301,122 @@
goto finally;
}
- truncate_extend = (flags & (FLAG_FILE_EXTEND | FLAG_FILE_TRUNCATE));
- local_lock = (lockres->master_node_num == osb->node_num);
wait_on_recovery = TEST_NODE_IN_RECOVERY(osb, lockres->master_node_num);
owner_dead = !(no_owner || IS_NODE_ALIVE(osb->publ_map,
lockres->master_node_num, OCFS_MAXIMUM_NODES));
+ if ((owner_dead || wait_on_recovery) &&
+ lockres->readonly_node == lockres->master_node_num) {
+ // if owner is dead or in recovery and the lockres
+ // has the readonly owner set, clear it
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+ }
- if (!local_lock && (wait_on_recovery || no_owner || owner_dead)) {
- lock_path = become_master;
+ status = 0;
+ extra_lock_flags = 0;
+
+ if (flags & FLAG_READDIR) {
+ if (lockres->readonly_node != OCFS_INVALID_NODE_NUM)
+ goto skip_lock_write;
+ if (lockres->master_node_num == osb->node_num &&
+ lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK) {
+ /* local node is master */
+ printk("acquirelock: setting ronode, was=%d, now=%d, master=%d\n",
+ lockres->readonly_node, osb->node_num, lockres->master_node_num);
+ lockres->readonly_node = osb->node_num;
+ goto skip_lock_write;
+ }
+
+ if (lockres->master_node_num == OCFS_INVALID_NODE_NUM ||
+ owner_dead || wait_on_recovery) {
+ /* no master or dead master */
+ extra_lock_flags = FLAG_REMASTER;
+ } else {
+ /* valid master, but either not cachelock or elsewhere */
+ if (lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK) {
+ /* treat just like a normal master change request */
+ extra_lock_flags = FLAG_CHANGE_MASTER;
+ }
+ }
+ goto do_lock;
+ }
+
+ // anything else is NOT a readdir request
+ if (lockres->readonly_node != osb->node_num)
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM; // clear any owner
+
+ status = ocfs_wait_for_readonly_drop(osb, lockres);
+ if (status < 0) {
+ if (status == -ETIMEDOUT)
+ goto again;
+ if (status == -EAGAIN)
+ goto reevaluate;
+ LOG_ERROR_STATUS(status);
+ goto finally;
+ }
+
+ if (lockres->master_node_num != osb->node_num &&
+ (wait_on_recovery || no_owner || owner_dead)) {
+ extra_lock_flags = FLAG_REMASTER;
} else if (flags & (FLAG_FILE_DELETE | FLAG_FILE_RENAME)) {
- lock_path = get_x;
- } else if (local_lock) {
- if (truncate_extend)
- lock_path = become_master;
+ if (ocfs_journal_new_file_search(osb, lock_id)!=0) {
+ extra_lock_flags = 0;
+ } else if (lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK)
+ extra_lock_flags = FLAG_FAST_PATH_LOCK;
else
- lock_path = fast_path;
+ extra_lock_flags = FLAG_CHANGE_MASTER;
+ } else if (lockres->master_node_num == osb->node_num) {
+ if (flags & (FLAG_FILE_EXTEND | FLAG_FILE_TRUNCATE) &&
+ ocfs_journal_new_file_search(osb, lock_id)!=0)
+ extra_lock_flags = FLAG_REMASTER;
+ else if (lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK)
+ extra_lock_flags = FLAG_FAST_PATH_LOCK;
+ else
+ extra_lock_flags = FLAG_CHANGE_MASTER;
} else {
- lock_path = master_request;
+ extra_lock_flags = FLAG_CHANGE_MASTER;
}
- /* hack upon hack... if the cachelock is still sitting around, skip voting */
- if ((lock_path == become_master || lock_path == get_x) &&
- ocfs_journal_new_file_search(osb, lock_id)==0)
- lock_path = fast_path;
- if (lock_path == fast_path && lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK) {
- LOG_TRACE_ARGS("testing testing!!! flipping this fast_path to master_request\n");
- lock_path = master_request;
- }
-
+do_lock:
+ flags |= extra_lock_flags;
- LOG_TRACE_ARGS("lockres: master=%d, locktype=%d, flags: %d, lock_path: %s\n",
- lockres->master_node_num, lockres->lock_type, flags,
- lock_path_str(lock_path));
-
- switch (lock_path) {
- case fast_path: /* master node is this node */
- {
- /* specifically keep an exclusive if we already have one on */
- /* this node even if we are asking for a cache lock */
- disklock = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(*b); /* read */
- keep_exclusive = (DISK_LOCK_FILE_LOCK (disklock) == OCFS_DLM_EXCLUSIVE_LOCK);
- if (keep_exclusive)
- LOG_ERROR_STR("keep_exclusive set!");
- OCFS_BH_PUT_DATA(*b);
- break;
- }
- case become_master: /* there is no master, or master needs recovery */
- case get_x: /* a delete or rename request */
- {
- if (wait_on_recovery && !(flags & FLAG_FILE_RECOVERY)) {
- int waitcnt = 0;
- LOG_TRACE_ARGS("Waiting on node %u to be recovered\n",
- lockres->master_node_num);
- while (1) {
- LOG_TRACE_ARGS("waitcnt = %d\n", waitcnt);
- if (!TEST_NODE_IN_RECOVERY(osb, lockres->master_node_num))
- break;
- ocfs_sleep(500);
- }
- }
+ LOG_TRACE_ARGS("lockres: master=%d, locktype=%d, flags: %08x\n",
+ lockres->master_node_num, lockres->lock_type, flags);
- status = ocfs_make_lock_master (osb, lock_id, lock_type,
- flags, lockres, *b, &disk_vote, inode);
+ printk("lockres: lockid=%u.%u, this=%d, master=%d, locktype=%d, flags=%08x, ronode=%d, romap=%08x\n",
+ lockres->sector_num, osb->node_num, lockres->master_node_num, lockres->lock_type, flags,
+ lockres->readonly_node, lockres->readonly_map);
- if (status < 0) {
- ocfs_release_lockres (lockres);
- if (status == -EAGAIN) {
- ocfs_sleep (500);
- if (ocfs_task_interruptible (osb)) {
- LOG_TRACE_ARGS("interrupted... lockid=%u.%u\n", HILO(lock_id));
- status = -EINTR;
- goto finally;
- }
-
- updated = false;
- goto again;
- }
- goto finally;
- }
-
- /* make lock master succeeded */
- /* so why, if get_x and the make lock master do the same thing,
- * does the make lock master path need to rewrite the stuff to disk
- * but the get_x path doesn't ???? */
- if (get_x)
- goto skip_lock_write;
- keep_exclusive = false;
- break;
+ if (wait_on_recovery && !(flags & FLAG_FILE_RECOVERY)) {
+ int waitcnt = 0;
+ LOG_TRACE_ARGS("Waiting on node %u to be recovered\n",
+ lockres->master_node_num);
+ while (1) {
+ LOG_TRACE_ARGS("waitcnt = %d\n", waitcnt);
+ if (!TEST_NODE_IN_RECOVERY(osb, lockres->master_node_num))
+ break;
+ ocfs_sleep(500);
}
-#if 0 // we never hit this case anymore. lets not bloat things...
- case wait_for_release: /* there is a valid, live master and it's not this node */
- /* if the lock is acquired already by the master wait */
- /* for release, else change master */
- {
- ocfs_release_lockres(lockres);
- status = ocfs_wait_for_lock_release (osb, lock_id, 30000, lockres,
- ((flags & FLAG_DIR) ? OCFS_DLM_SHARED_LOCK : OCFS_DLM_NO_LOCK), inode);
- if (status == 0 || status == -ETIMEDOUT) {
- /* lock released or waited too long, back to top */
- if (status == -ETIMEDOUT) {
- LOG_TRACE_ARGS("lock %u.%u, level %d, not being freed by node %u\n",
- HILO(lock_id), lockres->lock_type, lockres->master_node_num);
- }
- updated = false;
- goto again;
- }
- if (status != -EINTR) {
- LOG_ERROR_STR ("Lock owner is alive and taking too much time");
- LOG_ERROR_STATUS(status);
- }
- goto finally;
- }
-#endif
- case master_request:
- {
- status = ocfs_update_lock_state (osb, lockres, flags | FLAG_CHANGE_MASTER,
- &disk_vote, inode);
- if (status < 0) {
- ocfs_release_lockres (lockres);
- if (status == -EAGAIN) {
- ocfs_sleep (500);
- if (ocfs_task_interruptible (osb)) {
- LOG_TRACE_ARGS("interrupted... lockid=%u.%u\n",
- HILO(lockres->sector_num));
- status = -EINTR;
- goto finally;
- }
- updated = false;
- goto again;
- }
+ }
+
+ status = new_lock_function(osb, lock_type, flags, lockres, *b, &disk_vote, inode);
+ if (status < 0) {
+ ocfs_release_lockres (lockres);
+ if (status == -EAGAIN) {
+ ocfs_sleep (500);
+ if (ocfs_task_interruptible (osb)) {
+ LOG_TRACE_ARGS("interrupted... lockid=%u.%u\n",
+ HILO(lockres->sector_num));
+ status = -EINTR;
goto finally;
}
- // successfully got vote to change master
- status = ocfs_read_bh (osb, lock_id, b, lockflags, inode);
- if (status < 0) {
- LOG_ERROR_STATUS (status);
- goto finally;
- }
-
- keep_exclusive = false;
- break;
+ updated = false;
+ goto again;
}
- default:
- {
- LOG_ERROR_ARGS("unknown lock type (path=%d)\n",
- lock_path);
- status = -EINVAL;
- goto finally;
- }
- }
-
- disklock = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(*b); /* read */
- have_cache_already = (DISK_LOCK_CURRENT_MASTER (disklock) == osb->node_num &&
- DISK_LOCK_FILE_LOCK (disklock) == OCFS_DLM_ENABLE_CACHE_LOCK);
- OCFS_BH_PUT_DATA(*b);
-
- if (!keep_exclusive && !have_cache_already) {
- disklock = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(*b); /* write */
- DISK_LOCK_CURRENT_MASTER (disklock) = osb->node_num;
- DISK_LOCK_FILE_LOCK (disklock) = lock_type;
- OCFS_BH_PUT_DATA(*b);
-
- status = ocfs_write_bh (osb, *b, 0, inode);
- if (status < 0) {
- LOG_ERROR_STATUS (status);
- goto finally;
- }
- }
-
+ goto finally;
+ }
+
/* We got the lock */
- disklock = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(*b); /* read */
- lockres->lock_type = lock_type;
- lockres->master_node_num = osb->node_num;
- lockres->oin_openmap = DISK_LOCK_OIN_MAP (disklock);
- OCFS_BH_PUT_DATA(*b);
status = 0;
skip_lock_write:
@@ -1763,23 +1516,6 @@
if (!(flags & FLAG_FILE_UPDATE_OIN) && !(flags & FLAG_FILE_DELETE))
goto finally;
-#if 0
- if (comm_voting) {
- LOG_TRACE_STR ("Network vote");
- status = ocfs_send_dlm_request_msg (osb, lock_id, lock_type,
- flags, lockres, votemap);
- if (status >= 0)
- goto finally;
- if (status == -ETIMEDOUT) {
- LOG_TRACE_STR ("Network voting timed out");
- lockres->vote_state = 0;
- }
- }
-
- LOG_TRACE_STR ("Disk vote");
- disk_vote = true;
- jif = jiffies;
-#endif
status = -EAGAIN;
while (status == -EAGAIN) {
if (comm_voting && !disk_vote) {
@@ -1820,7 +1556,7 @@
goto finito;
}
- tmpstat = ocfs_reset_voting (osb, lock_id, lock_type, oin_node_map);
+ tmpstat = ocfs_reset_voting (osb);
if (tmpstat < 0) {
LOG_ERROR_STATUS (status = tmpstat);
goto finito;
@@ -1841,7 +1577,7 @@
LOG_TRACE_ARGS ("Lock time: %u\n", jif);
if (disk_vote && !disk_reset) {
- tmpstat = ocfs_reset_voting (osb, lock_id, lock_type, oin_node_map);
+ tmpstat = ocfs_reset_voting (osb);
if (tmpstat < 0)
LOG_ERROR_STATUS (tmpstat);
}
@@ -1849,14 +1585,15 @@
fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_WRITE(*b); /* write */
LOG_TRACE_ARGS("writing lock now... releasemaster: %s, level: %d, master: %d\n",
- flags & FLAG_FILE_RELEASE_MASTER, DISK_LOCK_FILE_LOCK (fe),
+ flags & FLAG_FILE_RELEASE_MASTER ? "yes" : "no",
+ DISK_LOCK_FILE_LOCK (fe),
DISK_LOCK_CURRENT_MASTER (fe));
if (flags & FLAG_FILE_RELEASE_MASTER)
DISK_LOCK_CURRENT_MASTER (fe) = OCFS_INVALID_NODE_NUM;
if ((DISK_LOCK_FILE_LOCK (fe) == OCFS_DLM_ENABLE_CACHE_LOCK) &&
(DISK_LOCK_CURRENT_MASTER (fe) == osb->node_num)) {
- lockres->lock_state = OCFS_DLM_ENABLE_CACHE_LOCK;
+ lockres->lock_type = OCFS_DLM_ENABLE_CACHE_LOCK;
cachelock = true;
LOG_TRACE_STR("keeping at CACHE_LOCK");
}
@@ -1907,26 +1644,21 @@
OCFS_BH_PUT_DATA(bh);
}
+ OCFS_ASSERT(lock_type != OCFS_DLM_SHARED_LOCK);
- if (lock_type == OCFS_DLM_SHARED_LOCK) {
- if (atomic_dec_and_test (&lockres->lr_share_cnt)) {
- if (lockres->lock_type == OCFS_DLM_SHARED_LOCK)
- lockres->lock_type = OCFS_DLM_NO_LOCK;
- }
+ if ((lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK) &&
+ (lockres->master_node_num == osb->node_num) &&
+ !(flags & FLAG_FILE_DELETE)) {
status = 0;
goto finally;
}
- /*
- * Change flags based on which kind of lock we are releasing
- * For directory we need special handling of oin updates when the release
- * is for XBcast
- * For file we need to update oin's
- * For Shared we need to update the lock state locally only
- */
- if ((lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK) &&
- (lockres->master_node_num == osb->node_num) &&
- !(flags & FLAG_FILE_DELETE)) {
+ if (flags & FLAG_READDIR) {
+ if (lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK ||
+ lockres->master_node_num != lockres->readonly_node ||
+ lockres->master_node_num == OCFS_INVALID_NODE_NUM)
+ LOG_ERROR_ARGS("READDIR release has issues! type=%d, master=%d, ronode=%d\n",
+ lockres->lock_type, lockres->master_node_num, lockres->readonly_node);
status = 0;
goto finally;
}
@@ -1968,96 +1700,10 @@
} /* ocfs_init_dlm */
/*
- * ocfs_create_log_extent_map()
- *
- */
-int ocfs_create_log_extent_map (ocfs_super * osb, __u64 diskOffset, __u64 ByteCount)
-{
- int status = 0;
- __u32 i;
- __u64 remainingLength;
- __u32 length, byte_cnt;
- __u32 num_runs = 0;
- __s64 to_find = 0, found_foff = 0, found_doff = 0;
-
- LOG_ENTRY ();
-
- to_find = diskOffset;
- remainingLength = ByteCount;
-
- ocfs_down_sem (&(osb->map_lock), true);
-
- num_runs = osb->metadata_map.count;
-
- for (i = 0; i < num_runs; i++) {
- if (!ocfs_get_next_extent_map_entry
- (osb, &osb->metadata_map, i, &found_foff,
- &found_doff, &length))
- continue;
-
- if (found_doff >= (to_find + remainingLength))
- break;
-
- if (to_find >= (found_doff + length)) {
- /* This meta data run is before the relevant stf */
- continue;
- }
-
- if (to_find < found_doff) {
- if (to_find + remainingLength > found_doff) {
- /* We have a data run and a metadata run */
- remainingLength -= found_doff - to_find;
- to_find = found_doff;
- byte_cnt = remainingLength<length?remainingLength:length;
- osb->log_file_size += byte_cnt;
- ocfs_add_extent_map_entry (osb, &osb->trans_map,
- found_doff, found_doff, byte_cnt);
- remainingLength -= byte_cnt;
- to_find += byte_cnt;
- if (remainingLength > 0)
- continue;
- else
- break;
- }
- } else { /* if ((to_find >= found_doff)) */
- if (to_find + remainingLength <= found_doff + length) {
- /* It is only metadata */
- osb->log_file_size += remainingLength;
- ocfs_add_extent_map_entry (osb, &osb->trans_map,
- to_find, to_find, remainingLength);
- remainingLength -= remainingLength;
- to_find += remainingLength;
- break;
- } else {
- /* Meta data and as yet unknown data */
- byte_cnt = length - (to_find - found_doff);
- osb->log_file_size += byte_cnt;
- ocfs_add_extent_map_entry (osb, &osb->trans_map,
- to_find, to_find, byte_cnt);
- remainingLength -= byte_cnt;
- to_find += byte_cnt;
- continue;
- }
- }
- }
-
- ocfs_up_sem (&(osb->map_lock));
-
- /* Create new extent map from real runs */
-
-
- if (osb->log_file_size >= OCFS_TRANS_FLUSH_LIMIT) {
- osb->needs_flush = true;
- }
-
- LOG_EXIT_STATUS (status);
- return status;
-} /* ocfs_create_log_extent_map */
-
-/*
* ocfs_break_cache_lock()
*
*/
+/* TODO: merge down into new lock function */
static int ocfs_break_cache_lock (ocfs_super * osb, ocfs_lock_res * lockres, struct inode *inode)
{
int status;
@@ -2078,27 +1724,6 @@
jif = jiffies;
-#if 0
- if (comm_voting) {
- LOG_TRACE_STR ("Network vote");
- status = ocfs_send_dlm_request_msg (osb, lockres->sector_num,
- lockres->lock_type,
- FLAG_FILE_RELEASE_CACHE,
- lockres, votemap);
- if (status >= 0) {
- lockres->lock_type = OCFS_DLM_NO_LOCK;
- goto finally;
- }
- if (status == -ETIMEDOUT) {
- LOG_TRACE_STR ("Network voting timed out");
- lockres->vote_state = 0;
- }
- }
-
- LOG_TRACE_STR ("Disk vote");
- disk_vote = true;
- jif = jiffies;
-#endif
status = -EAGAIN;
while (status == -EAGAIN) {
if (!IS_NODE_ALIVE (osb->publ_map, lockres->master_node_num,
@@ -2165,8 +1790,7 @@
}
reset:
- tmpstat = ocfs_reset_voting (osb, lockres->sector_num,
- lockres->lock_type, votemap);
+ tmpstat = ocfs_reset_voting (osb);
if (tmpstat < 0) {
LOG_ERROR_STATUS (status = tmpstat);
goto finally;
@@ -2203,8 +1827,7 @@
LOG_TRACE_ARGS ("Lock time: %u\n", jif);
if (disk_vote && !disk_reset) {
- tmpstat = ocfs_reset_voting (osb, lockres->sector_num,
- lockres->lock_type, votemap);
+ tmpstat = ocfs_reset_voting (osb);
if (tmpstat < 0)
LOG_ERROR_STATUS (tmpstat);
}
@@ -2239,3 +1862,290 @@
ocfs_inc_inode_seq(osb, inode, false);
return 0;
}
+
+/* Send a FLAG_DROP_READONLY vote for lockres to the nodes in vote_map: net vote first, disk vote as fallback. */
+/* TODO: merge down into new lock function */
+int ocfs_send_readonly_drop_message(ocfs_super *osb, ocfs_lock_res *lockres, __u64 vote_map)
+{
+ int status = 0, tmpstat;
+ __u64 lock_id = lockres->sector_num, lockseqnum = 0;
+ bool disk_vote = false; /* set once the disk-vote path is entered; gates the reset at bail */
+
+ LOG_ENTRY ();
+
+ if (comm_voting) {
+ status = ocfs_send_dlm_request_msg (osb, lock_id, OCFS_DLM_ENABLE_CACHE_LOCK,
+ FLAG_DROP_READONLY, lockres, vote_map);
+ if (status >= 0) {
+ status = lockres->vote_status; /* request went out: return the vote status recorded on lockres */
+ goto bail;
+ } else if (status == -ETIMEDOUT)
+ LOG_TRACE_STR ("Network voting timed out");
+ else
+ LOG_ERROR_STATUS (status);
+ lockres->vote_state = 0; /* not part of the else: runs after any failed net vote */
+ }
+
+ disk_vote = true; /* comm_voting off, or the net vote failed: fall back to a disk vote */
+ status = ocfs_request_vote (osb, lock_id, OCFS_DLM_ENABLE_CACHE_LOCK, FLAG_DROP_READONLY,
+ vote_map, &lockseqnum, NULL);
+ if (status < 0) {
+ if (status != -EAGAIN) /* -EAGAIN is returned without being logged as an error */
+ LOG_ERROR_STATUS (status);
+ goto bail;
+ }
+
+ status = ocfs_wait_for_vote (osb, lock_id, OCFS_DLM_ENABLE_CACHE_LOCK, FLAG_DROP_READONLY,
+ vote_map, 5000, lockseqnum, lockres); /* NOTE(review): 5000 is presumably a timeout - confirm units */
+ if (status < 0) {
+ if (status != -EAGAIN) /* same policy as above: -EAGAIN is not logged */
+ LOG_ERROR_STATUS (status);
+ goto bail;
+ }
+
+bail:
+ if (disk_vote) { /* voting is reset only when the disk-vote path was entered */
+ tmpstat = ocfs_reset_voting (osb);
+ if (tmpstat < 0)
+ LOG_ERROR_STATUS (tmpstat); /* logged only; the returned status is left unchanged */
+ }
+
+ LOG_EXIT_STATUS (status);
+ return status;
+}
+
+/* Take requested_lock on lockres for this node by voting with other nodes (net vote, then disk vote); returns 0 or a negative status. */
+int new_lock_function(ocfs_super * osb, __u32 requested_lock, __u32 flags, ocfs_lock_res * lockres, struct buffer_head *bh, bool *disk_vote, struct inode *inode)
+{
+ __u64 vote_map = 0;
+ __u64 lockseqnum = 0;
+ int tmpstat;
+ ocfs_file_entry *fe = NULL;
+ __u64 lock_id;
+ __u32 lock_write_flags = DLOCK_FLAG_MASTER | DLOCK_FLAG_LOCK | DLOCK_FLAG_OPEN_MAP;
+ __u32 lock_type = requested_lock;
+ bool need_to_zap_buffers = false, need_lock_write = true;
+ bool is_readdir = (flags & FLAG_READDIR) ? true : false; /* remembered because flags may lose FLAG_READDIR below */
+ int status = 0;
+ /* *disk_vote is in/out: if the caller passes true the net vote is skipped; it is set true once a disk vote starts */
+ LOG_ENTRY ();
+
+ ocfs_acquire_lockres (lockres);
+ lock_id = lockres->sector_num;
+
+ if (flags & FLAG_READDIR) {
+ if (flags & (FLAG_CHANGE_MASTER | FLAG_REMASTER)) {
+ /* there is no readonly_node. treat like normal change master. */
+ flags &= ~FLAG_READDIR;
+ }
+ } else if (flags & FLAG_CHANGE_MASTER) {
+ /* non-readdir with CHANGE_MASTER should have no readonly_node */
+ if (lockres->readonly_node != OCFS_INVALID_NODE_NUM) {
+ LOG_ERROR_ARGS("change_master but readonly_node was %d\n",
+ lockres->readonly_node);
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+ }
+ }
+
+ /* TODO: take this out when all is ok */
+ if (flags & FLAG_READDIR) {
+ /* only send a message with FLAG_READDIR in it if
+ * the recipient already has a cachelock but is not
+ * currently set as the readonly_node */
+ OCFS_ASSERT(lockres->master_node_num != osb->node_num);
+ OCFS_ASSERT(lockres->master_node_num != OCFS_INVALID_NODE_NUM);
+ OCFS_ASSERT(lockres->readonly_node == OCFS_INVALID_NODE_NUM);
+ OCFS_ASSERT(lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK);
+ }
+
+
+ if (flags & (FLAG_CHANGE_MASTER | FLAG_REMASTER)) {
+ /* on a master change... */
+ need_to_zap_buffers = true; /* need to dump local buffers */
+ need_lock_write = true; /* and rewrite the lock */
+ } else if (flags & FLAG_ADD_OIN_MAP) {
+ need_lock_write = false;
+ } else if (flags & FLAG_READDIR) {
+ need_lock_write = false;
+ need_to_zap_buffers = true;
+ } else {
+ fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(bh); /* read */
+ /* may not need to rewrite the lock later if we already have a cachelock */
+ need_lock_write = !(DISK_LOCK_CURRENT_MASTER (fe) == osb->node_num &&
+ DISK_LOCK_FILE_LOCK (fe) == OCFS_DLM_ENABLE_CACHE_LOCK);
+ OCFS_BH_PUT_DATA(bh);
+ }
+
+ /* that's why it's called fast path */
+ if (flags & FLAG_FAST_PATH_LOCK)
+ goto vote_success;
+
+
+
+ /* figure out who to vote with */
+ if (flags & (FLAG_REMASTER | FLAG_FILE_DELETE | FLAG_FILE_RENAME))
+ vote_map = osb->publ_map; /* broadcast */
+ else {
+ vote_map = (1ULL << lockres->master_node_num); /* just owner; 1ULL because the map is 64-bit */
+ lock_type = lockres->lock_type;
+ }
+ vote_map &= ~(1ULL << osb->node_num); // remove this node (64-bit mask, see above)
+ if (vote_map == 0) {
+ /* As this is the only node alive, make it master of the lock */
+ /* no need to update open map */
+ lock_write_flags &= ~(DLOCK_FLAG_OPEN_MAP);
+ goto vote_success;
+ }
+
+
+
+ /* net voting */
+ if (comm_voting && !*disk_vote) {
+ LOG_TRACE_STR ("Network vote");
+ status = ocfs_send_dlm_request_msg (osb, lock_id, lock_type, flags, lockres, vote_map);
+ if (status >= 0) {
+ status = lockres->vote_status;
+ if (status >= 0)
+ goto vote_success;
+ else
+ goto bail;
+ } else if (status == -ETIMEDOUT) {
+ LOG_TRACE_STR ("Network voting timed out");
+ }
+ else
+ LOG_ERROR_STATUS (status);
+ lockres->vote_state = 0; /* runs after any failed net vote, then falls through to the disk vote */
+ }
+
+
+
+ /* disk voting */
+ LOG_TRACE_STR ("Disk vote");
+ *disk_vote = true;
+ status = ocfs_request_vote (osb, lock_id, lock_type, flags, vote_map, &lockseqnum, inode);
+ if (status < 0) {
+ if (status != -EAGAIN)
+ LOG_ERROR_STATUS (status);
+ goto bail;
+ }
+
+ status = ocfs_wait_for_vote (osb, lock_id, lock_type, flags, vote_map, 5000, lockseqnum, lockres);
+ if (status < 0) {
+ if (status != -EAGAIN)
+ LOG_ERROR_STATUS (status);
+ goto bail;
+ }
+
+vote_success:
+ if (need_to_zap_buffers)
+ ocfs_break_cache_lock_zap_buffers(osb, inode);
+
+ /* just alerting owner on open */
+ if (flags & FLAG_ADD_OIN_MAP)
+ goto bail;
+
+ /* converted cachelock to readonly cachelock */
+ if (flags & FLAG_READDIR) {
+ lockres->readonly_node = lockres->master_node_num;
+ goto bail;
+ }
+
+ /* update the lockres */
+ printk("new_lock_function: set lockid=%u.%u, locktype=%d->%d, master=%d->%d\n",
+ HILO(lockres->sector_num), lockres->lock_type, requested_lock, /* HILO: two words for %u.%u */
+ lockres->master_node_num, osb->node_num);
+ lockres->master_node_num = osb->node_num;
+ lockres->lock_type = requested_lock;
+
+ /* update the disk lock */
+ if (need_lock_write) {
+ status = ocfs_update_disk_lock (osb, lockres, lock_write_flags, &bh, inode);
+ if (status < 0)
+ LOG_ERROR_STATUS (status); /* NOTE(review): lockres was already updated above and is not rolled back here */
+ }
+
+ fe = (ocfs_file_entry *)OCFS_BH_GET_DATA_READ(bh); /* read */
+ lockres->oin_openmap = DISK_LOCK_OIN_MAP (fe);
+ OCFS_BH_PUT_DATA(bh);
+
+bail:
+ if (*disk_vote) {
+ tmpstat = ocfs_reset_voting (osb);
+ if (tmpstat < 0)
+ LOG_ERROR_STATUS (tmpstat);
+ }
+
+ /* if we removed FLAG_READDIR above, set the readonly_node now */
+ if (is_readdir && !(flags & FLAG_READDIR)) {
+ lockres->readonly_node = lockres->master_node_num; /* NOTE(review): also runs when status < 0 - confirm intended */
+ }
+
+ ocfs_release_lockres (lockres);
+
+ LOG_EXIT_STATUS (status);
+ return status;
+}
+
+static int _ocfs_wait_for_readonly_drop(ocfs_super *osb, ocfs_lock_res *lockres);
+
+static inline int ocfs_wait_for_readonly_drop(ocfs_super *osb, ocfs_lock_res *lockres)
+{
+ /* empty readonly_map: skip the slow path and report success */
+ return (lockres->readonly_map != 0ULL)
+ ? _ocfs_wait_for_readonly_drop(osb, lockres) : 0;
+}
+
+#define READONLY_DROP_TRIES 5
+static int _ocfs_wait_for_readonly_drop(ocfs_super *osb, ocfs_lock_res *lockres)
+{
+ int tries = 0;
+ int status = 0;
+
+ LOG_ENTRY();
+
+ if (lockres->readonly_map != 0ULL) {
+ // if this node is the owner, need to alert all nodes
+ // in map, set map to 0, ro_node=-1, continue as if normal cache lock
+
+ // if there is a readonly_map, we had better be the owner
+ OCFS_ASSERT(lockres->readonly_node == osb->node_num);
+ if (!lockres->readonly_dropping) {
+ ocfs_get_lockres(lockres);
+ status = ocfs_drop_readonly_cache_lock(osb, lockres);
+ if (status < 0) {
+ LOG_ERROR_STATUS (status);
+ ocfs_release_lockres (lockres);
+ goto exit;
+ }
+ }
+ while (tries < READONLY_DROP_TRIES) {
+ if (lockres->readonly_node != osb->node_num) {
+ if (lockres->readonly_map != 0ULL)
+ LOG_ERROR_STR("readonly_node is not this node, but map is still set");
+ lockres->readonly_map = 0ULL;
+ status = -EAGAIN;
+ goto exit;
+ } else {
+ OCFS_ASSERT(lockres->master_node_num == osb->node_num);
+ if (lockres->readonly_map == 0ULL) {
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+ status = -EAGAIN;
+ goto exit;
+ }
+ }
+
+ ocfs_release_lockres(lockres);
+ ocfs_sleep (OCFS_NM_HEARTBEAT_TIME / 10);
+ ocfs_acquire_lockres(lockres);
+ }
+
+ // not good. could not get everyone to release in time.
+ // ????: what do we do here?!
+ ocfs_release_lockres(lockres);
+ status = -ETIMEDOUT;
+ }
+exit:
+
+ LOG_EXIT_STATUS(status);
+ return status;
+}
Modified: trunk/src/file.c
===================================================================
--- trunk/src/file.c 2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/file.c 2004-03-19 07:37:42 UTC (rev 791)
@@ -150,8 +150,11 @@
}
/* Look on the disk now ... */
+ down(&parent->i_sem);
status = ocfs_find_files_on_disk (osb, parent_off, &(dentry->d_name),
&fe_bh, NULL, parent, true);
+ up(&parent->i_sem);
+
if (status >= 0) {
oin = NULL;
ocfs_down_sem (&(osb->osb_res), true);
@@ -665,7 +668,7 @@
ocfs_get_lockres (lockres);
if ((lockres->master_node_num != osb->node_num) ||
- (lockres->lock_state != OCFS_DLM_ENABLE_CACHE_LOCK)) {
+ (lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK)) {
oin->cache_enabled = false;
} else {
oin->cache_enabled = true;
Modified: trunk/src/inc/ocfs.h
===================================================================
--- trunk/src/inc/ocfs.h 2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/inc/ocfs.h 2004-03-19 07:37:42 UTC (rev 791)
@@ -138,7 +138,9 @@
ADD_OIN_MAP, // add requestor into oin map
NOT_MASTER, // I am not master, retry
REMASTER_THIS, // remaster lock to me
- REMASTER_REQUESTOR // remaster lock to requestor
+ REMASTER_REQUESTOR, // remaster lock to requestor
+ DROP_READONLY, // RO cachelock needs to convert to RW
+ READONLY // a RW or RO cachelock, requesting RO
};
enum {
@@ -335,8 +337,8 @@
#define FLAG_CHANGE_MASTER 0x00000400
#define FLAG_ADD_OIN_MAP 0x00000800
#define FLAG_DIR 0x00001000
-#define FLAG_FILE_UNUSED3 0x00002000
-#define FLAG_FILE_UNUSED4 0x00004000
+#define FLAG_REMASTER 0x00002000
+#define FLAG_FAST_PATH_LOCK 0x00004000
#define FLAG_FILE_UNUSED5 0x00008000
#define FLAG_FILE_UNUSED6 0x00010000
#define FLAG_DEL_NAME 0x00020000
@@ -350,8 +352,8 @@
#define FLAG_FILE_UNUSED12 0x02000000
#define FLAG_FILE_UNUSED13 0x04000000
#define FLAG_FILE_TRUNCATE 0x08000000
-#define FLAG_FILE_UNUSED14 0x10000000
-#define FLAG_FILE_UNUSED15 0x20000000
+#define FLAG_DROP_READONLY 0x10000000
+#define FLAG_READDIR 0x20000000
#define FLAG_ACQUIRE_LOCK 0x40000000
#define FLAG_RELEASE_LOCK 0x80000000
@@ -1831,6 +1833,9 @@
__u32 writer_node_num;
__u32 reader_node_num;
__u32 lock_holders;
+ bool readonly_dropping;
+ __u32 readonly_node;
+ __u64 readonly_map;
};
struct _ocfs_inode
Modified: trunk/src/inc/proto.h
===================================================================
--- trunk/src/inc/proto.h 2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/inc/proto.h 2004-03-19 07:37:42 UTC (rev 791)
@@ -51,6 +51,7 @@
int ocfs_init_dlm (void);
void ocfs_process_one_vote_reply(ocfs_super *osb, ocfs_vote_reply_ctxt *ctxt, __u32 node_num);
int ocfs_break_cache_lock_zap_buffers(ocfs_super * osb, struct inode * inode);
+int ocfs_send_readonly_drop_message(ocfs_super *osb, ocfs_lock_res *lockres, __u64 vote_map);
int ocfs_create_log_extent_map (ocfs_super * osb, __u64 diskOffset, __u64 ByteCount);
@@ -208,6 +209,7 @@
int ocfs_recv_udp_msg (ocfs_recv_ctxt * recv_ctxt);
int ocfs_send_dismount_msg (ocfs_super * osb, __u64 vote_map);
int ocfs_send_vote_reply (ocfs_super * osb, ocfs_dlm_msg * dlm_msg, __u32 vote_status, bool inode_open);
+int ocfs_drop_readonly_cache_lock(ocfs_super *osb, ocfs_lock_res *lockres);
void ocfs_initialize_bitmap (ocfs_alloc_bm * bitmap, __u32 validbits, __u32 allocbits);
Modified: trunk/src/namei.c
===================================================================
--- trunk/src/namei.c 2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/namei.c 2004-03-19 07:37:42 UTC (rev 791)
@@ -273,10 +273,19 @@
fe = (ocfs_file_entry *) OCFS_BH_GET_DATA_READ(new_fe_bh); /* read */
- /* is this safe if we no longer have it locked? */
if (oin->lock_res != NULL) {
- oin->lock_res->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
- oin->lock_res->lock_state = DISK_LOCK_FILE_LOCK (fe);
+ ocfs_lock_res *lockres = oin->lock_res;
+ ocfs_acquire_lockres(lockres);
+ lockres->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
+ lockres->lock_type = DISK_LOCK_FILE_LOCK (fe);
+ if (lockres->readonly_node != OCFS_INVALID_NODE_NUM &&
+ lockres->readonly_node != lockres->master_node_num) {
+ LOG_ERROR_ARGS("no longer readonly! ronode=%d, master=%d, lockid=%u.%u\n",
+ lockres->readonly_node, lockres->master_node_num,
+ lockres->sector_num);
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+ }
+ ocfs_release_lockres(lockres);
}
/* Insert the OFile on the OIN list */
Modified: trunk/src/nm.c
===================================================================
--- trunk/src/nm.c 2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/nm.c 2004-03-19 07:37:42 UTC (rev 791)
@@ -37,22 +37,30 @@
static int ocfs_search_commited(ocfs_super *osb, ocfs_lock_res *lockres);
static int ocfs_schedule_process_vote(ocfs_super *osb, struct buffer_head *bh, int vote_node);
+static int _ocfs_drop_readonly_cache_lock(void *arg);
+/* Argument bundle handed to the _ocfs_drop_readonly_cache_lock thread, which casts its void * argument to this type. */
+typedef struct _ocfs_ro_cache_drop_ctxt
+{
+ ocfs_super *osb; /* read by the thread for node_num and publ_map */
+ ocfs_lock_res *lockres; /* lock resource whose readonly_map the thread works on */
+} ocfs_ro_cache_drop_ctxt;
+
+
void ocfs_process_vote_worker(void *val);
-#ifdef VERBOSE_PROCESS_VOTE
static const char *process_vote_strings[] = {
"INVALID_REQUEST", // reply with a NO vote
"UPDATE_OIN_INODE", // update both oin and inode
- "UPDATE_INODE", // no oin, so only update inode
+ "UPDATE_INODE", // no oin, so only update inode
"DELETE_RENAME", // delete or rename request (EX)
"RELEASE_CACHE", // release a cache lock I hold
"CHANGE_MASTER", // request to change master to requestor
"ADD_OIN_MAP", // add requestor into oin map
"NOT_MASTER", // I am not master, retry
"REMASTER_THIS", // remaster lock to me
- "REMASTER_REQUESTOR" // remaster lock to requestor
+ "REMASTER_REQUESTOR", // remaster lock to requestor
+ "DROP_READONLY", "READONLY" // RO cachelock needs to convert to RW; a RW or RO cachelock, requesting RO (one string per vote-type enum value)
};
-#endif
/*
* ocfs_recv_thread()
@@ -485,6 +493,15 @@
lock_res->lock_type = DISK_LOCK_FILE_LOCK (fe);
lock_res->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
lock_res->oin_openmap = DISK_LOCK_OIN_MAP (fe);
+
+ if (lock_res->readonly_node != OCFS_INVALID_NODE_NUM &&
+ lock_res->readonly_node != lock_res->master_node_num) {
+ LOG_ERROR_ARGS("no longer readonly! ronode=%d, master=%d, lockid=%u.%u\n",
+ lock_res->readonly_node, lock_res->master_node_num,
+ lock_res->sector_num);
+ lock_res->readonly_node = OCFS_INVALID_NODE_NUM;
+ }
+
OCFS_BH_PUT_DATA(*bh);
ocfs_release_lockres (lock_res);
@@ -620,8 +637,10 @@
{
int vote_type = INVALID_REQUEST;
bool my_node_wins = false;
+ __u64 lockid = lockres ? lockres->sector_num : 0ULL;
- LOG_ENTRY_ARGS("(status=%d)\n", status);
+ LOG_ENTRY_ARGS("(status=%d, lockid=%u.%u, node_num=%d, flags=%08x)\n", status,
+ HILO(lockid), node_num, flags);
*oin = NULL;
*master_alive = true;
@@ -640,6 +659,18 @@
*oin = lockres->oin;
}
+ if (flags & FLAG_DROP_READONLY) {
+ vote_type = DROP_READONLY;
+ goto done;
+ } else if (flags & FLAG_READDIR) {
+ if (lockres->master_node_num == osb->node_num &&
+ lockres->lock_type == OCFS_DLM_ENABLE_CACHE_LOCK) {
+ vote_type = READONLY;
+ } else
+ vote_type = INVALID_REQUEST;
+ goto done;
+ }
+
if (flags & (FLAG_FILE_DELETE | FLAG_FILE_RENAME))
vote_type = DELETE_RENAME;
else if (flags & FLAG_FILE_RELEASE_CACHE)
@@ -666,7 +697,7 @@
else
vote_type = REMASTER_REQUESTOR;
}
-
+done:
/* the only allowable action if we failed to */
/* get the lockres is a simple inode update */
if (status < 0 && vote_type != UPDATE_INODE) {
@@ -738,11 +769,10 @@
struct inode *inode = NULL;
bool master_alive = true, is_dir = false;
bool is_locked, open_handle;
- int lockflags = 0, in_cache = 0;
+ int lockflags = 0;
bool inc_inode_seq = false;
bool disk_vote = (ctxt->request_method == DISK_VOTE);
bool comm_vote = (ctxt->request_method == COMM_VOTE);
- bool have_trans_lock = false;
bool have_i_sem = false;
ocfs_publish *publish = (disk_vote ? ctxt->u.publish : NULL);
ocfs_dlm_msg *dlm_msg = (comm_vote ? ctxt->u.dlm_msg : NULL);
@@ -814,14 +844,39 @@
vote_type = get_process_vote_action(osb, lockres, node_num, flags,
status, &master_alive, &oin);
-
+
#ifdef VERBOSE_PROCESS_VOTE
printk("(%u) ocfs_process_vote: %s request for lockid: %u.%u, action: %s, type: %s\n", ocfs_getpid(),
flags & FLAG_RELEASE_LOCK ? "RELEASE" :
(flags & FLAG_ACQUIRE_LOCK ? "ACQUIRE" : "MODIFY"), lock_id,
process_vote_strings[vote_type], disk_vote ? "disk vote" : "net vote" );
#endif
+ printk("process_vote: this=%d, master=%d, locktype=%d, flags=%08x, ronode=%d, romap=%08x\n",
+ osb->node_num, lockres->master_node_num, lockres->lock_type, flags,
+ lockres->readonly_node, lockres->readonly_map);
+ /* get_process_vote_action will only allow CHANGE_MASTER, RELEASE_CACHE, and
+ * ADD_OIN_MAP on a CACHE lock held by this node. the CHANGE_MASTER/RELEASE_CACHE
+ * path needs to check the readonly map to see if any nodes need to be updated. this
+ * is not necessary for the ADD_OIN_MAP path since it cannot actually modify any
+ * data or metadata under the lock.
+ */
+
+#if 0
+/* TODO: REMOVEME! */
+if (flags & FLAG_READDIR) {
+ printk("ocfs_process_vote: READDIR %s request for lockid: %u.%u, action: %s, type: %s\n",
+ flags & FLAG_RELEASE_LOCK ? "RELEASE" :
+ (flags & FLAG_ACQUIRE_LOCK ? "ACQUIRE" : "MODIFY"), lock_id,
+ process_vote_strings[vote_type], disk_vote ? "disk vote" : "net vote" );
+} else if (vote_type == DROP_READONLY) {
+ printk("ocfs_process_vote: DROP_READONLY %s request for lockid: %u.%u, action: %s, type: %s\n",
+ flags & FLAG_RELEASE_LOCK ? "RELEASE" :
+ (flags & FLAG_ACQUIRE_LOCK ? "ACQUIRE" : "MODIFY"), lock_id,
+ process_vote_strings[vote_type], disk_vote ? "disk vote" : "net vote" );
+}
+#endif
+
if (inode && (vote_type != DELETE_RENAME)) {
/* Ok, for all operations where we no longer need
* isem, drop it now. */
@@ -969,7 +1024,7 @@
/* Change the master if there is no lock */
if (lockres->master_node_num == osb->node_num &&
- lockres->lock_state <= OCFS_DLM_SHARED_LOCK) {
+ lockres->lock_type < OCFS_DLM_EXCLUSIVE_LOCK) {
/* Change the lock ownership to the node asking for vote */
/* and write new master on the disk */
@@ -998,6 +1053,32 @@
break;
+ case READONLY:
+ LOG_TRACE_STR("READONLY");
+ OCFS_ASSERT(lockres->readonly_node==osb->node_num ||
+ lockres->readonly_node==OCFS_INVALID_NODE_NUM);
+
+ // if the requestor just wants to do readdir, we
+ // drop our buffers, so switch to readonly and done
+ if (inode) {
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
+ sync_mapping_buffers(inode->i_mapping);
+#else
+ fsync_inode_buffers(inode);
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,4,18)
+ fsync_inode_data_buffers(inode);
+#endif
+#endif
+ }
+ printk("do i need a zap buffers here?\n");
+// ocfs_break_cache_lock_zap_buffers(osb, inode);
+ lockres->readonly_map |= (1 << node_num);
+ printk("READONLY: setting ronode, was=%d, now=%d, master=%d\n", lockres->readonly_node, osb->node_num, lockres->master_node_num);
+ lockres->readonly_node = osb->node_num;
+ vote_response = FLAG_VOTE_NODE;
+ status = 0;
+ break;
+
case RELEASE_CACHE:
case CHANGE_MASTER:
if (vote_type == RELEASE_CACHE)
@@ -1007,66 +1088,48 @@
status = -EFAIL;
- /* If nobody currently owns the lock, then
- * fastpath it. */
- if (lockres->lock_holders == 0)
- goto give_lock;
-
- /* Slow path. We might still be able to give
- * him the lock if it's part of the cache and
- * we can flush it... */
-
- LOG_TRACE_ARGS("Lock id (%u.%u) has %u holders\n",
- HILO(lockres->sector_num),
- lockres->lock_holders);
-
- /* Try to take the trans_lock. We try a couple
- * times, with some sleep just in case a
- * transaction is about to complete. */
- have_trans_lock = false;
- for(i = 0; i < 2; i++) {
- if (down_trylock(&osb->trans_lock) == 0) {
- have_trans_lock = true;
- break;
+ /* requestor will need to retry if anyone is using the lockres */
+ if (lockres->lock_holders > 0) {
+ LOG_TRACE_ARGS("Lock id (%u.%u) has %u holders\n",
+ HILO(lockres->sector_num), lockres->lock_holders);
+ down(&(osb->journal.commit_sem));
+ if (ocfs_search_commited(osb, lockres)) {
+ // kick the commit thread
+ atomic_set (&osb->flush_event_woken, 1);
+ wake_up (&osb->flush_event);
}
- ocfs_sleep(100);
- }
-
- /* We couldn't get the trans_lock. There's no
- * point in going any further. */
- if (!have_trans_lock) {
- LOG_TRACE_STR("FLAG_VOTE_UPDATE_RETRY (2)");
+ up(&(osb->journal.commit_sem));
vote_response = FLAG_VOTE_UPDATE_RETRY;
status = 0;
break;
}
- /* We have the trans_lock! If it's in the
- * commited list, then kick the commit thread
- * and vote RETRY this time. Otherwise, it's
- * currently in use by another transaction. */
- down(&(osb->journal.commit_sem));
- in_cache = ocfs_search_commited(osb, lockres);
- up(&(osb->journal.commit_sem));
-
- if (in_cache) {
- atomic_set (&osb->flush_event_woken, 1);
- wake_up (&osb->flush_event);
+ /* this is currently a readonly cache lock.
+ * need to communicate to all the nodes in the
+ * map that lock will be changing to RW before we
+ * continue. RETRY this request while we spawn
+ * off a thread to collect up the communication */
+ if (lockres->readonly_map != 0ULL) {
+ // assumption: node asking for vote has already dropped readonly_node
+ lockres->readonly_map &= ~(1 << node_num);
+ if (lockres->readonly_map != 0ULL) {
+ OCFS_ASSERT(lockres->readonly_node == osb->node_num);
+ status = 0;
+ if (!lockres->readonly_dropping) {
+ ocfs_get_lockres(lockres);
+ if (ocfs_drop_readonly_cache_lock(osb, lockres) < 0) {
+ LOG_ERROR_STATUS(status = -ENOMEM);
+ ocfs_put_lockres(lockres);
+ }
+ }
+ vote_response = FLAG_VOTE_UPDATE_RETRY;
+ break;
+ }
+ // noone left in map, so continue
+ printk("noone left in map, so continue...\n");
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM;
}
- up(&osb->trans_lock);
- /* Ok, either we couldn't find it in the
- * cache, or it became busy again while we
- * were dumping cache. */
- LOG_TRACE_STR("FLAG_VOTE_UPDATE_RETRY (3)");
- vote_response = FLAG_VOTE_UPDATE_RETRY;
- status = 0;
- break;
-
-give_lock:
- if (vote_type == CHANGE_MASTER)
- lockres->master_node_num = node_num;
-
if (inode) {
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,0)
sync_mapping_buffers(inode->i_mapping);
@@ -1077,8 +1140,15 @@
#endif
#endif
}
+
+ /* nobody currently owns the lock so fastpath it */
+ if (vote_type == CHANGE_MASTER)
+ lockres->master_node_num = node_num;
+
+
if (oin != NULL) {
- lockres->lock_type = lockres->lock_state = OCFS_DLM_NO_LOCK;
+ printk("setting locktype to nolock\n");
+ lockres->lock_type = OCFS_DLM_NO_LOCK;
lockres->cache_lock_held = false;
}
@@ -1115,8 +1185,10 @@
brelse(fe_bh);
break;
}
- if (vote_type == RELEASE_CACHE)
- lockres->lock_type = lockres->lock_state = OCFS_DLM_NO_LOCK;
+ if (vote_type == RELEASE_CACHE) {
+ printk("setting locktype to nolock\n");
+ lockres->lock_type = OCFS_DLM_NO_LOCK;
+ }
else // CHANGE_MASTER
lockres->master_node_num = node_num;
} else {
@@ -1162,6 +1234,35 @@
}
brelse(fe_bh);
break;
+
+ case DROP_READONLY:
+ /* TODO: may need locking in here to lock out
+ * the actual IO that a readdir may have in
+ * progress, if it's possible to have a corrupt
+ * readdir. for now, skip it.
+ * NOTE: can't just take i_sem because lock order
+ * needs to be i_sem->lockres... would have to
+ * drop lockres, take i_sem, take lockres, then
+ * recheck all the conditions to see if still
+ * appropriate, then do the work and drop both.
+ * seems like a lot of work. almost as many lines
+ * of code as there are lines of comments right here.
+ */
+
+ /* this path should always succeed on the vote *
+ * even in the error case. do nothing for error. */
+ if (lockres->master_node_num != node_num ||
+ lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK ||
+ lockres->readonly_map != 0ULL)
+ LOG_ERROR_ARGS("(drop-ro) master=%d node_num=%d locktype=%d map=%08x.%08x ronode=%d\n",
+ lockres->master_node_num, node_num, lockres->lock_type,
+ HILO(lockres->readonly_map), lockres->readonly_node);
+ else
+ lockres->readonly_node = OCFS_INVALID_NODE_NUM;
+
+ status = 0;
+ vote_response = FLAG_VOTE_NODE;
+ break;
case NOT_MASTER:
LOG_TRACE_STR("NOT_MASTER");
@@ -1371,3 +1472,89 @@
LOG_EXIT();
}
+
+int ocfs_drop_readonly_cache_lock(ocfs_super *osb, ocfs_lock_res *lockres)
+{
+ ocfs_ro_cache_drop_ctxt *arg;
+ arg = kmalloc(sizeof(ocfs_ro_cache_drop_ctxt), GFP_KERNEL);
+ if (arg == NULL)
+ return -ENOMEM;
+
+ arg->osb = osb;
+ arg->lockres = lockres;
+
+ kernel_thread(_ocfs_drop_readonly_cache_lock, (void *) arg,
+ CLONE_VM | CLONE_FS | CLONE_FILES);
+ return 0;
+}
+/* Thread body: ask the live nodes in lockres->readonly_map to drop readonly, retrying on -EAGAIN; frees arg and puts lockres on exit. */
+static int _ocfs_drop_readonly_cache_lock(void *arg)
+{
+ ocfs_ro_cache_drop_ctxt *ctxt = (ocfs_ro_cache_drop_ctxt *)arg;
+ ocfs_super *osb = ctxt->osb;
+ ocfs_lock_res *lockres = ctxt->lockres;
+ __u64 map;
+ int status = 0;
+
+#define OCFS_DROP_RO_THREAD_NAME "ocfs2dropro"
+
+ ocfs_daemonize (OCFS_DROP_RO_THREAD_NAME, strlen(OCFS_DROP_RO_THREAD_NAME));
+
+ /* this will wait until process_vote gets to the release */
+ ocfs_acquire_lockres(lockres);
+
+ /* check these under the lock */
+ if (lockres->readonly_node != osb->node_num ||
+ lockres->master_node_num != osb->node_num ||
+ lockres->lock_type != OCFS_DLM_ENABLE_CACHE_LOCK) {
+ LOG_ERROR_ARGS("bad RO lockres! this=%d, ro_node=%d, master=%d, locktype=%u\n",
+ osb->node_num, lockres->readonly_node,
+ lockres->master_node_num, lockres->lock_type);
+ status = -EINVAL;
+ goto leave;
+ }
+
+ if (lockres->readonly_dropping) { /* a drop is already in progress: nothing to do in this thread */
+ status = 0;
+ goto leave;
+ }
+
+ lockres->readonly_dropping = true;
+ map = lockres->readonly_map;
+ map &= osb->publ_map; /* remove all dead nodes */
+
+ status = 0;
+ while (map != 0ULL && map != (1ULL << osb->node_num)) { /* 1ULL: shift in 64 bits to match the map */
+ // TODO: need to check all members of the map
+ // in each run thru the loop to see if they died
+ // and eliminate them from the map
+
+ /* cannot hold lockres while waiting for vote */
+ ocfs_release_lockres(lockres);
+
+ status = ocfs_send_readonly_drop_message(osb, lockres, map);
+ if (status >= 0) {
+ ocfs_acquire_lockres(lockres);
+ break;
+ } else if (status != -EAGAIN) {
+ LOG_ERROR_STATUS (status);
+ ocfs_acquire_lockres(lockres);
+ break;
+ }
+
+ /* yes, disgusting. need a waitqueue on lockres */
+ ocfs_sleep (OCFS_NM_HEARTBEAT_TIME / 10);
+ ocfs_acquire_lockres(lockres);
+ map = lockres->readonly_map;
+ map &= osb->publ_map; /* remove all dead nodes */
+ }
+ if (status >= 0)
+ lockres->readonly_map = 0ULL;
+ lockres->readonly_dropping = false;
+
+leave:
+ ocfs_release_lockres(lockres); /* every path reaching here holds lockres */
+ ocfs_put_lockres(lockres);
+ kfree(arg); /* allocated by ocfs_drop_readonly_cache_lock */
+ return status;
+}
Modified: trunk/src/oin.c
===================================================================
--- trunk/src/oin.c 2004-03-18 22:41:21 UTC (rev 790)
+++ trunk/src/oin.c 2004-03-19 07:37:42 UTC (rev 791)
@@ -47,6 +47,7 @@
struct list_head *iter;
struct list_head *temp_iter;
int disk_len;
+ ocfs_disk_lock dlock; /* ???: is this too much on the stack? */
/* We are setting the oin Updated flag in the end. */
LOG_ENTRY ();
@@ -212,18 +213,30 @@
/* ??? we need to the lock resource before updating it */
if (oin->lock_res) {
- ocfs_get_lockres(oin->lock_res);
+ /* cannot hold bhsem while taking lockres... baaad */
+ memcpy(&dlock, (ocfs_disk_lock *)fe, sizeof(ocfs_disk_lock));
+ OCFS_BH_PUT_DATA(fe_bh);
+ fe = NULL;
pLockRes = oin->lock_res;
- pLockRes->lock_type = DISK_LOCK_FILE_LOCK (fe);
- pLockRes->master_node_num = DISK_LOCK_CURRENT_MASTER (fe);
- pLockRes->oin_openmap = DISK_LOCK_OIN_MAP (fe);
- pLockRes->last_write_time = DISK_LOCK_LAST_WRITE (fe);
- pLockRes->last_read_time = DISK_LOCK_LAST_READ (fe);
- pLockRes->reader_node_num = DISK_LOCK_READER_NODE (fe);
- pLockRes->writer_node_num = DISK_LOCK_WRITER_NODE (fe);
+ ocfs_acquire_lockres(pLockRes);
+ pLockRes->lock_type = DISK_LOCK_FILE_LOCK (&dlock);
+ pLockRes->master_node_num = DISK_LOCK_CURRENT_MASTER (&dlock);
+ pLockRes->oin_openmap = DISK_LOCK_OIN_MAP (&dlock);
+ pLockRes->last_write_time = DISK_LOCK_LAST_WRITE (&dlock);
+ pLockRes->last_read_time = DISK_LOCK_LAST_READ (&dlock);
+ pLockRes->reader_node_num = DISK_LOCK_READER_NODE (&dlock);
+ pLockRes->writer_node_num = DISK_LOCK_WRITER_NODE (&dlock);
- ocfs_put_lockres(oin->lock_res);
+ if (pLockRes->readonly_node != OCFS_INVALID_NODE_NUM &&
+ pLockRes->readonly_node != pLockRes->master_node_num) {
+ LOG_ERROR_ARGS("no longer readonly! ronode=%d, master=%d, lockid=%u.%u\n",
+ pLockRes->readonly_node, pLockRes->master_node_num,
+ pLockRes->sector_num);
+ pLockRes->readonly_node = OCFS_INVALID_NODE_NUM;
+ }
+
+ ocfs_release_lockres(pLockRes);
}
status = 0;
More information about the Ocfs2-commits
mailing list