[Ocfs2-commits] mfasheh commits r2153 - trunk/fs/ocfs2/cluster

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Wed Apr 20 13:10:32 CDT 2005


Author: mfasheh
Signed-off-by: zab
Date: 2005-04-20 13:10:31 -0500 (Wed, 20 Apr 2005)
New Revision: 2153

Modified:
   trunk/fs/ocfs2/cluster/heartbeat.c
   trunk/fs/ocfs2/cluster/heartbeat.h
Log:
* Take advantage of typechecking and name the hb callback enum

* The heartbeat node bitmap wasn't being set and cleared at the right time,
  and leaving a heartbeat region didn't check to clear and deliver node down
  events.

* We avoid node up / down event racing by introducing an event queue from
  which events are actually deliever in fifo order.

* Remove the sem argument from the hb_callbacks struct and just use the
  global hb_callback_sem instead.

Signed-off-by: zab



Modified: trunk/fs/ocfs2/cluster/heartbeat.c
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.c	2005-04-20 17:59:03 UTC (rev 2152)
+++ trunk/fs/ocfs2/cluster/heartbeat.c	2005-04-20 18:10:31 UTC (rev 2153)
@@ -69,7 +69,6 @@
 #define __user
 #endif
 
-static void hb_do_callbacks(int type, struct nm_node *node, int idx);
 
 /* 
  * The first heartbeat pass had one global thread that would serialize all hb
@@ -86,14 +85,17 @@
 static spinlock_t hb_live_lock = SPIN_LOCK_UNLOCKED;
 static struct list_head hb_live_slots[NM_MAX_NODES];
 static unsigned long hb_live_node_bitmap[BITS_TO_LONGS(NM_MAX_NODES)];
+static LIST_HEAD(hb_node_events);
 
+
 static DECLARE_WAIT_QUEUE_HEAD(hb_steady_queue);
 
 static struct hb_callback {
 	struct list_head list;
-	struct semaphore sem;
 } hb_callbacks[HB_NUM_CB];
 
+static struct hb_callback *hbcall_from_type(enum hb_callback_type type);
+
 #ifndef ENABLE_HBPRINTK
 #define hbprintk(x, arg...)    
 #define hbprintk0(x)
@@ -108,6 +110,13 @@
 
 #define HB_DEFAULT_BLOCK_BITS         9
 
+struct hb_node_event {
+	struct list_head        hn_item;
+	enum hb_callback_type   hn_event_type;
+	struct nm_node          *hn_node;
+	int                     hn_node_num;
+};
+
 struct hb_disk_slot {
 	hb_disk_heartbeat_block *ds_raw_block;
 	u8			ds_node_num;
@@ -122,7 +131,6 @@
 struct hb_region {
 	struct config_item	hr_item;
 	/* protected by the hr_callback_sem */
-	struct list_head	hr_active_item;
 	struct task_struct 	*hr_task;
 	unsigned int		hr_fs_block_bytes;
 	unsigned int		hr_fs_block_bits;
@@ -437,12 +445,129 @@
 	hb_block->time = cpu_to_le64(cputime);
 }
 
