[Ocfs2-commits] zab commits r2130 - trunk/fs/ocfs2/cluster

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Fri Apr 8 20:47:31 CDT 2005


Author: zab
Signed-off-by: mfasheh
Date: 2005-04-08 20:47:29 -0500 (Fri, 08 Apr 2005)
New Revision: 2130

Modified:
   trunk/fs/ocfs2/cluster/heartbeat.c
Log:
o nodes are live as long as *any* hb region sees activity.  track each
  region's live slots in global state so that we call the callbacks only
  when a node's overall liveness changes.  this lets us mount multiple
  volumes on a node.

Signed-off-by: mfasheh


Modified: trunk/fs/ocfs2/cluster/heartbeat.c
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.c	2005-04-09 00:16:35 UTC (rev 2129)
+++ trunk/fs/ocfs2/cluster/heartbeat.c	2005-04-09 01:47:29 UTC (rev 2130)
@@ -17,9 +17,6 @@
  * License along with this program; if not, write to the
  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
  * Boston, MA 021110-1307, USA.
- *
- * TODO:
- * 	- make sure attributes can't be written to after object commital 
  */
 
 #include <linux/module.h>
@@ -81,11 +78,14 @@
  * from multiple hb region threads.
  */
 static DECLARE_RWSEM(hb_callback_sem);
-/*
- * region setup and teardown races with node_fill_map here.  We use
- * the callback sem to protect them.
+
+/* 
+ * multiple hb threads are watching multiple regions.  A node is live 
+ * whenever any of the threads sees activity from the node in its region.
  */
-static LIST_HEAD(hb_active_regions);
+static spinlock_t hb_live_lock = SPIN_LOCK_UNLOCKED;
+static struct list_head hb_live_slots[NM_MAX_NODES];
+static unsigned long hb_live_node_bitmap[BITS_TO_LONGS(NM_MAX_NODES)];
 
 static DECLARE_WAIT_QUEUE_HEAD(hb_steady_queue);
 
@@ -114,8 +114,7 @@
 	unsigned long		ds_last_time;
 	u16			ds_equal_samples;
 	u16			ds_changed_samples;
