[Ocfs2-tools-devel] [PATCH 06/11] libo2dlm: Add fsdlm operations.

Joel Becker joel.becker at oracle.com
Tue May 27 18:44:26 PDT 2008


Support the full API for fsdlm.

Signed-off-by: Joel Becker <joel.becker at oracle.com>
---
 include/o2dlm/o2dlm.h |    3 +-
 libo2dlm/o2dlm.c      |  498 ++++++++++++++++++++++++++++++++++++++++---------
 libo2dlm/o2dlm_test.c |    3 +
 libocfs2/dlm.c        |   11 +-
 4 files changed, 426 insertions(+), 89 deletions(-)

diff --git a/include/o2dlm/o2dlm.h b/include/o2dlm/o2dlm.h
index e6aed2c..99bee5f 100644
--- a/include/o2dlm/o2dlm.h
+++ b/include/o2dlm/o2dlm.h
@@ -85,7 +85,8 @@ errcode_t o2dlm_unlock(struct o2dlm_ctxt *ctxt,
  * 'len' is the amount to read into 'lvb'
  *
  * We can only read LVB_MAX bytes out of the lock, even if you
- * specificy a len larger than that.
+ * specificy a len larger than that.  For classic o2dlm, LVB_MAX is
+ * 64 bytes.  For fsdlm, it is 32 bytes.
  * 
  * If you want to know how much was read, then pass 'bytes_read'
  */
diff --git a/libo2dlm/o2dlm.c b/libo2dlm/o2dlm.c
index 1c33f9a..bdf9a68 100644
--- a/libo2dlm/o2dlm.c
+++ b/libo2dlm/o2dlm.c
@@ -34,10 +34,12 @@
 #include <errno.h>
 #include <assert.h>
 #include <inttypes.h>
-
 #include <sys/statfs.h>
 #include <string.h>
 
+#include <errno.h>
+#include <libdlm.h>
+
 #include "o2dlm/o2dlm.h"
 #include "ocfs2-kernel/kernel-list.h"
 
@@ -54,22 +56,28 @@ struct o2dlm_lock_res
 	int                   l_flags; /* limited set of flags */
 	enum o2dlm_lock_level l_level; /* either PR or EX */
 	int                   l_fd;    /* the fd returned by the open call */
+	struct dlm_lksb       l_lksb;  /* lksb for fsdlm locking */
+	char                  l_lvb[DLM_LVB_LEN]; /* LVB for fsdlm */
 };
 
 struct o2dlm_ctxt
 {
+	int		ct_classic;
 	struct list_head *ct_hash;
 	unsigned int     ct_hash_size;
 	char             ct_domain_path[O2DLM_MAX_FULL_DOMAIN_PATH]; /* domain
 								      * dir */
 	char             ct_ctxt_lock_name[O2DLM_LOCK_ID_MAX_LEN];
+	dlm_lshandle_t   ct_handle;
 };
 
-#
+
 static errcode_t o2dlm_lock_nochecks(struct o2dlm_ctxt *ctxt,
 				     const char *lockid,
 				     int lockflags,
 				     enum o2dlm_lock_level level);