+static void hb_fire_callbacks(struct hb_callback *hbcall,
+			      struct nm_node *node,
+			      int idx)
+{
+	struct list_head *iter;
+	struct hb_callback_func *f;
+
+	list_for_each(iter, &hbcall->list) {
+		f = list_entry(iter, struct hb_callback_func, hc_item);
+		(f->hc_func)(node, idx, f->hc_data);
+	}
+}
+
+/* Will run the list in order until we process the passed event */
+static void hb_run_event_list(struct hb_node_event *queued_event)
+{
+	int empty;
+	struct hb_callback *hbcall;
+	struct hb_node_event *event;
+
+	spin_lock(&hb_live_lock);
+	empty = list_empty(&queued_event->hn_item);
+	spin_unlock(&hb_live_lock);
+	if (empty)
+		return;
+
+	/* Holding callback sem assures we don't alter the callback
+	 * lists when doing this, and serializes ourselves with other
+	 * processes wanting callbacks. */
+	down_write(&hb_callback_sem);
+
+	spin_lock(&hb_live_lock);
+	while (!list_empty(&hb_node_events) 
+	       && !list_empty(&queued_event->hn_item)) {
+		event = list_entry(hb_node_events.next,
+				   struct hb_node_event,
+				   hn_item);
+		list_del_init(&event->hn_item);
+		spin_unlock(&hb_live_lock);
+
+		hbprintk("Node %s event for %d\n",
+			 event->hn_event_type == HB_NODE_UP_CB ? "UP" : "DOWN",
+			 event->hn_node_num);
+
+		hbcall = hbcall_from_type(event->hn_event_type);
+
+		/* We should *never* have gotten on to the list with a
+		 * bad type... This isn't something that we should try
+		 * to recover from. */
+		BUG_ON(IS_ERR(hbcall));
+
+		hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
+
+		spin_lock(&hb_live_lock);
+	}
+	spin_unlock(&hb_live_lock);
+
+	up_write(&hb_callback_sem);
+}
+
+static void hb_queue_node_event(struct hb_node_event *event,
+				enum hb_callback_type type,
+				struct nm_node *node,
+				int node_num)
+{
+	assert_spin_locked(&hb_live_lock);
+
+	event->hn_event_type = type;
+	event->hn_node = node;
+	event->hn_node_num = node_num;
+
+	hbprintk("Queue node %s event for node %d\n",
+		 type == HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
+
+	list_add_tail(&event->hn_item, &hb_node_events);
+}
+
+static void hb_shutdown_slot(struct hb_disk_slot *slot)
+{
+	struct hb_node_event event =
+		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
+	struct nm_node *node;
+
+	node = nm_get_node_by_num(slot->ds_node_num);
+	if (!node)
+		return;
+
+	spin_lock(&hb_live_lock);
+	if (!list_empty(&slot->ds_live_item)) {
+		hbprintk("Shutdown, node %d leaves region\n",
+			 slot->ds_node_num);
+
+		list_del_init(&slot->ds_live_item);
+
+		if (list_empty(&hb_live_slots[slot->ds_node_num])) {
+			clear_bit(slot->ds_node_num, hb_live_node_bitmap);
+
+			hb_queue_node_event(&event, HB_NODE_DOWN_CB, node,
+					    slot->ds_node_num);
+		}
+	}
+	spin_unlock(&hb_live_lock);
+
+	hb_run_event_list(&event);
+
+	nm_node_put(node);
+}
+
 static int hb_check_slot(struct hb_disk_slot *slot)
 {
-	int type = HB_NUM_CB;
+	int changed = 0;
+	struct hb_node_event event = 
+		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
+	struct nm_node *node;
 	hb_disk_heartbeat_block *hb_block = slot->ds_raw_block;
 	u64 cputime;
 
+	/* Is this correct? Do we assume that the node doesn't exist
+	 * if we're not configured for him? */
+	node = nm_get_node_by_num(slot->ds_node_num);
+	if (!node)
+		return 0;
+
 	/* we don't care if these wrap.. the state transitions below
 	 * clear at the right places */
 	cputime = le64_to_cpu(hb_block->time);
@@ -457,13 +582,21 @@
 	 * changes at any time during their dead time */
 	if (list_empty(&slot->ds_live_item) &&
 	    slot->ds_changed_samples >= HB_LIVE_THRESHOLD) {
+		hbprintk("Node %d joined my region\n", slot->ds_node_num);
+
 		/* first on the list generates a callback */
-		if (list_empty(&hb_live_slots[slot->ds_node_num]))
-			type = HB_NODE_UP_CB;
+		if (list_empty(&hb_live_slots[slot->ds_node_num])) {
+			set_bit(slot->ds_node_num, hb_live_node_bitmap);
+
+			hb_queue_node_event(&event, HB_NODE_UP_CB, node,
+					    slot->ds_node_num);
+	
+			changed = 1;
+		}
+
 		list_add_tail(&slot->ds_live_item,
 			      &hb_live_slots[slot->ds_node_num]);
 
-		set_bit(slot->ds_node_num, hb_live_node_bitmap);
 		slot->ds_equal_samples = 0;
 		goto out;
 	}
@@ -476,12 +609,19 @@
 	 * samples..  reset the missed counter whenever we see 
 	 * activity */
 	if (slot->ds_equal_samples >= HB_DEAD_THRESHOLD) {
+		hbprintk("Node %d left my region\n", slot->ds_node_num);
+
 		/* last off the live_slot generates a callback */
 		list_del_init(&slot->ds_live_item);
-		if (list_empty(&hb_live_slots[slot->ds_node_num]))
-			type = HB_NODE_DOWN_CB;
+		if (list_empty(&hb_live_slots[slot->ds_node_num])) {
+			clear_bit(slot->ds_node_num, hb_live_node_bitmap);
 
-		clear_bit(slot->ds_node_num, hb_live_node_bitmap);
+			hb_queue_node_event(&event, HB_NODE_DOWN_CB, node,
+					    slot->ds_node_num);
+
+			changed = 1;
+		}
+
 		slot->ds_changed_samples = 0;
 		goto out;
 	}
@@ -491,15 +631,11 @@
 	}
 out:
 	spin_unlock(&hb_live_lock);
-	if (type != HB_NUM_CB) {
-		struct nm_node *node = nm_get_node_by_num(slot->ds_node_num);
-		if (node) {
-			hb_do_callbacks(type, node, slot->ds_node_num);
-			nm_node_put(node);
-		}
-		return 1;
-	}
-	return 0;
+
+	hb_run_event_list(&event);
+
+	nm_node_put(node);
+	return changed;
 }
 
 /* This could be faster if we just implmented a find_last_bit, but I
@@ -590,6 +726,7 @@
 static int hb_thread(void *data)
 {
 	struct hb_region *reg = data;
+	int i;
 
 	hbprintk("hb thread running\n");
 
@@ -599,6 +736,9 @@
 		schedule_timeout(msecs_to_jiffies(HB_THREAD_MS));
 	}
 
+	for(i = 0; i < reg->hr_blocks; i++)
+		hb_shutdown_slot(&reg->hr_slots[i]);
+
 	hbprintk("hb thread exiting\n");
 
 	return 0;
@@ -608,13 +748,15 @@
 {
 	int i;
 
-	for (i = 0; i < ARRAY_SIZE(hb_callbacks); i++) {
+	for (i = 0; i < ARRAY_SIZE(hb_callbacks); i++)
 		INIT_LIST_HEAD(&hb_callbacks[i].list);
-		init_MUTEX(&hb_callbacks[i].sem);
-	}
 
 	for (i = 0; i < ARRAY_SIZE(hb_live_slots); i++)
 		INIT_LIST_HEAD(&hb_live_slots[i]);
+
+	INIT_LIST_HEAD(&hb_node_events);
+
+	memset(hb_live_node_bitmap, 0, sizeof(hb_live_node_bitmap));
 }
 
 /*
@@ -1090,7 +1232,6 @@
 	if (reg == NULL)
 		goto out; /* ENOMEM */
 
