[Ocfs2-commits] zab commits r2130 - trunk/fs/ocfs2/cluster
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Fri Apr 8 20:47:31 CDT 2005
Author: zab
Signed-off-by: mfasheh
Date: 2005-04-08 20:47:29 -0500 (Fri, 08 Apr 2005)
New Revision: 2130
Modified:
trunk/fs/ocfs2/cluster/heartbeat.c
Log:
o nodes are live as long as *any* hb region sees activity. track region slots
in global state so that we can only call the callbacks when that changes.
this lets us mount multiple volumes on a node.
Signed-off-by: mfasheh
Modified: trunk/fs/ocfs2/cluster/heartbeat.c
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.c 2005-04-09 00:16:35 UTC (rev 2129)
+++ trunk/fs/ocfs2/cluster/heartbeat.c 2005-04-09 01:47:29 UTC (rev 2130)
@@ -17,9 +17,6 @@
* License along with this program; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 021110-1307, USA.
- *
- * TODO:
- * - make sure attributes can't be written to after object committal
*/
#include <linux/module.h>
@@ -81,11 +78,14 @@
* from multiple hb region threads.
*/
static DECLARE_RWSEM(hb_callback_sem);
-/*
- * region setup and teardown races with node_fill_map here. We use
- * the callback sem to protect them.
+
+/*
+ * multiple hb threads are watching multiple regions. A node is live
+ * whenever any of the threads sees activity from the node in its region.
*/
-static LIST_HEAD(hb_active_regions);
+static spinlock_t hb_live_lock = SPIN_LOCK_UNLOCKED;
+static struct list_head hb_live_slots[NM_MAX_NODES];
+static unsigned long hb_live_node_bitmap[BITS_TO_LONGS(NM_MAX_NODES)];
static DECLARE_WAIT_QUEUE_HEAD(hb_steady_queue);
@@ -114,8 +114,7 @@
unsigned long ds_last_time;
u16 ds_equal_samples;
u16 ds_changed_samples;
- /* protected by the hr_slot_list_lock */
- struct list_head ds_live_item; /* on alive_list when live */
+ struct list_head ds_live_item;
};
/* each thread owns a region.. when we're asked to tear down the region
@@ -141,9 +140,6 @@
struct page **hr_slot_data;
struct block_device *hr_bdev;
struct hb_disk_slot *hr_slots;
- /* a single hb-thread writer and many fill_node readers are protected */
- rwlock_t hr_slot_list_lock;
- struct list_head hr_live_list;
/* let the person setting up hb wait for it to return until it
* has reached a 'steady' state. This will be fixed when we have
@@ -151,20 +147,6 @@
atomic_t hr_steady_iterations;
};
-static int hb_do_node_down(struct nm_node *node, int idx)
-{
- hbprintk("hb_do_node_down: node=%u\n", node->nd_num);
- hb_do_callbacks(HB_NODE_DOWN_CB, node, idx);
- return 0;
-}
-
-static int hb_do_node_up(struct nm_node *node, int idx)
-{
- hbprintk("hb_do_node_up: node=%u\n", node->nd_num);
- hb_do_callbacks(HB_NODE_UP_CB, node, idx);
- return 0;
-}
-
struct hb_bio_wait_ctxt {
atomic_t wc_num_reqs;
struct completion wc_io_complete;
@@ -455,19 +437,74 @@
hb_block->time = cpu_to_le64(cputime);
}
-static void hb_do_disk_heartbeat(struct hb_region *reg)
+static int hb_check_slot(struct hb_disk_slot *slot)
{
+ int type = HB_NUM_CB;
+ hb_disk_heartbeat_block *hb_block = slot->ds_raw_block;
u64 cputime;
- int i, ret;
- struct nm_node *node;
- struct hb_disk_slot *slot;
- struct list_head *pos, *tmp;
- hb_disk_heartbeat_block *hb_block;
- /* only need to worry about locking when we touch the reg lists
- * which fill_node_map sees. otherwise only we touch these
- * lists and the slot items */
- LIST_HEAD(newborn);
- LIST_HEAD(deceased);
+
+ /* we don't care if these wrap.. the state transitions below
+ * clear at the right places */
+ cputime = le64_to_cpu(hb_block->time);
+ if (slot->ds_last_time != cputime)
+ slot->ds_changed_samples++;
+ else
+ slot->ds_equal_samples++;
+ slot->ds_last_time = cputime;
+
+ spin_lock(&hb_live_lock);
+ /* dead nodes only come to life after some number of
+ * changes at any time during their dead time */
+ if (list_empty(&slot->ds_live_item) &&
+ slot->ds_changed_samples >= HB_LIVE_THRESHOLD) {
+ /* first on the list generates a callback */
+ if (list_empty(&hb_live_slots[slot->ds_node_num]))
+ type = HB_NODE_UP_CB;
+ list_add_tail(&slot->ds_live_item,
+ &hb_live_slots[slot->ds_node_num]);
+
+ set_bit(slot->ds_node_num, hb_live_node_bitmap);
+ slot->ds_equal_samples = 0;
+ goto out;
+ }
+
+ /* if the list is dead, we're done.. */
+ if (list_empty(&slot->ds_live_item))
+ goto out;
+
+ /* live nodes only go dead after enough consecutive missed
+ * samples.. reset the missed counter whenever we see
+ * activity */
+ if (slot->ds_equal_samples >= HB_DEAD_THRESHOLD) {
+ /* last off the live_slot generates a callback */
+ list_del_init(&slot->ds_live_item);
+ if (list_empty(&hb_live_slots[slot->ds_node_num]))
+ type = HB_NODE_DOWN_CB;
+
+ clear_bit(slot->ds_node_num, hb_live_node_bitmap);
+ slot->ds_changed_samples = 0;
+ goto out;
+ }
+ if (slot->ds_changed_samples) {
+ slot->ds_changed_samples = 0;
+ slot->ds_equal_samples = 0;
+ }
+out:
+ spin_unlock(&hb_live_lock);
+ if (type != HB_NUM_CB) {
+ struct nm_node *node = nm_get_node_by_num(slot->ds_node_num);
+ if (node) {
+ hb_do_callbacks(type, node, slot->ds_node_num);
+ nm_node_put(node);
+ }
+ return 1;
+ }
+ return 0;
+}
+
+static void hb_do_disk_heartbeat(struct hb_region *reg)
+{
+ int i, ret, change = 0;
unsigned long configured_nodes[BITS_TO_LONGS(NM_MAX_NODES)];
struct bio *write_bio;
struct hb_bio_wait_ctxt write_wc;
@@ -501,85 +538,16 @@
i = -1;
while((i = find_next_bit(configured_nodes, NM_MAX_NODES, i + 1)) < NM_MAX_NODES) {
- slot = &reg->hr_slots[i];
- hb_block = slot->ds_raw_block;
- /* we don't care if these wrap.. the state transitions below
- * clear at the right places */
- cputime = le64_to_cpu(hb_block->time);
- if (slot->ds_last_time != cputime)
- slot->ds_changed_samples++;
- else
- slot->ds_equal_samples++;
- slot->ds_last_time = cputime;
-
- /* dead nodes only come to life after some number of
- * changes at any time during their dead time */
- if (list_empty(&slot->ds_live_item) &&
- slot->ds_changed_samples >= HB_LIVE_THRESHOLD) {
- list_add_tail(&slot->ds_live_item, &newborn);
- slot->ds_equal_samples = 0;
- continue;
- }
-
- /* live nodes only go dead after enough consecutive missed
- * samples.. reset the missed counter whenever we see
- * activity */
- if (!list_empty(&slot->ds_live_item)) {
- if (slot->ds_equal_samples >= HB_DEAD_THRESHOLD) {
- hbprintk("node %d JUST DIED!!!!\n", i);
- list_move(&slot->ds_live_item, &deceased);
- slot->ds_changed_samples = 0;
- continue;
- }
- if (slot->ds_changed_samples) {
- slot->ds_changed_samples = 0;
- slot->ds_equal_samples = 0;
- }
- }
+ change |= hb_check_slot(&reg->hr_slots[i]);
}
/* let the person who launched us know when things are steady */
- if (list_empty(&newborn) && list_empty(&deceased) &&
- (atomic_read(&reg->hr_steady_iterations) != 0)) {
+ if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
if (atomic_dec_and_test(&reg->hr_steady_iterations))
wake_up(&hb_steady_queue);
}
- /* record our new live guys in the live list and call callbacks */
- list_for_each_safe(pos, tmp, &newborn) {
- slot = list_entry(pos, struct hb_disk_slot, ds_live_item);
-
- write_lock(&reg->hr_slot_list_lock);
- list_move(&slot->ds_live_item, &reg->hr_live_list);
- write_unlock(&reg->hr_slot_list_lock);
-
- node = nm_get_node_by_num(slot->ds_node_num);
- if (node == NULL) {
- hbprintk("saw hb for node %d but don't have a node\n",
- slot->ds_node_num);
- continue;
- }
- hb_do_node_up(node, slot->ds_node_num);
- nm_node_put(node);
- }
-
- /* drop our temporary live_item linkage and call callbacks */
- list_for_each_safe(pos, tmp, &deceased) {
- slot = list_entry(pos, struct hb_disk_slot, ds_live_item);
-
- list_del_init(&slot->ds_live_item);
-
- node = nm_get_node_by_num(slot->ds_node_num);
- if (node == NULL) {
- hbprintk("node %d went down but don't have a node\n",
- slot->ds_node_num);
- continue;
- }
- hb_do_node_down(node, slot->ds_node_num);
- nm_node_put(node);
- }
-
/* Make sure the write hits disk before we return. */
hb_wait_on_io(reg, &write_wc);
bio_put(write_bio);
@@ -615,6 +583,9 @@
INIT_LIST_HEAD(&hb_callbacks[i].list);
init_MUTEX(&hb_callbacks[i].sem);
}
+
+ for (i = 0; i < ARRAY_SIZE(hb_live_slots); i++)
+ INIT_LIST_HEAD(&hb_live_slots[i]);
}
/*
@@ -622,24 +593,14 @@
*/
void hb_fill_node_map(unsigned long *map, unsigned bytes)
{
- struct hb_region *reg;
- struct hb_disk_slot *slot;
-
BUG_ON(bytes < (BITS_TO_LONGS(NM_MAX_NODES) * sizeof(unsigned long)));
- memset(map, 0, bytes);
-
/* callers want to serialize this map and callbacks so that they
* can trust that they don't miss nodes coming to the party */
down_read(&hb_callback_sem);
-
- list_for_each_entry(reg, &hb_active_regions, hr_active_item) {
- read_lock(&reg->hr_slot_list_lock);
- list_for_each_entry(slot, &reg->hr_live_list, ds_live_item)
- set_bit(slot->ds_node_num, map);
- read_unlock(&reg->hr_slot_list_lock);
- }
-
+ spin_lock(&hb_live_lock);
+ memcpy(map, &hb_live_node_bitmap, bytes);
+ spin_unlock(&hb_live_lock);
up_read(&hb_callback_sem);
}
EXPORT_SYMBOL(hb_fill_node_map);
@@ -977,10 +938,6 @@
goto out;
}
- down_write(&hb_callback_sem);
- list_add_tail(&reg->hr_active_item, &hb_active_regions);
- up_write(&hb_callback_sem);
-
ret = wait_event_interruptible(hb_steady_queue,
atomic_read(&reg->hr_steady_iterations) == 0);
if (ret == 0)
@@ -1105,8 +1062,6 @@
goto out; /* ENOMEM */
INIT_LIST_HEAD(&reg->hr_active_item);
- rwlock_init(&reg->hr_slot_list_lock);
- INIT_LIST_HEAD(&reg->hr_live_list);
hb_init_region_params(reg, HB_DEFAULT_BLOCK_BITS);
config_item_init_type_name(&reg->hr_item, name, &hb_region_type);
@@ -1271,6 +1226,9 @@
hbprintk("missed hb callback(%d) due to EINTR!\n", type);
goto out;
}
+
+ hbprintk("calling %d callbacks for node %s (%u) on slot %d\n",
+ type, node->nd_name, node->nd_num, idx);
list_for_each(iter, &hbcall->list) {
f = list_entry(iter, struct hb_callback_func, hc_item);
More information about the Ocfs2-commits
mailing list