[Ocfs2-tools-devel] [PATCH 15/39] ocfs2_controld: add CPG skeleton

Joel Becker joel.becker at oracle.com
Fri Mar 14 16:52:38 PDT 2008


Add a skeleton cpg interface to ocfs2_controld.  It connects to the CPG
service and joins a group for the daemon itself.

Signed-off-by: Joel Becker <joel.becker at oracle.com>
---
 ocfs2_controld/Makefile         |    5 +-
 ocfs2_controld/cman.c           |   37 +++++-
 ocfs2_controld/cpg.c            |  287 +++++++++++++++++++++++++++++++++++++++
 ocfs2_controld/main.c           |   19 +--
 ocfs2_controld/ocfs2_controld.h |    9 +-
 5 files changed, 336 insertions(+), 21 deletions(-)
 create mode 100644 ocfs2_controld/cpg.c

diff --git a/ocfs2_controld/Makefile b/ocfs2_controld/Makefile
index 00a3b04..e5476e8 100644
--- a/ocfs2_controld/Makefile
+++ b/ocfs2_controld/Makefile
@@ -14,6 +14,7 @@ LIBO2CB_LIBS = -L$(TOPDIR)/libo2cb -lo2cb
 LIBO2CB_DEPS = $(TOPDIR)/libo2cb/libo2cb.a
 LIBOCFS2_LIBS = -L$(TOPDIR)/libocfs2 -locfs2
 LIBOCFS2_DEPS = $(TOPDIR)/libocfs2/libocfs2.a
+LIBCPG_LIBS = $(CPG_LDFLAGS) -lcpg
 
 ifdef OCFS2_DEBUG
 OPTS += -ggdb
@@ -29,7 +30,7 @@ DEFINES = -DOCFS2_FLAT_INCLUDES -DO2DLM_FLAT_INCLUDES \
 
 UNINST_HFILES = ocfs2_controld.h
 
-DAEMON_CFILES = main.c cman.c
+DAEMON_CFILES = main.c cman.c cpg.c
 TEST_CFILES = test_client.c
 
 DAEMON_OBJS = $(subst .c,.o,$(DAEMON_CFILES))
@@ -43,7 +44,7 @@ DIST_FILES =				\
 	$(addsuffix .in,$(MANS))
 
 ocfs2_controld.cman: $(DAEMON_OBJS) $(LIBO2CB_DEPS)
-	$(LINK) $(LIBO2CB_LIBS) $(COM_ERR_LIBS) -lcman
+	$(LINK) $(LIBO2CB_LIBS) $(COM_ERR_LIBS) $(LIBCPG_LIBS) -lcman
 
 test_client: $(TEST_OBJS) $(LIBO2CB_DEPS) $(LIBOCFS2_DEPS)
 	$(LINK) $(LIBOCFS2_LIBS) $(LIBO2CB_LIBS) $(COM_ERR_LIBS)
diff --git a/ocfs2_controld/cman.c b/ocfs2_controld/cman.c
index cbe3d95..e7e019d 100644
--- a/ocfs2_controld/cman.c
+++ b/ocfs2_controld/cman.c
@@ -22,6 +22,7 @@
  *  of the GNU General Public License v.2.
  */
 
+#include <stdio.h>
 #include <string.h>
 #include <stdlib.h>
 #include <time.h>
@@ -40,6 +41,7 @@ int			cman_ci;
 char *			clustername;
 cman_cluster_t		cluster;
 static cman_handle_t	ch;
+static cman_handle_t	ch_admin;
 extern struct list_head mounts;
 static cman_node_t      old_nodes[O2NM_MAX_NODES];
 static int              old_node_count;
@@ -47,6 +49,11 @@ static cman_node_t      cman_nodes[O2NM_MAX_NODES];
 static int              cman_node_count;
 
 
+int kill_cman(int nodeid)
+{
+	return cman_kill_node(ch_admin, nodeid);
+}
+
 static int is_member(cman_node_t *node_list, int count, int nodeid)
 {
 	int i;
@@ -147,7 +154,7 @@ static void cman_callback(cman_handle_t h, void *private, int reason, int arg)
 	}
 }
 
