[Ocfs2-commits] mfasheh commits r2153 - trunk/fs/ocfs2/cluster
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Wed Apr 20 13:10:32 CDT 2005
Author: mfasheh
Signed-off-by: zab
Date: 2005-04-20 13:10:31 -0500 (Wed, 20 Apr 2005)
New Revision: 2153
Modified:
trunk/fs/ocfs2/cluster/heartbeat.c
trunk/fs/ocfs2/cluster/heartbeat.h
Log:
* Take advantage of typechecking and name the hb callback enum
* The heartbeat node bitmap wasn't being set and cleared at the right time,
and leaving a heartbeat region didn't check to clear and deliver node down
events.
* We avoid node up / down event racing by introducing an event queue from
which events are actually deliever in fifo order.
* Remove the sem argument from the hb_callbacks struct and just use the
global hb_callback_sem instead.
Signed-off-by: zab
Modified: trunk/fs/ocfs2/cluster/heartbeat.c
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.c 2005-04-20 17:59:03 UTC (rev 2152)
+++ trunk/fs/ocfs2/cluster/heartbeat.c 2005-04-20 18:10:31 UTC (rev 2153)
@@ -69,7 +69,6 @@
#define __user
#endif
-static void hb_do_callbacks(int type, struct nm_node *node, int idx);
/*
* The first heartbeat pass had one global thread that would serialize all hb
@@ -86,14 +85,17 @@
static spinlock_t hb_live_lock = SPIN_LOCK_UNLOCKED;
static struct list_head hb_live_slots[NM_MAX_NODES];
static unsigned long hb_live_node_bitmap[BITS_TO_LONGS(NM_MAX_NODES)];
+static LIST_HEAD(hb_node_events);
+
static DECLARE_WAIT_QUEUE_HEAD(hb_steady_queue);
static struct hb_callback {
struct list_head list;
- struct semaphore sem;
} hb_callbacks[HB_NUM_CB];
+static struct hb_callback *hbcall_from_type(enum hb_callback_type type);
+
#ifndef ENABLE_HBPRINTK
#define hbprintk(x, arg...)
#define hbprintk0(x)
@@ -108,6 +110,13 @@
#define HB_DEFAULT_BLOCK_BITS 9
+struct hb_node_event {
+ struct list_head hn_item;
+ enum hb_callback_type hn_event_type;
+ struct nm_node *hn_node;
+ int hn_node_num;
+};
+
struct hb_disk_slot {
hb_disk_heartbeat_block *ds_raw_block;
u8 ds_node_num;
@@ -122,7 +131,6 @@
struct hb_region {
struct config_item hr_item;
/* protected by the hr_callback_sem */
- struct list_head hr_active_item;
struct task_struct *hr_task;
unsigned int hr_fs_block_bytes;
unsigned int hr_fs_block_bits;
@@ -437,12 +445,129 @@
hb_block->time = cpu_to_le64(cputime);
}
+static void hb_fire_callbacks(struct hb_callback *hbcall,
+ struct nm_node *node,
+ int idx)
+{
+ struct list_head *iter;
+ struct hb_callback_func *f;
+
+ list_for_each(iter, &hbcall->list) {
+ f = list_entry(iter, struct hb_callback_func, hc_item);
+ (f->hc_func)(node, idx, f->hc_data);
+ }
+}
+
+/* Will run the list in order until we process the passed event */
+static void hb_run_event_list(struct hb_node_event *queued_event)
+{
+ int empty;
+ struct hb_callback *hbcall;
+ struct hb_node_event *event;
+
+ spin_lock(&hb_live_lock);
+ empty = list_empty(&queued_event->hn_item);
+ spin_unlock(&hb_live_lock);
+ if (empty)
+ return;
+
+ /* Holding callback sem assures we don't alter the callback
+ * lists when doing this, and serializes ourselves with other
+ * processes wanting callbacks. */
+ down_write(&hb_callback_sem);
+
+ spin_lock(&hb_live_lock);
+ while (!list_empty(&hb_node_events)
+ && !list_empty(&queued_event->hn_item)) {
+ event = list_entry(hb_node_events.next,
+ struct hb_node_event,
+ hn_item);
+ list_del_init(&event->hn_item);
+ spin_unlock(&hb_live_lock);
+
+ hbprintk("Node %s event for %d\n",
+ event->hn_event_type == HB_NODE_UP_CB ? "UP" : "DOWN",
+ event->hn_node_num);
+
+ hbcall = hbcall_from_type(event->hn_event_type);
+
+ /* We should *never* have gotten on to the list with a
+ * bad type... This isn't something that we should try
+ * to recover from. */
+ BUG_ON(IS_ERR(hbcall));
+
+ hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
+
+ spin_lock(&hb_live_lock);
+ }
+ spin_unlock(&hb_live_lock);
+
+ up_write(&hb_callback_sem);
+}
+
+static void hb_queue_node_event(struct hb_node_event *event,
+ enum hb_callback_type type,
+ struct nm_node *node,
+ int node_num)
+{
+ assert_spin_locked(&hb_live_lock);
+
+ event->hn_event_type = type;
+ event->hn_node = node;
+ event->hn_node_num = node_num;
+
+ hbprintk("Queue node %s event for node %d\n",
+ type == HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
+
+ list_add_tail(&event->hn_item, &hb_node_events);
+}
+
+static void hb_shutdown_slot(struct hb_disk_slot *slot)
+{
+ struct hb_node_event event =
+ { .hn_item = LIST_HEAD_INIT(event.hn_item), };
+ struct nm_node *node;
+
+ node = nm_get_node_by_num(slot->ds_node_num);
+ if (!node)
+ return;
+
+ spin_lock(&hb_live_lock);
+ if (!list_empty(&slot->ds_live_item)) {
+ hbprintk("Shutdown, node %d leaves region\n",
+ slot->ds_node_num);
+
+ list_del_init(&slot->ds_live_item);
+
+ if (list_empty(&hb_live_slots[slot->ds_node_num])) {
+ clear_bit(slot->ds_node_num, hb_live_node_bitmap);
+
+ hb_queue_node_event(&event, HB_NODE_DOWN_CB, node,
+ slot->ds_node_num);
+ }
+ }
+ spin_unlock(&hb_live_lock);
+
+ hb_run_event_list(&event);
+
+ nm_node_put(node);
+}
+
static int hb_check_slot(struct hb_disk_slot *slot)
{
- int type = HB_NUM_CB;
+ int changed = 0;
+ struct hb_node_event event =
+ { .hn_item = LIST_HEAD_INIT(event.hn_item), };
+ struct nm_node *node;
hb_disk_heartbeat_block *hb_block = slot->ds_raw_block;
u64 cputime;
+ /* Is this correct? Do we assume that the node doesn't exist
+ * if we're not configured for him? */
+ node = nm_get_node_by_num(slot->ds_node_num);
+ if (!node)
+ return 0;
+
/* we don't care if these wrap.. the state transitions below
* clear at the right places */
cputime = le64_to_cpu(hb_block->time);
@@ -457,13 +582,21 @@
* changes at any time during their dead time */
if (list_empty(&slot->ds_live_item) &&
slot->ds_changed_samples >= HB_LIVE_THRESHOLD) {
+ hbprintk("Node %d joined my region\n", slot->ds_node_num);
+
/* first on the list generates a callback */
- if (list_empty(&hb_live_slots[slot->ds_node_num]))
- type = HB_NODE_UP_CB;
+ if (list_empty(&hb_live_slots[slot->ds_node_num])) {
+ set_bit(slot->ds_node_num, hb_live_node_bitmap);
+
+ hb_queue_node_event(&event, HB_NODE_UP_CB, node,
+ slot->ds_node_num);
+
+ changed = 1;
+ }
+
list_add_tail(&slot->ds_live_item,
&hb_live_slots[slot->ds_node_num]);
- set_bit(slot->ds_node_num, hb_live_node_bitmap);
slot->ds_equal_samples = 0;
goto out;
}
@@ -476,12 +609,19 @@
* samples.. reset the missed counter whenever we see
* activity */
if (slot->ds_equal_samples >= HB_DEAD_THRESHOLD) {
+ hbprintk("Node %d left my region\n", slot->ds_node_num);
+
/* last off the live_slot generates a callback */
list_del_init(&slot->ds_live_item);
- if (list_empty(&hb_live_slots[slot->ds_node_num]))
- type = HB_NODE_DOWN_CB;
+ if (list_empty(&hb_live_slots[slot->ds_node_num])) {
+ clear_bit(slot->ds_node_num, hb_live_node_bitmap);
- clear_bit(slot->ds_node_num, hb_live_node_bitmap);
+ hb_queue_node_event(&event, HB_NODE_DOWN_CB, node,
+ slot->ds_node_num);
+
+ changed = 1;
+ }
+
slot->ds_changed_samples = 0;
goto out;
}
@@ -491,15 +631,11 @@
}
out:
spin_unlock(&hb_live_lock);
- if (type != HB_NUM_CB) {
- struct nm_node *node = nm_get_node_by_num(slot->ds_node_num);
- if (node) {
- hb_do_callbacks(type, node, slot->ds_node_num);
- nm_node_put(node);
- }
- return 1;
- }
- return 0;
+
+ hb_run_event_list(&event);
+
+ nm_node_put(node);
+ return changed;
}
/* This could be faster if we just implmented a find_last_bit, but I
@@ -590,6 +726,7 @@
static int hb_thread(void *data)
{
struct hb_region *reg = data;
+ int i;
hbprintk("hb thread running\n");
@@ -599,6 +736,9 @@
schedule_timeout(msecs_to_jiffies(HB_THREAD_MS));
}
+ for(i = 0; i < reg->hr_blocks; i++)
+ hb_shutdown_slot(®->hr_slots[i]);
+
hbprintk("hb thread exiting\n");
return 0;
@@ -608,13 +748,15 @@
{
int i;
- for (i = 0; i < ARRAY_SIZE(hb_callbacks); i++) {
+ for (i = 0; i < ARRAY_SIZE(hb_callbacks); i++)
INIT_LIST_HEAD(&hb_callbacks[i].list);
- init_MUTEX(&hb_callbacks[i].sem);
- }
for (i = 0; i < ARRAY_SIZE(hb_live_slots); i++)
INIT_LIST_HEAD(&hb_live_slots[i]);
+
+ INIT_LIST_HEAD(&hb_node_events);
+
+ memset(hb_live_node_bitmap, 0, sizeof(hb_live_node_bitmap));
}
/*
@@ -1090,7 +1232,6 @@
if (reg == NULL)
goto out; /* ENOMEM */
- INIT_LIST_HEAD(®->hr_active_item);
hb_init_region_params(reg, HB_DEFAULT_BLOCK_BITS);
config_item_init_type_name(®->hr_item, name, &hb_region_type);
@@ -1108,11 +1249,6 @@
{
struct hb_region *reg = to_hb_region(item);
- down_read(&hb_callback_sem);
- if (!list_empty(®->hr_active_item))
- list_del_init(®->hr_active_item);
- up_read(&hb_callback_sem);
-
/* stop the thread when the user removes the region dir */
if (reg->hr_task) {
kthread_stop(reg->hr_task);
@@ -1159,19 +1295,21 @@
kfree(hs);
}
-
/* hb callback registration and issueing */
-static struct hb_callback *hbcall_from_type(int type)
+static struct hb_callback *hbcall_from_type(enum hb_callback_type type)
{
- if (type < HB_NODE_DOWN_CB || type >= HB_NUM_CB)
+ if (type == HB_NUM_CB)
return ERR_PTR(-EINVAL);
return &hb_callbacks[type];
}
-void hb_setup_callback(struct hb_callback_func *hc, int type, hb_cb_func *func,
- void *data, int priority)
+void hb_setup_callback(struct hb_callback_func *hc,
+ enum hb_callback_type type,
+ hb_cb_func *func,
+ void *data,
+ int priority)
{
INIT_LIST_HEAD(&hc->hc_item);
hc->hc_func = func;
@@ -1186,7 +1324,6 @@
struct hb_callback_func *tmp;
struct list_head *iter;
struct hb_callback *hbcall;
- int ret;
BUG_ON(!list_empty(&hc->hc_item));
@@ -1194,10 +1331,8 @@
if (IS_ERR(hbcall))
return PTR_ERR(hbcall);
- ret = down_interruptible(&hbcall->sem);
- if (ret)
- goto out;
-
+ down_write(&hb_callback_sem);
+
list_for_each(iter, &hbcall->list) {
tmp = list_entry(iter, struct hb_callback_func, hc_item);
if (hc->hc_priority < tmp->hc_priority) {
@@ -1208,66 +1343,26 @@
if (list_empty(&hc->hc_item))
list_add_tail(&hc->hc_item, &hbcall->list);
- up(&hbcall->sem);
-out:
- return ret;
+ up_write(&hb_callback_sem);
+
+ return 0;
}
EXPORT_SYMBOL(hb_register_callback);
int hb_unregister_callback(struct hb_callback_func *hc)
{
- struct hb_callback *hbcall;
- int ret;
-
BUG_ON(list_empty(&hc->hc_item));
- hbcall = hbcall_from_type(hc->hc_type);
- if (IS_ERR(hbcall))
- return PTR_ERR(hbcall);
+ down_write(&hb_callback_sem);
- ret = down_interruptible(&hbcall->sem);
- if (ret)
- goto out;
-
list_del_init(&hc->hc_item);
- up(&hbcall->sem);
+ up_write(&hb_callback_sem);
-out:
- return ret;
+ return 0;
}
EXPORT_SYMBOL(hb_unregister_callback);
-static void hb_do_callbacks(int type, struct nm_node *node, int idx)
-{
- struct list_head *iter;
- struct hb_callback_func *f;
- struct hb_callback *hbcall;
-
- hbcall = hbcall_from_type(type);
- if (IS_ERR(hbcall))
- return;
-
- /* XXX not interruptible? this is in the hb thread.. */
- down_write(&hb_callback_sem);
-
- if (down_interruptible(&hbcall->sem)) {
- hbprintk("missed hb callback(%d) due to EINTR!\n", type);
- goto out;
- }
-
- hbprintk("calling %d callbacks for node %s (%u) on slot %d\n",
- type, node->nd_name, node->nd_num, idx);
-
- list_for_each(iter, &hbcall->list) {
- f = list_entry(iter, struct hb_callback_func, hc_item);
- (f->hc_func)(node, idx, f->hc_data);
- }
- up(&hbcall->sem);
-out:
- up_write(&hb_callback_sem);
-}
-
/* Makes sure our local node is configured with a node number, and is
* heartbeating. */
int hb_check_local_node_heartbeating(void)
Modified: trunk/fs/ocfs2/cluster/heartbeat.h
===================================================================
--- trunk/fs/ocfs2/cluster/heartbeat.h 2005-04-20 17:59:03 UTC (rev 2152)
+++ trunk/fs/ocfs2/cluster/heartbeat.h 2005-04-20 18:10:31 UTC (rev 2153)
@@ -30,7 +30,7 @@
#include "ocfs2_heartbeat.h"
/* callback stuff */
-enum {
+enum hb_callback_type {
HB_NODE_DOWN_CB = 0,
HB_NODE_UP_CB,
HB_NUM_CB
@@ -44,7 +44,7 @@
hb_cb_func *hc_func;
void *hc_data;
int hc_priority;
- int hc_type;
+ enum hb_callback_type hc_type;
};
/* number of changes to be seen as live */
@@ -55,11 +55,15 @@
struct config_group *hb_alloc_hb_set(void);
void hb_free_hb_set(struct config_group *group);
-void hb_setup_callback(struct hb_callback_func *hc, int type, hb_cb_func *func,
- void *data, int priority);
+void hb_setup_callback(struct hb_callback_func *hc,
+ enum hb_callback_type type,
+ hb_cb_func *func,
+ void *data,
+ int priority);
int hb_register_callback(struct hb_callback_func *hc);
int hb_unregister_callback(struct hb_callback_func *hc);
-void hb_fill_node_map(unsigned long *map, unsigned bytes);
+void hb_fill_node_map(unsigned long *map,
+ unsigned bytes);
void hb_init(void);
int hb_check_local_node_heartbeating(void);
More information about the Ocfs2-commits
mailing list