+static errcode_t o2dlm_unlock_lock_res(struct o2dlm_ctxt *ctxt,
+				       struct o2dlm_lock_res *lockres);
 
 static errcode_t o2dlm_generate_random_value(int64_t *value)
 {
@@ -134,8 +142,15 @@ static errcode_t o2dlm_alloc_ctxt(const char *mnt_path,
 		goto exit_and_free;
 	}
 
-	len = snprintf(ctxt->ct_domain_path, PATH_MAX + 1, "%s/%s",
-		       mnt_path, dirname);
+	if (mnt_path) {
+		ctxt->ct_classic = 1;
+		len = snprintf(ctxt->ct_domain_path, PATH_MAX + 1, "%s/%s",
+			       mnt_path, dirname);
+	} else {
+		ctxt->ct_classic = 0;
+		len = snprintf(ctxt->ct_domain_path, PATH_MAX + 1, "%s",
+			       dirname);
+	}
 	if (len == (PATH_MAX + 1)) {
 		err = O2DLM_ET_BAD_DOMAIN_DIR;
 		goto exit_and_free;
@@ -348,6 +363,8 @@ static struct o2dlm_lock_res *o2dlm_new_lock_res(const char *id,
 		lockres->l_flags = flags;
 		lockres->l_level = level;
 		lockres->l_fd    = -1;
+		lockres->l_lksb.sb_lvbptr = lockres->l_lvb;
+		memset(lockres->l_lksb.sb_lvbptr, 0, DLM_LVB_LEN);
 	}
 	return lockres;
 }
@@ -359,10 +376,9 @@ static struct o2dlm_lock_res *o2dlm_new_lock_res(const char *id,
  * Classic o2dlm
  */
 
-static errcode_t o2dlm_lock_nochecks_classic(struct o2dlm_ctxt *ctxt,
-					     const char *lockid,
-					     int lockflags,
-					     enum o2dlm_lock_level level)
+static errcode_t o2dlm_lock_classic(struct o2dlm_ctxt *ctxt,
+				    const char *lockid, int lockflags,
+				    enum o2dlm_lock_level level)
 {
 	int ret, flags, fd;
 	char *path;
@@ -411,8 +427,8 @@ static errcode_t o2dlm_lock_nochecks_classic(struct o2dlm_ctxt *ctxt,
 	return 0;
 }
 
-static errcode_t o2dlm_unlock_lock_res(struct o2dlm_ctxt *ctxt,
-				       struct o2dlm_lock_res *lockres)
+static errcode_t o2dlm_unlock_lock_res_classic(struct o2dlm_ctxt *ctxt,
+					       struct o2dlm_lock_res *lockres)
 {
 	int ret, len = PATH_MAX + 1;
 	char *path;
@@ -444,26 +460,6 @@ static errcode_t o2dlm_unlock_lock_res(struct o2dlm_ctxt *ctxt,
 	return 0;
 }
 
-static errcode_t o2dlm_unlock_classic(struct o2dlm_ctxt *ctxt, char *lockid)
-{
-	int ret;
-	struct o2dlm_lock_res *lockres;
-
-	lockres = o2dlm_find_lock_res(ctxt, lockid);
-	if (!lockres)
-		return O2DLM_ET_UNKNOWN_LOCK;
-
-	o2dlm_remove_lock_res(lockres);
-
-	ret = o2dlm_unlock_lock_res(ctxt, lockres);
-
-	free(lockres);
-
-	if (ret && (ret != O2DLM_ET_BUSY_LOCK))
-		return ret;
-	return 0;
-}
-
 static errcode_t o2dlm_read_lvb_classic(struct o2dlm_ctxt *ctxt,
 					char *lockid,
 					char *lvb,
@@ -614,6 +610,339 @@ free_and_exit:
 	return error;
 }
 
+static errcode_t o2dlm_initialize_classic(const char *dlmfs_path,
+					  const char *domain_name,
+					  struct o2dlm_ctxt **dlm_ctxt)
+{
+	errcode_t ret, dir_created = 0;
+	struct o2dlm_ctxt *ctxt;
+
+	if (!dlmfs_path)
+		return O2DLM_ET_INVALID_ARGS;
+
+	if (strlen(domain_name) >= O2DLM_DOMAIN_MAX_LEN)
+		return O2DLM_ET_NAME_TOO_LONG;
+
+	if ((strlen(dlmfs_path) + strlen(domain_name)) >
+	    O2DLM_MAX_FULL_DOMAIN_PATH)
+		return O2DLM_ET_NAME_TOO_LONG;
+
+	ret = o2dlm_check_user_dlmfs(dlmfs_path);
+	if (ret)
+		return ret;
+
+	ret = o2dlm_alloc_ctxt(dlmfs_path, domain_name, &ctxt);
+	if (ret)
+		return ret;
+
+	ret = o2dlm_check_domain_dir(ctxt);
+	if (ret) {
+		if (ret != O2DLM_ET_NO_DOMAIN_DIR) {
+			o2dlm_free_ctxt(ctxt);
+			return ret;
+		}
+
+		/* the domain does not yet exist - create it ourselves. */
+		ret = o2dlm_create_domain(ctxt);
+		if (ret) {
+			o2dlm_free_ctxt(ctxt);
+			return ret;
+		}
+		dir_created = 1;
+	}
+
+	/* What we want to do here is create a lock which we'll hold
+	 * open for the duration of this context. This way if another
+	 * process won't be able to shut down this domain underneath
+	 * us. */
+	ret = o2dlm_lock_nochecks(ctxt, ctxt->ct_ctxt_lock_name, 0,
+				  O2DLM_LEVEL_PRMODE);
+	if (ret) {
+		if (dir_created)
+			o2dlm_delete_domain_dir(ctxt); /* best effort
+							* cleanup. */
+		o2dlm_free_ctxt(ctxt);
+		return ret;
+	}
+
+	*dlm_ctxt = ctxt;
+	return 0;
+}
+
+/*
+ * fsdlm operations
+ */
+
+static void to_fsdlm_lock(enum o2dlm_lock_level level, int lockflags,
+			  uint32_t *fsdlm_mode, uint32_t *fsdlm_flags)
+{
+	switch (level) {
+		case O2DLM_LEVEL_PRMODE:
+			*fsdlm_mode = LKM_PRMODE;
+			break;
+		case O2DLM_LEVEL_EXMODE:
+			*fsdlm_mode = LKM_EXMODE;
+			break;
+		default:
+			*fsdlm_mode = LKM_NLMODE;
+			break;
+	}
+
+	if (lockflags & O2DLM_TRYLOCK)
+		*fsdlm_flags = LKF_NOQUEUE;
+	else
+		*fsdlm_flags = 0;
+}
+
+static errcode_t o2dlm_lock_fsdlm(struct o2dlm_ctxt *ctxt,
+				  const char *lockid, int lockflags,
+				  enum o2dlm_lock_level level)
+{
+	int rc;
+	errcode_t ret = 0;
+	struct o2dlm_lock_res *lockres;
+	uint32_t mode, flags;
+
+	lockres = o2dlm_find_lock_res(ctxt, lockid);
+	if (lockres)
+		return O2DLM_ET_RECURSIVE_LOCK;
+
+	lockflags &= O2DLM_VALID_FLAGS;
+	lockres = o2dlm_new_lock_res(lockid, level, lockflags);
+	if (!lockres)
+		return O2DLM_ET_NO_MEMORY;
+
+	to_fsdlm_lock(level, lockflags, &mode, &flags);
+	flags |= LKF_VALBLK;  /* Always use LVBs */
+	rc = dlm_ls_lock_wait(ctxt->ct_handle, mode, &lockres->l_lksb,
+			      flags, lockid, strlen(lockid),
+			      0, NULL, NULL, NULL);
+	if (rc)
+		rc = errno;
+	else
+		rc = lockres->l_lksb.sb_status;
+	switch (rc) {
+		case 0:
+			/* Success! */
+			break;
+		case EAGAIN:
+			if (lockflags & O2DLM_TRYLOCK)
+				ret = O2DLM_ET_TRYLOCK_FAILED;
+			else
+				ret = O2DLM_ET_LOCKING;
+			break;
+		case EINVAL:
+			ret = O2DLM_ET_INVALID_ARGS;
+			break;
+		case ENOMEM:
+			ret = O2DLM_ET_NO_MEMORY;
+			break;
+		case ECANCEL:
+			ret = O2DLM_ET_LOCKING;
+			break;
+		default:
+			ret = O2DLM_ET_INTERNAL_FAILURE;
+			break;
+	}
+
+	if (!ret)
+		o2dlm_insert_lock_res(ctxt, lockres);
+	else
+		free(lockres);
+
+	return ret;
+}
+
+static errcode_t o2dlm_unlock_lock_res_fsdlm(struct o2dlm_ctxt *ctxt,
+					     struct o2dlm_lock_res *lockres)
+{
+	int rc;
+	errcode_t ret = 0;
+
+	rc = dlm_ls_unlock_wait(ctxt->ct_handle, lockres->l_lksb.sb_lkid,
+				LKF_VALBLK, &lockres->l_lksb);
+	if (rc)
+		rc = errno;
+	else
+		rc = lockres->l_lksb.sb_status;
+
+	switch (rc) {
+		case 0:
+		case EUNLOCK:
+			/* Success! */
+			break;
+		case ENOTCONN:
+			ret = O2DLM_ET_SERVICE_UNAVAILABLE;
+			break;
+		case EINVAL:
+			ret = O2DLM_ET_INVALID_ARGS;
+			break;
+		case ENOENT:
+			ret = O2DLM_ET_UNKNOWN_LOCK;
+			break;
+		default:
+			ret = O2DLM_ET_INTERNAL_FAILURE;
+			break;
+	}
+
+	return ret;
+}
+
+static errcode_t o2dlm_write_lvb_fsdlm(struct o2dlm_ctxt *ctxt,
+				       char *lockid,
+				       const char *lvb,
+				       unsigned int len,
+				       unsigned int *bytes_written)
+{
+	struct o2dlm_lock_res *lockres;
+
+	if (!ctxt || !lockid || !lvb)
+		return O2DLM_ET_INVALID_ARGS;
+
+	lockres = o2dlm_find_lock_res(ctxt, lockid);
+	if (!lockres)
+		return O2DLM_ET_UNKNOWN_LOCK;
+
+	/* fsdlm only supports DLM_LVB_LEN for userspace locks */
+	if (len > DLM_LVB_LEN)
+		len = DLM_LVB_LEN;
+	memcpy(lockres->l_lksb.sb_lvbptr, lvb, len);
+	if (bytes_written)
+		*bytes_written = len;
+
+	return 0;
+}
+
+static errcode_t o2dlm_read_lvb_fsdlm(struct o2dlm_ctxt *ctxt,
+				      char *lockid,
+				      char *lvb,
+				      unsigned int len,
+				      unsigned int *bytes_read)
+{
+	struct o2dlm_lock_res *lockres;
+
+	lockres = o2dlm_find_lock_res(ctxt, lockid);
+	if (!lockres)
+		return O2DLM_ET_UNKNOWN_LOCK;
+
+	/* fsdlm only supports DLM_LVB_LEN for userspace locks */
+	if (len > DLM_LVB_LEN)
+		len = DLM_LVB_LEN;
+	memcpy(lvb, lockres->l_lksb.sb_lvbptr, len);
+	if (bytes_read)
+		*bytes_read = len;
+
+	return 0;
+}
+
+static errcode_t o2dlm_initialize_fsdlm(const char *domain_name,
+					struct o2dlm_ctxt **dlm_ctxt)
+{
+	errcode_t ret;
+	struct o2dlm_ctxt *ctxt;
+
+	if (strlen(domain_name) >= O2DLM_DOMAIN_MAX_LEN)
+		return O2DLM_ET_NAME_TOO_LONG;
+
+	ret = o2dlm_alloc_ctxt(NULL, domain_name, &ctxt);
+	if (ret)
+		return ret;
+
+	ctxt->ct_handle = dlm_create_lockspace(ctxt->ct_domain_path, 0600);
+	if (!ctxt->ct_handle && (errno == EEXIST))
+		ctxt->ct_handle = dlm_open_lockspace(ctxt->ct_domain_path);
+
+	if (!ctxt->ct_handle) {
+		switch (errno) {
+			case EINVAL:
+				ret = O2DLM_ET_INVALID_ARGS;
+				break;
+			case ENOMEM:
+				ret = O2DLM_ET_NO_MEMORY;
+				break;
+			case EACCES:
+			case EPERM:
+				ret = O2DLM_ET_BAD_DOMAIN_DIR;
+				break;
+			default:
+				ret = O2DLM_ET_INTERNAL_FAILURE;
+				break;
+		}
+		return ret;
+	}
+
+	/* What we want to do here is create a lock which we'll hold
+	 * open for the duration of this context. This way if another
+	 * process won't be able to shut down this domain underneath
+	 * us. */
+	ret = o2dlm_lock_nochecks(ctxt, ctxt->ct_ctxt_lock_name, 0,
+				  O2DLM_LEVEL_PRMODE);
+	if (ret) {
+		/* Ignore the error, we want ret to be propagated */
+		dlm_release_lockspace(ctxt->ct_domain_path,
+				      ctxt->ct_handle, 0);
+		o2dlm_free_ctxt(ctxt);
+		return ret;
+	}
+
+	*dlm_ctxt = ctxt;
+	return 0;
+}
+
+static errcode_t o2dlm_destroy_fsdlm(struct o2dlm_ctxt *ctxt)
+{
+	int i, rc;
+	errcode_t ret, error = 0;
+	struct o2dlm_lock_res *lockres;
+        struct list_head *p, *n, *bucket;
+
+	for(i = 0; i < ctxt->ct_hash_size; i++) {
+		bucket = &ctxt->ct_hash[i];
+
+		list_for_each_safe(p, n, bucket) {
+			lockres = list_entry(p, struct o2dlm_lock_res,
+					     l_bucket);
+
+			o2dlm_remove_lock_res(lockres);
+
+			ret = o2dlm_unlock_lock_res(ctxt, lockres);
+			if (ret && (ret != O2DLM_ET_BUSY_LOCK))
+				error = O2DLM_ET_FAILED_UNLOCKS;
+			free(lockres);
+		}
+	}
+	if (error)
+		goto free_and_exit;
+
+	rc = dlm_release_lockspace(ctxt->ct_domain_path, ctxt->ct_handle, 0);
+	if (!rc)
+		goto free_and_exit;
+
+	switch(errno) {
+		case EBUSY:
+			/* Do nothing */
+			break;
+		case EINVAL:
+			error = O2DLM_ET_INVALID_ARGS;
+			break;
+		case ENOMEM:
+			error = O2DLM_ET_NO_MEMORY;
+			break;
+		case EACCES:
+		case EPERM:
+			error = O2DLM_ET_BAD_DOMAIN_DIR;
+			break;
+		default:
+			error = O2DLM_ET_INTERNAL_FAILURE;
+			break;
+	}
+
+free_and_exit:
+	o2dlm_free_ctxt(ctxt);
+	return error;
+}
+
+
 /*
  * Public API
  */
@@ -630,7 +959,10 @@ static errcode_t o2dlm_lock_nochecks(struct o2dlm_ctxt *ctxt,
 	if (level != O2DLM_LEVEL_PRMODE && level != O2DLM_LEVEL_EXMODE)
 		return O2DLM_ET_INVALID_LOCK_LEVEL;
 
-	return o2dlm_lock_nochecks_classic(ctxt, lockid, lockflags, level);
+	if (ctxt->ct_classic)
+		return o2dlm_lock_classic(ctxt, lockid, lockflags, level);
+	else
+		return o2dlm_lock_fsdlm(ctxt, lockid, lockflags, level);
 }
 
 errcode_t o2dlm_lock(struct o2dlm_ctxt *ctxt,
@@ -648,13 +980,37 @@ errcode_t o2dlm_lock(struct o2dlm_ctxt *ctxt,
 	return o2dlm_lock_nochecks(ctxt, lockid, lockflags, level);
 }
 
+static errcode_t o2dlm_unlock_lock_res(struct o2dlm_ctxt *ctxt,
+				       struct o2dlm_lock_res *lockres)
+{
+	if (ctxt->ct_classic)
+		return o2dlm_unlock_lock_res_classic(ctxt, lockres);
+	else
+		return o2dlm_unlock_lock_res_fsdlm(ctxt, lockres);
+}
+
 errcode_t o2dlm_unlock(struct o2dlm_ctxt *ctxt,
 		       char *lockid)
 {
+	int ret;
+	struct o2dlm_lock_res *lockres;
+
 	if (!ctxt || !lockid)
 		return O2DLM_ET_INVALID_ARGS;
 
-	return o2dlm_unlock_classic(ctxt, lockid);
+	lockres = o2dlm_find_lock_res(ctxt, lockid);
+	if (!lockres)
+		return O2DLM_ET_UNKNOWN_LOCK;
+
+	o2dlm_remove_lock_res(lockres);
+
+	ret = o2dlm_unlock_lock_res(ctxt, lockres);
+
+	free(lockres);
+
+	if (ret && (ret != O2DLM_ET_BUSY_LOCK))
+		return ret;
+	return 0;
 }
 
 errcode_t o2dlm_read_lvb(struct o2dlm_ctxt *ctxt,
@@ -666,7 +1022,12 @@ errcode_t o2dlm_read_lvb(struct o2dlm_ctxt *ctxt,
 	if (!ctxt || !lockid || !lvb)
 		return O2DLM_ET_INVALID_ARGS;
 
-	return o2dlm_read_lvb_classic(ctxt, lockid, lvb, len, bytes_read);
+	if (ctxt->ct_classic)
+		return o2dlm_read_lvb_classic(ctxt, lockid, lvb, len,
+					      bytes_read);
+	else
+		return o2dlm_read_lvb_fsdlm(ctxt, lockid, lvb, len,
+					    bytes_read);
 }
 
 errcode_t o2dlm_write_lvb(struct o2dlm_ctxt *ctxt,
@@ -678,67 +1039,27 @@ errcode_t o2dlm_write_lvb(struct o2dlm_ctxt *ctxt,
 	if (!ctxt || !lockid || !lvb)
 		return O2DLM_ET_INVALID_ARGS;
 
-	return o2dlm_write_lvb_classic(ctxt, lockid, lvb, len,
-				       bytes_written);
+	if (ctxt->ct_classic)
+		return o2dlm_write_lvb_classic(ctxt, lockid, lvb, len,
+					       bytes_written);
+	else
+		return o2dlm_write_lvb_fsdlm(ctxt, lockid, lvb, len,
+					     bytes_written);
 }
 
+/* NULL dlmfs_path means fsdlm */
 errcode_t o2dlm_initialize(const char *dlmfs_path,
 			   const char *domain_name,
 			   struct o2dlm_ctxt **dlm_ctxt)
 {
-	errcode_t ret, dir_created = 0;
-	struct o2dlm_ctxt *ctxt;
-
-	if (!dlmfs_path || !domain_name || !dlm_ctxt)
+	if (!domain_name || !dlm_ctxt)
 		return O2DLM_ET_INVALID_ARGS;
 
-	if (strlen(domain_name) >= O2DLM_DOMAIN_MAX_LEN)
-		return O2DLM_ET_NAME_TOO_LONG;
-
-	if ((strlen(dlmfs_path) + strlen(domain_name)) >
-	    O2DLM_MAX_FULL_DOMAIN_PATH)
-		return O2DLM_ET_NAME_TOO_LONG;
-
-	ret = o2dlm_check_user_dlmfs(dlmfs_path);
-	if (ret)
-		return ret;
-
-	ret = o2dlm_alloc_ctxt(dlmfs_path, domain_name, &ctxt);
-	if (ret)
-		return ret;
-
-	ret = o2dlm_check_domain_dir(ctxt);
-	if (ret) {
-		if (ret != O2DLM_ET_NO_DOMAIN_DIR) {
-			o2dlm_free_ctxt(ctxt);
-			return ret;
-		}
-
-		/* the domain does not yet exist - create it ourselves. */
-		ret = o2dlm_create_domain(ctxt);
-		if (ret) {
-			o2dlm_free_ctxt(ctxt);
-			return ret;
-		}
-		dir_created = 1;
-	}
-
-	/* What we want to do here is create a lock which we'll hold
-	 * open for the duration of this context. This way if another
-	 * process won't be able to shut down this domain underneath
-	 * us. */
-	ret = o2dlm_lock_nochecks(ctxt, ctxt->ct_ctxt_lock_name, 0,
-				  O2DLM_LEVEL_PRMODE);
-	if (ret) {
-		if (dir_created)
-			o2dlm_delete_domain_dir(ctxt); /* best effort
-							* cleanup. */
-		o2dlm_free_ctxt(ctxt);
-		return ret;
-	}
-
-	*dlm_ctxt = ctxt;
-	return 0;
+	if (dlmfs_path)
+		return o2dlm_initialize_classic(dlmfs_path, domain_name,
+						dlm_ctxt);
+	else
+		return o2dlm_initialize_fsdlm(domain_name, dlm_ctxt);
 }
 
 errcode_t o2dlm_destroy(struct o2dlm_ctxt *ctxt)
@@ -746,5 +1067,8 @@ errcode_t o2dlm_destroy(struct o2dlm_ctxt *ctxt)
 	if (!ctxt)
 		return O2DLM_ET_INVALID_ARGS;
 
-	return o2dlm_destroy_classic(ctxt);
+	if (ctxt->ct_classic)
+		return o2dlm_destroy_classic(ctxt);
+	else
+		return o2dlm_destroy_fsdlm(ctxt);
 }
diff --git a/libo2dlm/o2dlm_test.c b/libo2dlm/o2dlm_test.c
index 830a339..b89d0d5 100644
--- a/libo2dlm/o2dlm_test.c
+++ b/libo2dlm/o2dlm_test.c
@@ -346,6 +346,9 @@ int main(int argc, char **argv)
 	if (argc < 2) {
 		dlmfs_path = DEFAULT_DLMFS_PATH;
 		printf("No fs path provided, using %s\n", dlmfs_path);
+	} else if (!strcmp(argv[1], "-u")) {
+		dlmfs_path = NULL;
+		printf("Using fsdlm\n");
 	} else {
 		dlmfs_path = argv[1];
 		printf("Using fs at %s\n", dlmfs_path);
diff --git a/libocfs2/dlm.c b/libocfs2/dlm.c
index c244cc0..1d1b30f 100644
--- a/libocfs2/dlm.c
+++ b/libocfs2/dlm.c
@@ -138,6 +138,7 @@ errcode_t ocfs2_initialize_dlm(ocfs2_filesys *fs, const char *service)
 	errcode_t ret = 0;
 	struct o2cb_cluster_desc cluster;
 	struct o2cb_region_desc desc;
+	char *stack_path;
 
 	ret = ocfs2_fill_cluster_desc(fs, &cluster);
 	if (ret)
@@ -153,7 +154,15 @@ errcode_t ocfs2_initialize_dlm(ocfs2_filesys *fs, const char *service)
 	if (ret)
 		goto bail;
 
-	ret = o2dlm_initialize(DEFAULT_DLMFS_PATH, fs->uuid_str, &dlm_ctxt);
+	/*
+	 * NULL c_stack means o2cb, means use DLMFS, means
+	 * pass DLMFS_PATH.  If we're using a userspace stack, pass NULL.
+	 */
+	if (cluster.c_stack)
+		stack_path = NULL;
+	else
+		stack_path = DEFAULT_DLMFS_PATH;
+	ret = o2dlm_initialize(stack_path, fs->uuid_str, &dlm_ctxt);
 	if (ret) {
 		/* What to do with an error code? */
 
-- 
1.5.4.5




More information about the Ocfs2-tools-devel mailing list