-static void exit_cman(int ci)
+static void dead_cman(int ci)
 {
 	if (ci != cman_ci) {
 		log_error("Unknown connection %d", ci);
@@ -183,7 +190,15 @@ int setup_cman(void)
 	ch = cman_init(NULL);
 	if (!ch) {
 		log_error("cman_init error %d", errno);
-		return -ENOTCONN;
+		rv = -ENOTCONN;
+		goto fail_finish;
+	}
+
+	ch_admin = cman_admin_init(NULL);
+	if (!ch) {
+		log_error("cman_admin_init error %d", errno);
+		rv = -ENOTCONN;
+		goto fail_finish;
 	}
 
 	rv = cman_start_notification(ch, cman_callback);
@@ -220,13 +235,27 @@ int setup_cman(void)
 	/* Fill the node list */
 	statechange();
 
-	cman_ci = client_add(fd, process_cman, exit_cman);
+	cman_ci = client_add(fd, process_cman, dead_cman);
 	return 0;
 
  fail_stop:
 	cman_stop_notification(ch);
  fail_finish:
-	cman_finish(ch);
+	if (ch_admin)
+		cman_finish(ch_admin);
+	if (ch)
+		cman_finish(ch);
+	ch = ch_admin = NULL;
 	return rv;
 }
 
+void exit_cman(void)
+{
+	if (ch_admin)
+		cman_finish(ch_admin);
+	if (ch) {
+		log_debug("closing cman connection");
+		cman_stop_notification(ch);
+		cman_finish(ch);
+	}
+}
diff --git a/ocfs2_controld/cpg.c b/ocfs2_controld/cpg.c
new file mode 100644
index 0000000..9131842
--- /dev/null
+++ b/ocfs2_controld/cpg.c
@@ -0,0 +1,287 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * Copyright (C) 2007 Oracle.  All rights reserved.
+ *
+ *  This copyrighted material is made available to anyone wishing to use,
+ *  modify, copy, or redistribute it subject to the terms and conditions
+ *  of the GNU General Public License v.2.
+ */
+
+/* Portions of this file are: */
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <syslog.h>
+#include <time.h>
+#include <openais/cpg.h>
+
+#include "ocfs2-kernel/kernel-list.h"
+
+#include "ocfs2_controld.h"
+
+struct cgroup {
+	struct list_head	cg_list;	/* List of all CPG groups */
+
+	cpg_handle_t		cg_handle;
+	int			cg_fd;
+	int			cg_ci;
+
+	struct cpg_name		cg_name;
+	struct cpg_address	cg_members[CPG_MEMBERS_MAX];
+	int			cg_member_count;
+
+	/* Callback state */
+	int			cg_got_confchg;
+	struct cpg_address	cg_cb_members[CPG_MEMBERS_MAX];
+	struct cpg_address	cg_cb_joined[CPG_MEMBERS_MAX];
+	struct cpg_address	cg_cb_left[CPG_MEMBERS_MAX];
+	int			cg_cb_member_count;
+	int			cg_cb_joined_count;
+	int			cg_cb_left_count;
+};
+
+struct cgroup daemon_group;
+static int daemon_joined;
+struct list_head group_list;
+static int message_flow_control_on;
+
+static void group_change(struct cgroup *cgroup)
+{
+}
+
+static void daemon_change(struct cgroup *cgroup)
+{
+	int i, found = 0;
+
+	log_debug("ocfs2_controld confchg: members %d, left %d, joined %d",
+		  cgroup->cg_cb_member_count,
+		  cgroup->cg_cb_left_count,
+		  cgroup->cg_cb_joined_count);
+
+	memcpy(&cgroup->cg_members, &cgroup->cg_cb_members,
+	       sizeof(&cgroup->cg_cb_members));
+	cgroup->cg_member_count = cgroup->cg_cb_member_count;
+
+	for (i = 0; i < cgroup->cg_cb_member_count; i++) {
+		if ((cgroup->cg_cb_members[i].nodeid == our_nodeid) &&
+		    (cgroup->cg_cb_members[i].pid == (uint32_t)getpid())) {
+		    found = 1;
+		}
+	}
+
+	if (found)
+		daemon_joined = 1;
+	else
+		log_error("this node is not in the ocfs2_controld confchg: %u %u",
+			  our_nodeid, (uint32_t)getpid());
+
+}
+
+static void process_configuration_change(struct cgroup *cgroup)
+{
+	if (cgroup == &daemon_group)
+		daemon_change(cgroup);
+	else
+		group_change(cgroup);
+}
+
+static struct cgroup *client_to_group(int ci)
+{
+	if (ci == daemon_group.cg_ci)
+		return &daemon_group;
+
+	log_error("unknown client %d", ci);
+	return NULL;
+}
+
+static struct cgroup *handle_to_group(cpg_handle_t handle)
+{
+	if (handle == daemon_group.cg_handle)
+		return &daemon_group;
+
+	log_error("unknown handle %llu", (unsigned long long)handle);
+
+	return NULL;
+}
+
+static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
+		       uint32_t nodeid, uint32_t pid,
+		       void *data, int data_len)
+{
+	log_debug("deliver called");
+}
+
+static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
+		       struct cpg_address *member_list,
+		       int member_list_entries,
+		       struct cpg_address *left_list,
+		       int left_list_entries,
+		       struct cpg_address *joined_list,
+		       int joined_list_entries)
+{
+	int i;
+	struct cgroup *cgroup;
+
+	log_debug("confchg called");
+
+	cgroup = handle_to_group(handle);
+	if (!cgroup)
+		return;
+
+	if (left_list_entries > CPG_MEMBERS_MAX) {
+		log_debug("left_list_entries %d", left_list_entries);
+		left_list_entries = CPG_MEMBERS_MAX;
+	} else if (joined_list_entries > CPG_MEMBERS_MAX) {
+		log_debug("joined_list_entries %d", joined_list_entries);
+		joined_list_entries = CPG_MEMBERS_MAX;
+	} else if (member_list_entries > CPG_MEMBERS_MAX) {
+		log_debug("member_list_entries %d", member_list_entries);
+		member_list_entries = CPG_MEMBERS_MAX;
+	}
+
+	cgroup->cg_cb_left_count = left_list_entries;
+	cgroup->cg_cb_joined_count = joined_list_entries;
+	cgroup->cg_cb_member_count = member_list_entries;
+
+	for (i = 0; i < left_list_entries; i++)
+		cgroup->cg_cb_left[i] = left_list[i];
+	for (i = 0; i < joined_list_entries; i++)
+		cgroup->cg_cb_joined[i] = joined_list[i];
+	for (i = 0; i < member_list_entries; i++)
+		cgroup->cg_cb_members[i] = member_list[i];
+
+	cgroup->cg_got_confchg = 1;
+}
+
+static cpg_callbacks_t callbacks = {
+	.cpg_deliver_fn	= deliver_cb,
+	.cpg_confchg_fn	= confchg_cb,
+};
+
+static void process_cpg(int ci)
+{
+	cpg_error_t error;
+	cpg_flow_control_state_t flow_control_state;
+	struct cgroup *cgroup;
+
+	cgroup = client_to_group(ci);
+	if (!cgroup)
+		return;
+
+	cgroup->cg_got_confchg = 0;
+	error = cpg_dispatch(cgroup->cg_handle, CPG_DISPATCH_ONE);
+	if (error != CPG_OK) {
+		log_error("cpg_dispatch error %d", error);
+		return;
+	}
+
+	error = cpg_flow_control_state_get(cgroup->cg_handle,
+					   &flow_control_state);
+	if (error != CPG_OK)
+		log_error("cpg_flow_control_state_get %d", error);
+	else if (flow_control_state == CPG_FLOW_CONTROL_ENABLED) {
+		message_flow_control_on = 1;
+		log_debug("flow control on");
+	} else {
+		if (message_flow_control_on)
+			log_debug("flow control off");
+		message_flow_control_on = 0;
+	}
+
+	if (cgroup->cg_got_confchg)
+		process_configuration_change(cgroup);
+}
+
+static void dead_cpg(int ci)
+{
+	if (ci == daemon_group.cg_ci) {
+		log_error("cpg connection died");
+		shutdown_daemon();
+		
+		/* We can't talk to cpg anymore */
+		daemon_group.cg_handle = 0;
+	}
+
+	client_dead(ci);
+}
+
+int setup_cpg(void)
+{
+	cpg_error_t error;
+
+	error = cpg_initialize(&daemon_group.cg_handle, &callbacks);
+	if (error != CPG_OK) {
+		log_error("cpg_initialize error %d", error);
+		return error;
+	}
+
+	cpg_fd_get(daemon_group.cg_handle, &daemon_group.cg_fd);
+	daemon_group.cg_ci = client_add(daemon_group.cg_fd,
+					process_cpg, dead_cpg);
+
+	strcpy(daemon_group.cg_name.value, "ocfs2_controld");
+	daemon_group.cg_name.length = strlen(daemon_group.cg_name.value);
+
+	do {
+		error = cpg_join(daemon_group.cg_handle,
+				 &daemon_group.cg_name);
+		if (error == CPG_OK) {
+			log_debug("cpg_join succeeded");
+			error = 0;
+		} else if (error == CPG_ERR_TRY_AGAIN) {
+			log_debug("cpg_join retry");
+			sleep(1);
+		} else {
+			log_error("cpg_join error %d", error);
+			cpg_finalize(daemon_group.cg_handle);
+			daemon_group.cg_handle = 0;
+		}
+	} while (error == CPG_ERR_TRY_AGAIN);
+
+	return error;
+}
+
+void exit_cpg(void)
+{
+	int i;
+	cpg_error_t error;
+
+	if (!daemon_group.cg_handle)
+		return;
+
+	for (i = 0; i < 10; i++) {
+		error = cpg_leave(daemon_group.cg_handle,
+				  &daemon_group.cg_name);
+		if (error == CPG_ERR_TRY_AGAIN) {
+			if (!i)
+				log_debug("cpg_leave retry");
+			sleep(1);
+			continue;
+		}
+
+		if (error == CPG_OK)
+			log_debug("cpg_leave succeeded");
+		else
+			log_error("cpg_leave error %d", error);
+
+		break;
+	}
+
+	log_debug("closing cpg connection");
+	cpg_finalize(daemon_group.cg_handle);
+}
diff --git a/ocfs2_controld/main.c b/ocfs2_controld/main.c
index f0ec2fc..cd8651a 100644
--- a/ocfs2_controld/main.c
+++ b/ocfs2_controld/main.c
@@ -22,6 +22,7 @@
  *  of the GNU General Public License v.2.
  */
 