-	/* protected by the hr_slot_list_lock */ 
-	struct list_head	ds_live_item; /* on alive_list when live */
+	struct list_head	ds_live_item;
 };
 
 /* each thread owns a region.. when we're asked to tear down the region
@@ -141,9 +140,6 @@
 	struct page             **hr_slot_data;
 	struct block_device	*hr_bdev;
 	struct hb_disk_slot	*hr_slots;
-	/* a single hb-thread writer and many fill_node readers are protected */
-	rwlock_t		hr_slot_list_lock;
-	struct list_head	hr_live_list;
 
 	/* let the person setting up hb wait for it to return until it
 	 * has reached a 'steady' state.  This will be fixed when we have
@@ -151,20 +147,6 @@
 	atomic_t		hr_steady_iterations;
 };
 
-static int hb_do_node_down(struct nm_node *node, int idx)
-{
-	hbprintk("hb_do_node_down:  node=%u\n", node->nd_num);
-	hb_do_callbacks(HB_NODE_DOWN_CB, node, idx);
-	return 0;
-}
-
-static int hb_do_node_up(struct nm_node *node, int idx)
-{
-	hbprintk("hb_do_node_up: node=%u\n", node->nd_num);
-	hb_do_callbacks(HB_NODE_UP_CB, node, idx);
-	return 0;
-}
-
 struct hb_bio_wait_ctxt {
 	atomic_t          wc_num_reqs;
 	struct completion wc_io_complete;
@@ -455,19 +437,74 @@
 	hb_block->time = cpu_to_le64(cputime);
 }
 
-static void hb_do_disk_heartbeat(struct hb_region *reg)
+static int hb_check_slot(struct hb_disk_slot *slot)
 {
+	int type = HB_NUM_CB;
+	hb_disk_heartbeat_block *hb_block = slot->ds_raw_block;
 	u64 cputime;
-	int i, ret;
-	struct nm_node *node;
-	struct hb_disk_slot *slot;
-	struct list_head *pos, *tmp;
-	hb_disk_heartbeat_block *hb_block;
-	/* only need to worry about locking when we touch the reg lists
-	 * which fill_node_map sees.  otherwise only we touch these
-	 * lists and the slot items */
-	LIST_HEAD(newborn);
-	LIST_HEAD(deceased);
+
+	/* we don't care if these wrap.. the state transitions below
+	 * clear at the right places */
+	cputime = le64_to_cpu(hb_block->time);
+	if (slot->ds_last_time != cputime)
+		slot->ds_changed_samples++;
+	else
+		slot->ds_equal_samples++;
+	slot->ds_last_time = cputime;
+
+	spin_lock(&hb_live_lock);
+	/* dead nodes only come to life after some number of 
+	 * changes at any time during their dead time */
+	if (list_empty(&slot->ds_live_item) &&
+	    slot->ds_changed_samples >= HB_LIVE_THRESHOLD) {
+		/* first on the list generates a callback */
+		if (list_empty(&hb_live_slots[slot->ds_node_num]))
+			type = HB_NODE_UP_CB;
+		list_add_tail(&slot->ds_live_item,
+			      &hb_live_slots[slot->ds_node_num]);
+
+		set_bit(slot->ds_node_num, hb_live_node_bitmap);
+		slot->ds_equal_samples = 0;
+		goto out;
+	}
+
+	/* if this slot isn't on a live list, we're done.. */
+	if (list_empty(&slot->ds_live_item))
+		goto out;
+
+	/* live nodes only go dead after enough consecutive missed
+	 * samples..  reset the missed counter whenever we see
+	 * activity */
+	if (slot->ds_equal_samples >= HB_DEAD_THRESHOLD) {
+		/* last slot off the node's live list generates a callback */
+		list_del_init(&slot->ds_live_item);
+		if (list_empty(&hb_live_slots[slot->ds_node_num]))
+			type = HB_NODE_DOWN_CB;
+		/* NOTE(review): bit is cleared even if other slots are still live -- confirm */
+		clear_bit(slot->ds_node_num, hb_live_node_bitmap);
+		slot->ds_changed_samples = 0;
+		goto out;
+	}
+	if (slot->ds_changed_samples) {
+		slot->ds_changed_samples = 0;
+		slot->ds_equal_samples = 0;
+	}
+out:
+	spin_unlock(&hb_live_lock);
+	if (type != HB_NUM_CB) {
+		struct nm_node *node = nm_get_node_by_num(slot->ds_node_num);
+		if (node) {
+			hb_do_callbacks(type, node, slot->ds_node_num);
+			nm_node_put(node);
+		}
+		return 1;
+	}
+	return 0;
+}
+
+static void hb_do_disk_heartbeat(struct hb_region *reg)
+{
+	int i, ret, change = 0;
 	unsigned long configured_nodes[BITS_TO_LONGS(NM_MAX_NODES)];
 	struct bio *write_bio;
 	struct hb_bio_wait_ctxt write_wc;
@@ -501,85 +538,16 @@
 
 	i = -1;
 	while((i = find_next_bit(configured_nodes, NM_MAX_NODES, i + 1)) < NM_MAX_NODES) {
-		slot = &reg->hr_slots[i];
-		hb_block = slot->ds_raw_block;
 
-		/* we don't care if these wrap.. the state transitions below
-		 * clear at the right places */
-		cputime = le64_to_cpu(hb_block->time);
-		if (slot->ds_last_time != cputime)
-			slot->ds_changed_samples++;
-		else
-			slot->ds_equal_samples++;
-		slot->ds_last_time = cputime;
-
-		/* dead nodes only come to life after some number of 
-		 * changes at any time during their dead time */
-		if (list_empty(&slot->ds_live_item) &&
-		    slot->ds_changed_samples >= HB_LIVE_THRESHOLD) {
-			list_add_tail(&slot->ds_live_item, &newborn);
-			slot->ds_equal_samples = 0;
-			continue;
-		}
-
-		/* live nodes only go dead after enough consequtive missed
-		 * samples..  reset the missed counter whenever we see 
-		 * activity */
-		if (!list_empty(&slot->ds_live_item)) {
-			if (slot->ds_equal_samples >= HB_DEAD_THRESHOLD) {
-				hbprintk("node %d JUST DIED!!!!\n", i);
-				list_move(&slot->ds_live_item, &deceased);
-				slot->ds_changed_samples = 0;
-				continue;
-			}
-			if (slot->ds_changed_samples) {
-				slot->ds_changed_samples = 0;
-				slot->ds_equal_samples = 0;
-			}
-		}
+		change |= hb_check_slot(&reg->hr_slots[i]);
 	}
 
 	/* let the person who launched us know when things are steady */
-	if (list_empty(&newborn) && list_empty(&deceased) &&
-	    (atomic_read(&reg->hr_steady_iterations) != 0)) {
+	if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
 		if (atomic_dec_and_test(&reg->hr_steady_iterations))
 			wake_up(&hb_steady_queue);
 	}
 
-	/* record our new live guys in the live list and call callbacks */
-	list_for_each_safe(pos, tmp, &newborn) {
-		slot = list_entry(pos, struct hb_disk_slot, ds_live_item);
-
-		write_lock(&reg->hr_slot_list_lock);
-		list_move(&slot->ds_live_item, &reg->hr_live_list);
-		write_unlock(&reg->hr_slot_list_lock);
-
-		node = nm_get_node_by_num(slot->ds_node_num);
-		if (node == NULL) {
-			hbprintk("saw hb for node %d but don't have a node\n",
-				 slot->ds_node_num);
-			continue;	
-		}
-		hb_do_node_up(node, slot->ds_node_num);
-		nm_node_put(node);
-	}
-
-	/* drop our temporary live_item linkage and call callbacks */ 
-	list_for_each_safe(pos, tmp, &deceased) {
-		slot = list_entry(pos, struct hb_disk_slot, ds_live_item);
-
-		list_del_init(&slot->ds_live_item);
-
-		node = nm_get_node_by_num(slot->ds_node_num);
-		if (node == NULL) {
-			hbprintk("node %d went down but don't have a node\n",
-				 slot->ds_node_num);
-			continue;	
-		}
-		hb_do_node_down(node, slot->ds_node_num);
-		nm_node_put(node);
-	}
-
 	/* Make sure the write hits disk before we return. */
 	hb_wait_on_io(reg, &write_wc);
 	bio_put(write_bio);
@@ -615,6 +583,9 @@
 		INIT_LIST_HEAD(&hb_callbacks[i].list);
 		init_MUTEX(&hb_callbacks[i].sem);
 	}