-	INIT_LIST_HEAD(&reg->hr_active_item);
 	hb_init_region_params(reg, HB_DEFAULT_BLOCK_BITS);
 
 	config_item_init_type_name(&reg->hr_item, name, &hb_region_type);
@@ -1108,11 +1249,6 @@
 {
 	struct hb_region *reg = to_hb_region(item);
 
-	down_read(&hb_callback_sem);
-	if (!list_empty(&reg->hr_active_item))
-		list_del_init(&reg->hr_active_item);
-	up_read(&hb_callback_sem);
-
 	/* stop the thread when the user removes the region dir */
 	if (reg->hr_task) {
 		kthread_stop(reg->hr_task);
@@ -1159,19 +1295,21 @@
 	kfree(hs);
 }
 
-
 /* hb callback registration and issueing */
 
-static struct hb_callback *hbcall_from_type(int type)
+static struct hb_callback *hbcall_from_type(enum hb_callback_type type)
 {
-	if (type < HB_NODE_DOWN_CB || type >= HB_NUM_CB)
+	if (type == HB_NUM_CB)
 		return ERR_PTR(-EINVAL);
 
 	return &hb_callbacks[type];
 }
 
-void hb_setup_callback(struct hb_callback_func *hc, int type, hb_cb_func *func,
-		       void *data, int priority)
+void hb_setup_callback(struct hb_callback_func *hc,
+		       enum hb_callback_type type,
+		       hb_cb_func *func,
+		       void *data,
+		       int priority)
 {
 	INIT_LIST_HEAD(&hc->hc_item);
 	hc->hc_func = func;
@@ -1186,7 +1324,6 @@
 	struct hb_callback_func *tmp;
 	struct list_head *iter;
 	struct hb_callback *hbcall;
-	int ret;
 
 	BUG_ON(!list_empty(&hc->hc_item));
 
@@ -1194,10 +1331,8 @@
 	if (IS_ERR(hbcall))
 		return PTR_ERR(hbcall);
 
-	ret = down_interruptible(&hbcall->sem);
-	if (ret)
-		goto out;
-	
+	down_write(&hb_callback_sem);
+
 	list_for_each(iter, &hbcall->list) {
 		tmp = list_entry(iter, struct hb_callback_func, hc_item);
 		if (hc->hc_priority < tmp->hc_priority) {
@@ -1208,66 +1343,26 @@
 	if (list_empty(&hc->hc_item)) 
 		list_add_tail(&hc->hc_item, &hbcall->list);
 
-	up(&hbcall->sem);
-out:
-	return ret;
+	up_write(&hb_callback_sem);
+
+	return 0;
 }
 EXPORT_SYMBOL(hb_register_callback);
 
 int hb_unregister_callback(struct hb_callback_func *hc)
 {
-	struct hb_callback *hbcall;
-	int ret;
-
 	BUG_ON(list_empty(&hc->hc_item));
 
-	hbcall = hbcall_from_type(hc->hc_type);
-	if (IS_ERR(hbcall))
-		return PTR_ERR(hbcall);
+	down_write(&hb_callback_sem);
 
-	ret = down_interruptible(&hbcall->sem);
-	if (ret)
-		goto out;
-
 	list_del_init(&hc->hc_item);
 
-	up(&hbcall->sem);
+	up_write(&hb_callback_sem);
 
-out:
-	return ret;
+	return 0;
 }
 EXPORT_SYMBOL(hb_unregister_callback);
 
-static void hb_do_callbacks(int type, struct nm_node *node, int idx)
-{
-	struct list_head *iter;
-	struct hb_callback_func *f;
-	struct hb_callback *hbcall;
-
-	hbcall = hbcall_from_type(type);
-	if (IS_ERR(hbcall))
-		return;
-
-	/* XXX not interruptible?  this is in the hb thread.. */
-	down_write(&hb_callback_sem);
-
-	if (down_interruptible(&hbcall->sem)) {
-		hbprintk("missed hb callback(%d) due to EINTR!\n", type);
-		goto out;
-	}
-
-	hbprintk("calling %d callbacks for node %s (%u) on slot %d\n",
-		 type, node->nd_name, node->nd_num, idx);
-	
-	list_for_each(iter, &hbcall->list) {
-		f = list_entry(iter, struct hb_callback_func, hc_item);
-		(f->hc_func)(node, idx, f->hc_data);
-	}
-	up(&hbcall->sem);
-out:
-	up_write(&hb_callback_sem);
-}
-
 /* Makes sure our local node is configured with a node number, and is
  * heartbeating. */
 int hb_check_local_node_heartbeating(void)

Modified: trunk/fs/ocfs2/cluster/heartbeat.h
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.h	2005-04-20 17:59:03 UTC (rev 2152)
+++ trunk/fs/ocfs2/cluster/heartbeat.h	2005-04-20 18:10:31 UTC (rev 2153)
@@ -30,7 +30,7 @@
 #include "ocfs2_heartbeat.h"
 
 /* callback stuff */
-enum {
+enum hb_callback_type {
 	HB_NODE_DOWN_CB = 0,
 	HB_NODE_UP_CB,
 	HB_NUM_CB
@@ -44,7 +44,7 @@
 	hb_cb_func		*hc_func;
 	void			*hc_data;
 	int			hc_priority;
-	int			hc_type;
+	enum hb_callback_type   hc_type;
 };
 
 /* number of changes to be seen as live */ 
@@ -55,11 +55,15 @@
 struct config_group *hb_alloc_hb_set(void);
 void hb_free_hb_set(struct config_group *group);
 
-void hb_setup_callback(struct hb_callback_func *hc, int type, hb_cb_func *func,
-		      void *data, int priority);
+void hb_setup_callback(struct hb_callback_func *hc,
+		       enum hb_callback_type type,
+		       hb_cb_func *func,
+		       void *data,
+		       int priority);
 int hb_register_callback(struct hb_callback_func *hc);
 int hb_unregister_callback(struct hb_callback_func *hc); 
-void hb_fill_node_map(unsigned long *map, unsigned bytes);
+void hb_fill_node_map(unsigned long *map,
+		      unsigned bytes);
 void hb_init(void);
 int hb_check_local_node_heartbeating(void);
 



More information about the Ocfs2-commits mailing list