+#include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <sys/types.h>
@@ -63,11 +64,6 @@ static struct pollfd *pollfd = NULL;
 static int time_to_die = 0;
 
 static int sigpipe_write_fd;
-static int groupd_fd;
-
-extern struct list_head mounts;
-extern struct list_head withdrawn_mounts;
-int no_withdraw;
 
 char *prog_name;
 int daemon_debug_opt;
@@ -469,12 +465,9 @@ static int loop(void)
 	if (rv < 0)
 		goto out;
 
-#if 0
-	rv = groupd_fd = setup_groupd();
+	rv = setup_cpg();
 	if (rv < 0)
 		goto out;
-	client_add(groupd_fd);
-#endif
 
 	log_debug("setup done");
 
@@ -516,6 +509,9 @@ stop:
 	bail_on_mounts();
 #endif
 
+	exit_cpg();
+	exit_cman();
+
 out:
 	return rv;
 }
@@ -591,7 +587,6 @@ static void print_usage(void)
 	printf("Options:\n");
 	printf("\n");
 	printf("  -D	       Enable debugging code and don't fork\n");
-	printf("  -w	       Disable withdraw\n");
 	printf("  -h	       Print this help, then exit\n");
 	printf("  -V	       Print program version information, then exit\n");
 }
@@ -606,10 +601,6 @@ static void decode_arguments(int argc, char **argv)
 
 		switch (optchar) {
 
-		case 'w':
-			no_withdraw = 1;
-			break;
-
 		case 'D':
 			daemon_debug_opt = 1;
 			break;
diff --git a/ocfs2_controld/ocfs2_controld.h b/ocfs2_controld/ocfs2_controld.h
index ed57fd9..e7a719d 100644
--- a/ocfs2_controld/ocfs2_controld.h
+++ b/ocfs2_controld/ocfs2_controld.h
@@ -25,15 +25,17 @@
 #ifndef __OCFS2_CONTROLD_H
 #define __OCFS2_CONTROLD_H
 
-
 #define DUMP_SIZE			(1024 * 1024)
 
+
 extern char *prog_name;
+extern struct list_head mounts;
 extern int daemon_debug_opt;
 extern char daemon_debug_buf[1024];
 extern char dump_buf[DUMP_SIZE];
 extern int dump_point;
 extern int dump_wrap;
+extern int our_nodeid;
 
 extern void daemon_dump_save(void);
 
@@ -67,5 +69,10 @@ void shutdown_daemon(void);
 
 int setup_cman(void);
 char *nodeid2name(int nodeid);
+int kill_cman(int nodeid);
+void exit_cman(void);
+
+int setup_cpg(void);
+void exit_cpg(void);
 
 #endif
-- 
1.5.3.8




More information about the Ocfs2-tools-devel mailing list