+
+	for (i = 0; i < ARRAY_SIZE(hb_live_slots); i++)
+		INIT_LIST_HEAD(&hb_live_slots[i]);
 }
 
 /*
@@ -622,24 +593,14 @@
  */
 void hb_fill_node_map(unsigned long *map, unsigned bytes)
 {
-	struct hb_region *reg;
-	struct hb_disk_slot *slot;
-
 	BUG_ON(bytes < (BITS_TO_LONGS(NM_MAX_NODES) * sizeof(unsigned long)));
 
-	memset(map, 0, bytes);
-
 	/* callers want to serialize this map and callbacks so that they
 	 * can trust that they don't miss nodes coming to the party */
 	down_read(&hb_callback_sem);
-
-	list_for_each_entry(reg, &hb_active_regions, hr_active_item) {
-		read_lock(&reg->hr_slot_list_lock);
-		list_for_each_entry(slot, &reg->hr_live_list, ds_live_item)
-			set_bit(slot->ds_node_num, map);
-		read_unlock(&reg->hr_slot_list_lock);
-	}
-
+	spin_lock(&hb_live_lock);
+	memcpy(map, &hb_live_node_bitmap, bytes);
+	spin_unlock(&hb_live_lock);
 	up_read(&hb_callback_sem);
 }
 EXPORT_SYMBOL(hb_fill_node_map);
@@ -977,10 +938,6 @@
 		goto out;
 	}
 
-	down_write(&hb_callback_sem);
-	list_add_tail(&reg->hr_active_item, &hb_active_regions);
-	up_write(&hb_callback_sem);
-
 	ret = wait_event_interruptible(hb_steady_queue,
 				atomic_read(&reg->hr_steady_iterations) == 0);
 	if (ret == 0)
@@ -1105,8 +1062,6 @@
 		goto out; /* ENOMEM */
 
 	INIT_LIST_HEAD(&reg->hr_active_item);
-	rwlock_init(&reg->hr_slot_list_lock);
-	INIT_LIST_HEAD(&reg->hr_live_list);
 	hb_init_region_params(reg, HB_DEFAULT_BLOCK_BITS);
 
 	config_item_init_type_name(&reg->hr_item, name, &hb_region_type);
@@ -1271,6 +1226,9 @@
 		hbprintk("missed hb callback(%d) due to EINTR!\n", type);
 		goto out;
 	}
+
+	hbprintk("calling %d callbacks for node %s (%u) on slot %d\n",
+		 type, node->nd_name, node->nd_num, idx);
 	
 	list_for_each(iter, &hbcall->list) {
 		f = list_entry(iter, struct hb_callback_func, hc_item);



More information about the Ocfs2-commits mailing list