[Ocfs2-commits] mfasheh commits r1593 - branches/dlm-glue/src

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Mon Oct 25 20:06:25 CDT 2004


Author: mfasheh
Date: 2004-10-25 20:06:23 -0500 (Mon, 25 Oct 2004)
New Revision: 1593

Added:
   branches/dlm-glue/src/dlmglue.c
   branches/dlm-glue/src/dlmglue.h
Log:
* start adding the glue code. this is barely linked into the rest of
  the file system at this point.



Added: branches/dlm-glue/src/dlmglue.c
===================================================================
--- branches/dlm-glue/src/dlmglue.c	2004-10-26 01:03:25 UTC (rev 1592)
+++ branches/dlm-glue/src/dlmglue.c	2004-10-26 01:06:23 UTC (rev 1593)
@@ -0,0 +1,1349 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * dlmglue.c
+ *
+ * Glue code between the ocfs2 file system and the cluster DLM.
+ *
+ * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+
+
+#include "middle.h"
+
+/* lock ids are made up in the following manner:
+ * name[0]     --> type
+ * name[1-6]   --> 6 pad characters, reserved for now
+ * name[7-22]  --> block number, expressed in hex as 16 chars
+ * name[23-30] --> i_generation, expressed in hex 8 chars
+ * name[31]    --> '\0' */
+#define OCFS2_LOCK_ID_MAX_LEN  32
+#define OCFS2_LOCK_ID_PAD "000000"
+
+enum ocfs2_lock_type {
+	OCFS_TYPE_META = 0,
+	OCFS_TYPE_DATA,
+	OCFS_NUM_LOCK_TYPES
+};
+
+/* lock-name prefix character, indexed by lock type. */
+static char ocfs2_lock_type_char[OCFS_NUM_LOCK_TYPES] = {
+	[OCFS_TYPE_META]	= 'M',
+	[OCFS_TYPE_DATA]	= 'D'
+};
+
+/* 'enum ocfs_lock_type' was never declared; the tag is ocfs2_lock_type. */
+static int ocfs2_build_lock_name(enum ocfs2_lock_type type,
+				 u64 blkno,
+				 u32 generation,
+				 char **ret);
+
+static void ocfs2_ast_func(void *opaque);
+/* so far, all locks have gotten along with the same BAST. */
+static void ocfs2_bast_func(void *opaque, dlm_lock_type type);
+
+static dlm_astlockfunc_t ocfs2_lock_type_asts[OCFS_NUM_LOCK_TYPES] = {
+	[OCFS_TYPE_META]	= ocfs2_ast_func,
+	[OCFS_TYPE_DATA]	= ocfs2_ast_func
+};
+/* NOTE(review): the bast has a (void *, dlm_lock_type) signature, so this
+ * table probably wants a bast function-pointer typedef rather than
+ * dlm_astlockfunc_t -- confirm against the dlm headers. */
+static dlm_astlockfunc_t ocfs2_lock_type_basts[OCFS_NUM_LOCK_TYPES] = {
+	[OCFS_TYPE_META]	= ocfs2_bast_func,
+	[OCFS_TYPE_DATA]	= ocfs2_bast_func
+};
+
+/* Called after we refresh our inode, only has any effect if we have
+ * an EX lock. This populates the LVB with the initial values for our
+ * change set. */
+static void ocfs2_reset_meta_lvb_values(struct inode *inode);
+/* Returns 1 when the lock value block can be trusted: it has been
+ * written at least once (nonzero sequence) and we have seen the most
+ * recent write. */
+static inline int ocfs2_lvb_is_trustable(ocfs2_lock *lock)
+{
+	ocfs2_lvb *lvb = (ocfs2_lvb *) lock->l_lksb.lvb;
+	int ret = 0;
+
+	/* l_lockres is a pointer; the original used '.' */
+	spin_lock(&lock->l_lockres->lr_lock);
+	if (lvb->lvb_seq &&
+	    lock->l_local_seq == lvb->lvb_seq)
+		ret = 1;
+	spin_unlock(&lock->l_lockres->lr_lock);
+
+	return ret;
+}
+
+/* Cache the LVB sequence locally so ocfs2_lvb_is_trustable() can later
+ * tell whether we are up to date. */
+static inline void ocfs2_set_local_seq_from_lvb(ocfs2_lock *lock)
+{
+	ocfs2_lvb *lvb = (ocfs2_lvb *) lock->l_lksb.lvb;
+
+	/* l_lockres is a pointer; the original used '.' */
+	spin_lock(&lock->l_lockres->lr_lock);
+	if (lvb->lvb_seq)
+		lock->l_local_seq = lvb->lvb_seq;
+	spin_unlock(&lock->l_lockres->lr_lock);
+}
+
+/* fill in new values as we add them to the lvb. */
+static inline void ocfs2_meta_lvb_get_values(struct inode *inode, 
+					     unsigned int *trunc_clusters)
+{
+	ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_lockres;
+	ocfs2_lock *lock = &lockres->lr_meta;
+	ocfs2_meta_lvb *lvb;
+
+	spin_lock(&lockres->lr_lock);
+	/* the lvb is only meaningful if we hold the lock above NL */
+	OCFS_ASSERT(lock->l_level > LKM_NLMODE);
+
+	/* cast added: l_lksb.lvb is untyped storage */
+	lvb = (ocfs2_meta_lvb *) lock->l_lksb.lvb;
+	if (trunc_clusters)
+		*trunc_clusters = lvb->lvb_trunc_clusters;
+
+	spin_unlock(&lockres->lr_lock);
+}
+
+/* Allocate and format a dlm lock name for (type, blkno, generation) as
+ * described at the top of the file. On success *ret owns the kmalloc'd
+ * name and the (positive) formatted length is returned; on failure a
+ * negative errno is returned and *ret is untouched. */
+static int ocfs2_build_lock_name(enum ocfs2_lock_type type,
+				 u64 blkno,
+				 u32 generation,
+				 char **ret)
+{
+	int len;
+	char *name;
+
+	LOG_ENTRY();
+
+	OCFS_ASSERT(type < OCFS_NUM_LOCK_TYPES);
+
+	name = kmalloc(OCFS2_LOCK_ID_MAX_LEN, GFP_KERNEL);
+	if (!name) {
+		len = -ENOMEM;
+		goto bail;
+	}
+	memset(name, 0, OCFS2_LOCK_ID_MAX_LEN);
+
+	/* pass the full buffer size: the name is exactly 31 characters
+	 * plus NUL; with MAX_LEN - 1 snprintf truncated the last
+	 * generation nibble. */
+	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 
+		       ocfs2_lock_type_char[type], OCFS2_LOCK_ID_PAD, blkno, 
+		       generation);
+
+	/* '==', not '=': the original assigned inside the assert */
+	OCFS_ASSERT(len == (OCFS2_LOCK_ID_MAX_LEN - 1));
+
+	printk("built lock resource with name: %s\n", name);
+
+	*ret = name;
+bail:
+	LOG_EXIT();
+	return (len);
+}
+
+/* Initialize the per-inode lock resource: spinlock, waitqueues and the
+ * embedded meta/data locks, including their kmalloc'd dlm names.
+ * Returns 0 on success, negative errno on failure (no allocations are
+ * leaked on the error paths). */
+int ocfs2_lock_res_init(struct inode *inode,
+			ocfs2_lock_res *res)
+{
+	int status;
+
+	LOG_ENTRY();
+
+	memset(res, 0, sizeof(ocfs2_lock_res));
+
+	spin_lock_init(&res->lr_lock);
+	init_waitqueue_head(&res->lr_busy);
+	init_waitqueue_head(&res->lr_blocked);
+	init_waitqueue_head(&res->lr_refreshing);
+	res->lr_inode = inode;
+
+	/* build the data and metadata locks. ocfs2_build_lock_name()
+	 * takes a char ** out parameter; the original passed the
+	 * char * by value so l_name was never set. */
+	status = ocfs2_build_lock_name(OCFS_TYPE_META, 
+				       OCFS_I(inode)->ip_blkno,
+				       inode->i_generation,
+				       &res->lr_meta.l_name);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+	status = ocfs2_build_lock_name(OCFS_TYPE_DATA,
+				       OCFS_I(inode)->ip_blkno,
+				       inode->i_generation,
+				       &res->lr_data.l_name);
+	if (status < 0) {
+		kfree(res->lr_meta.l_name);
+		res->lr_meta.l_name = NULL;
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+	res->lr_meta.l_level = LKM_IVMODE;
+	res->lr_meta.l_lockres = res;
+	res->lr_data.l_level = LKM_IVMODE;
+	res->lr_data.l_lockres = res;
+	status = 0;	/* don't leak the name length to the caller */
+bail:
+	LOG_EXIT_STATUS(status);
+	return status;
+}
+
+/* Free the dlm lock names allocated by ocfs2_lock_res_init().
+ * kfree(NULL) is a no-op, so the NULL checks were redundant. */
+void ocfs2_lock_res_free(ocfs2_lock_res *res)
+{
+	kfree(res->lr_data.l_name);
+	kfree(res->lr_meta.l_name);
+}
+
+/* Record one more local holder of 'lock' at 'level' (EX or PR only;
+ * anything else is a programming error). */
+static inline void ocfs2_inc_holders(ocfs2_lock *lock,
+				     dlm_lock_type level)
+{
+	OCFS_ASSERT(lock);
+
+	if (level == LKM_EXMODE)
+		lock->l_ex_holders++;
+	else if (level == LKM_PRMODE)
+		lock->l_ro_holders++;
+	else
+		BUG();
+}
+
+/* Drop one local holder of 'lock' at 'level'; the matching counter must
+ * be nonzero (EX or PR only). */
+static inline void ocfs2_dec_holders(ocfs2_lock *lock,
+				     dlm_lock_type level)
+{
+	OCFS_ASSERT(lock);
+
+	if (level == LKM_EXMODE) {
+		OCFS_ASSERT(lock->l_ex_holders);
+		lock->l_ex_holders--;
+	} else if (level == LKM_PRMODE) {
+		OCFS_ASSERT(lock->l_ro_holders);
+		lock->l_ro_holders--;
+	} else {
+		BUG();
+	}
+}
+
+/* AST helper: commit a completed downconvert -- adopt the requested
+ * level, clear BLOCKED/BUSY and wake waiters. Caller holds lr_lock. */
+static inline void ocfs2_handle_downconvert_action(struct inode *inode,
+						   ocfs2_lock *lock)
+{
+	OCFS_ASSERT(lock->l_flags & OCFS2_LOCK_BUSY);
+	OCFS_ASSERT(lock->l_flags & OCFS2_LOCK_ATTACHED);
+	OCFS_ASSERT(lock->l_flags & OCFS2_LOCK_BLOCKED);
+
+	lock->l_level = lock->l_requested;
+	lock->l_blocking = LKM_NLMODE;
+	lock->l_flags &= ~OCFS2_LOCK_BLOCKED;
+	lock->l_flags &= ~OCFS2_LOCK_BUSY;
+	/* 'lockres' was undeclared here; reach it through the lock */
+	wake_up_all(&lock->l_lockres->lr_blocked);
+}
+
+/* AST helper: commit a completed data-lock convert. Caller holds
+ * lr_lock.
+ * NOTE(review): 'inode' is currently unused; presumably kept for
+ * symmetry with the meta convert handler -- confirm. */
+static inline void ocfs2_handle_data_convert_action(struct inode *inode,
+						    ocfs2_lock *lock)
+{
+	OCFS_ASSERT(lock->l_flags & OCFS2_LOCK_BUSY);
+	OCFS_ASSERT(lock->l_flags & OCFS2_LOCK_ATTACHED);
+
+	lock->l_level = lock->l_requested;
+	lock->l_flags &= ~OCFS2_LOCK_BUSY;
+}
+
+/* AST helper: commit a completed metadata-lock convert. Caller holds
+ * lr_lock. */
+static inline void ocfs2_handle_meta_convert_action(struct inode *inode,
+						    ocfs2_lock *lock)
+{
+	ocfs_super *osb = OCFS2_SB(inode->i_sb);
+
+	OCFS_ASSERT(lock->l_flags & OCFS2_LOCK_BUSY);
+	OCFS_ASSERT(lock->l_flags & OCFS2_LOCK_ATTACHED);
+
+	/* Convert from RO to EX doesn't really need anything as our
+	 * information is already up to date. Convert from NL to
+	 * *anything* however should mark the inode as needing an
+	 * update. */
+	if (lock->l_level == LKM_NLMODE) {
+		ocfs_inc_inode_seq(osb, inode);
+		lock->l_flags |= OCFS2_LOCK_NEEDS_REFRESH;
+	}
+
+	lock->l_level = lock->l_requested;
+	lock->l_flags &= ~OCFS2_LOCK_BUSY;
+}
+
+/* AST helper: complete the initial NL-mode attach of a lock. Caller
+ * holds lr_lock. */
+static inline void ocfs2_handle_attach_action(ocfs2_lock *lock)
+{
+	OCFS_ASSERT(lock->l_flags & OCFS2_LOCK_BUSY);
+	OCFS_ASSERT(!(lock->l_flags & OCFS2_LOCK_ATTACHED));
+
+	lock->l_level = lock->l_requested;
+	lock->l_flags |= OCFS2_LOCK_ATTACHED;
+	/* should this part be in ocfs2_ast_func? */
+	lock->l_flags &= ~OCFS2_LOCK_BUSY;
+}
+
+/* AST shared by meta and data locks. 'opaque' is the ocfs2_lock we
+ * handed to dlmlock(); dispatch on the pending l_action. */
+static void ocfs2_ast_func(void *opaque)
+{
+	/* the parameter is named 'opaque'; the body referenced 'data' */
+	ocfs2_lock *lock = opaque;
+	ocfs2_lock_res *lockres = lock->l_lockres;
+	struct inode *inode = lockres->lr_inode;
+	dlm_lockstatus *lksb;
+
+	printk("AST fired for inode %llu\n", OCFS_I(inode)->ip_blkno);
+	/* lr_meta/lr_data are embedded structs -- compare addresses */
+	OCFS_ASSERT((lock == &lockres->lr_meta) ||
+		    (lock == &lockres->lr_data));
+
+	spin_lock(&lockres->lr_lock);
+	lksb = &(lock->l_lksb);
+	if (lksb->status != DLM_NORMAL) {
+		printk("ocfs2_ast_func: lksb status value of %u on "
+		       "inode %llu\n", lksb->status, OCFS_I(inode)->ip_blkno);
+		spin_unlock(&lockres->lr_lock);
+		return;
+	}
+
+	switch(lock->l_action) {
+	case OCFS2_AST_ATTACH:
+		ocfs2_handle_attach_action(lock);
+		break;
+	case OCFS2_AST_CONVERT:
+		if (lock == &lockres->lr_meta)
+			ocfs2_handle_meta_convert_action(inode, lock);
+		else
+			ocfs2_handle_data_convert_action(inode, lock);
+		break;
+	case OCFS2_AST_DOWNCONVERT:
+		ocfs2_handle_downconvert_action(inode, lock);
+		break;
+	default:
+		BUG();
+	}
+	/* set it to something invalid so if we get called again we
+	 * can catch it. */
+	lock->l_action = OCFS2_AST_INVALID;
+	spin_unlock(&lockres->lr_lock);
+	wake_up_all(&lockres->lr_busy);
+}
+
+/* BAST shared by all lock types: record that another node wants the
+ * lock at 'type' and schedule the vote thread to downconvert. */
+static void ocfs2_bast_func(void *opaque, dlm_lock_type type)
+{
+	/* the parameter is named 'opaque'; the body referenced 'data' */
+	ocfs2_lock *lock = opaque;
+	ocfs2_lock_res *lockres = lock->l_lockres;
+	struct inode *inode = lockres->lr_inode;
+	ocfs_super *osb = OCFS2_SB(inode->i_sb);
+
+	printk("BAST fired for inode %llu\n", OCFS_I(inode)->ip_blkno);
+	spin_lock(&lockres->lr_lock);
+	lock->l_flags |= OCFS2_LOCK_BLOCKED;
+	/* remember the highest level anyone is blocked on */
+	if (type > lock->l_blocking)
+		lock->l_blocking = type;
+	spin_unlock(&lockres->lr_lock);
+	ocfs2_schedule_blocked_inode(inode);
+	ocfs2_kick_vote_thread(osb);
+}
+
+/* Create the dlm lock at NL mode if it does not exist yet. Returns 0
+ * if the lock was already attached or the dlmlock call was issued. */
+static int ocfs2_lock_create(struct inode *inode,
+			     enum ocfs2_lock_type type,
+			     ocfs2_lock *lock)
+{
+	int ret = 0;
+	dlm_status status;
+	ocfs_super *osb = OCFS2_SB(inode->i_sb);
+	/* ip_lockres is embedded in the inode private; take its address */
+	ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_lockres;
+
+	LOG_ENTRY();
+
+	spin_lock(&lockres->lr_lock);
+	if (lock->l_flags & OCFS2_LOCK_ATTACHED) {
+		spin_unlock(&lockres->lr_lock);
+		goto bail;
+	}
+
+	lock->l_action = OCFS2_AST_ATTACH;
+	lock->l_requested = LKM_NLMODE;
+	lock->l_flags |= OCFS2_LOCK_BUSY;
+	spin_unlock(&lockres->lr_lock);
+
+	/* the tables are named ocfs2_lock_type_asts/basts; the original
+	 * referenced undeclared ocfs2_ast_funcs/ocfs2_bast_funcs */
+	status = dlmlock(osb->dlm,
+			 LKM_NLMODE,
+			 0,
+			 &lock->l_lksb,
+			 lock->l_name,
+			 ocfs2_lock_type_asts[type],
+			 ocfs2_lock_type_basts[type],
+			 lock);
+	if (status != DLM_NORMAL) {
+		LOG_ERROR_ARGS("Dlm returns %d\n", status);
+		ret = -ENOENT;
+		ocfs2_recover_from_dlm_error(lockres, lock);
+	}
+
+bail:
+	LOG_EXIT_STATUS(ret);
+	return ret;
+}
+
+/* Undo the in-progress state set up before a dlmlock() call that the
+ * dlm rejected, so waiters aren't stuck on a BUSY lock. */
+static inline void ocfs2_recover_from_dlm_error(ocfs2_lock_res *lockres,
+						ocfs2_lock *lock)
+{
+	/* the spinlock field is lr_lock, not 'lock' */
+	spin_lock(&lockres->lr_lock);
+	lock->l_flags &= ~OCFS2_LOCK_BUSY;
+	lock->l_action = OCFS2_AST_INVALID;
+	spin_unlock(&lockres->lr_lock);
+}
+
+/* Sample 'flag' from l_flags under lr_lock. The original took the
+ * lockres by value but dereferenced it with '->'. */
+static inline int ocfs2_check_wait_flag(ocfs2_lock_res *lockres,
+					ocfs2_lock *lock,
+					int flag)
+{
+	int ret;
+	spin_lock(&lockres->lr_lock);
+	ret = lock->l_flags & flag;
+	spin_unlock(&lockres->lr_lock);
+	return ret;
+}
+
+/* Sleep until the BUSY flag clears. The waitqueue lives on the lockres
+ * (lr_busy), not on the lock, and wait_event_interruptible() takes the
+ * queue itself rather than its address. */
+static inline void ocfs2_wait_on_busy_lock(ocfs2_lock_res *lockres,
+					   ocfs2_lock *lock)
+
+{
+	wait_event_interruptible(lockres->lr_busy,
+				 !ocfs2_check_wait_flag(lockres,
+							lock,
+							OCFS2_LOCK_BUSY));
+}
+
+/* Sleep until the BLOCKED flag clears (queue is lockres->lr_blocked,
+ * not a field of the lock). */
+static inline void ocfs2_wait_on_blocked_lock(ocfs2_lock_res *lockres,
+					      ocfs2_lock *lock)
+
+{
+	wait_event_interruptible(lockres->lr_blocked,
+				 !ocfs2_check_wait_flag(lockres,
+							lock,
+							OCFS2_LOCK_BLOCKED));
+}
+
+/* Sleep until the REFRESHING flag clears. The original waited on the
+ * blocked queue and tested an undeclared OCFS2_LOCKC_REFRESHING flag;
+ * the refresh path wakes lr_refreshing and uses OCFS2_LOCK_REFRESHING. */
+static inline void ocfs2_wait_on_refreshing_lock(ocfs2_lock_res *lockres,
+						 ocfs2_lock *lock)
+
+{
+	wait_event_interruptible(lockres->lr_refreshing,
+				 !ocfs2_check_wait_flag(lockres,
+							lock,
+							OCFS2_LOCK_REFRESHING));
+}
+
+/* Acquire 'lock' at 'level': create the dlm lock on first use, wait out
+ * BUSY/BLOCKED states, upconvert if needed, then bump the holder count.
+ * Returns 0 on success, -EINTR on signal, -ENOENT on dlm failure. */
+static int ocfs2_cluster_lock(struct inode *inode, 
+			      enum ocfs2_lock_type type,
+			      ocfs2_lock *lock, 
+			      dlm_lock_type level)
+{
+	ocfs_super *osb = OCFS2_SB(inode->i_sb);
+	/* ip_lockres is embedded; take its address */
+	ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_lockres;
+	dlm_status status;
+	int ret = 0;	/* was uninitialized on the success path */
+
+	LOG_ENTRY();
+
+again:
+	if (signal_pending(current)) {
+		ret = -EINTR;
+		goto bail;
+	}
+
+	spin_lock(&lockres->lr_lock);
+	if (lock->l_flags & OCFS2_LOCK_BUSY) {
+		/* is someone sitting in dlm_lock? If so, wait on
+		 * them. */
+		spin_unlock(&lockres->lr_lock);
+		ocfs2_wait_on_busy_lock(lockres, lock);
+		goto again;
+	}
+
+	if (!(lock->l_flags & OCFS2_LOCK_ATTACHED)) {
+		/* lock has not been created yet; note the correct
+		 * callee name and argument list. */
+		spin_unlock(&lockres->lr_lock);
+		ret = ocfs2_lock_create(inode, type, lock);
+		if (ret < 0) {
+			LOG_ERROR_STATUS(ret);
+			goto bail;
+		}
+		goto again;
+	}
+
+	if (lock->l_flags & OCFS2_LOCK_BLOCKED) {
+		/* the lock is currently blocked on behalf of
+		 * another node */
+		spin_unlock(&lockres->lr_lock);
+		ocfs2_wait_on_blocked_lock(lockres, lock);
+		goto again;
+	}
+
+	if (level > lock->l_level) {
+		lock->l_action = OCFS2_AST_CONVERT;
+		lock->l_requested = level;
+		lock->l_flags |= OCFS2_LOCK_BUSY;
+		spin_unlock(&lockres->lr_lock);
+
+		/* call dlm_lock to upgrade lock now */
+		status = dlmlock(osb->dlm,
+				 level,
+				 LKM_CONVERT|LKM_VALBLK,
+				 &lock->l_lksb,
+				 lock->l_name,
+				 ocfs2_lock_type_asts[type],
+				 ocfs2_lock_type_basts[type],
+				 lock);
+		if (status != DLM_NORMAL) {
+			LOG_ERROR_ARGS("Dlm returns %d\n", status);
+			ret = -ENOENT;
+			ocfs2_recover_from_dlm_error(lockres, lock);
+			goto bail;
+		}
+
+		ocfs2_wait_on_busy_lock(lockres, lock);
+		goto again;
+	}
+
+	/* Ok, if we get here then we're good to go. The helper is
+	 * named ocfs2_inc_holders(). */
+	ocfs2_inc_holders(lock, level);
+
+	spin_unlock(&lockres->lr_lock);
+
+bail:
+	LOG_EXIT_STATUS(ret);
+	return ret;
+}
+
+/* Take the cluster data lock on 'inode' -- EX for writers, PR for
+ * readers. Returns 0 on success, negative on error. */
+int ocfs2_data_lock(struct inode *inode,
+		    int write)
+{
+	int level = write ? LKM_EXMODE : LKM_PRMODE;
+	int status;
+	ocfs2_lock *lock;
+
+	OCFS_ASSERT(inode);
+
+	LOG_ENTRY();
+
+	lock = &(OCFS_I(inode)->ip_lockres.lr_data);
+	status = ocfs2_cluster_lock(inode, OCFS_TYPE_DATA, lock, level);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+	LOG_EXIT_STATUS(status);
+	return status;
+}
+
+/* After a data unlock, kick the vote thread early if another node is
+ * waiting and our last conflicting holder just went away. */
+void ocfs2_data_vote_on_unlock(struct inode *inode)
+{
+	ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_lockres;
+	/* lr_data is embedded; take its address */
+	ocfs2_lock *lock = &lockres->lr_data;
+	int kick = 0;
+
+	/* Data locking needs to be fast. If we know that another node
+	 * is waiting on our lock, don't just wait for the vote thread
+	 * to timeout before it processes this -- kick it
+	 * pre-emptively when we reach a release condition. */
+	spin_lock(&lockres->lr_lock);
+	if (lock->l_flags & OCFS2_LOCK_BLOCKED) {
+		switch(lock->l_blocking) {
+		case LKM_EXMODE:
+			if (!lock->l_ex_holders && !lock->l_ro_holders)
+				kick = 1;
+			break;
+		case LKM_PRMODE:
+			if (!lock->l_ex_holders)
+				kick = 1;
+			break;
+		default:
+			BUG();
+		}
+	}
+	spin_unlock(&lockres->lr_lock);
+
+	/* was 'kick_vote_thread', which does not exist */
+	if (kick)
+		ocfs2_kick_vote_thread(OCFS2_SB(inode->i_sb));
+}
+
+/* Drop one hold on the data lock and possibly kick the vote thread so
+ * a waiting node can take the lock sooner. Always returns 0. */
+int ocfs2_data_unlock(struct inode *inode,
+		      int write)
+{
+	ocfs2_lock *lock;
+	int level = write ? LKM_EXMODE : LKM_PRMODE;
+
+	OCFS_ASSERT(inode);
+
+	lock = &(OCFS_I(inode)->ip_lockres.lr_data);
+	ocfs2_dec_holders(lock, level);
+
+	ocfs2_data_vote_on_unlock(inode);
+
+	return 0;
+}
+
+/* Block until all recovery threads have finished. Returns -EINTR if a
+ * signal arrived while waiting, 0 otherwise.
+ * NOTE(review): the old comment mentioned 'recovery_count' but the code
+ * reads num_recovery_threads -- confirm the intended counter. */
+static inline int ocfs2_wait_on_recovery(ocfs_super *osb)
+{
+	/* wait_event_interruptible() takes the waitqueue itself, not
+	 * its address. */
+	wait_event_interruptible(osb->recovery_event,
+				 !atomic_read(&osb->num_recovery_threads));
+
+	if (signal_pending(current))
+		return -EINTR;
+
+	return 0;
+}
+
+/* Call this with the lockres locked. I am reasonably sure we don't
+ * need ip_lock in this function as anyone who would be changing those
+ * values is supposed to be blocked in ocfs2_meta_lock right now. */
+void __ocfs2_stuff_meta_lvb(struct inode *inode)
+{
+	ocfs_inode_private *oip = OCFS_I(inode);
+	/* ip_lockres is embedded; take its address */
+	ocfs2_lock_res *lockres = &oip->ip_lockres;
+	ocfs2_meta_lvb *lvb     = (ocfs2_meta_lvb *) lockres->lr_meta.l_lksb.lvb;
+
+	/* (a stray 'ip_clusters = oip->ip_clusters;' statement with no
+	 * declaration was removed here) */
+	lvb->lvb_iclusters = oip->ip_clusters;
+	lvb->lvb_iuid      = inode->i_uid;
+	lvb->lvb_igid      = inode->i_gid;
+	lvb->lvb_isize     = inode->i_size;
+	lvb->lvb_imode     = inode->i_mode;
+	lvb->lvb_inlink    = inode->i_nlink;
+	lvb->lvb_iatime    = ocfs_get_seconds(inode->i_atime);
+	lvb->lvb_ictime    = ocfs_get_seconds(inode->i_ctime);
+	lvb->lvb_imtime    = ocfs_get_seconds(inode->i_mtime);
+}
+
+/* Fast-path refresh: populate the in-core inode from a trusted LVB
+ * instead of re-reading the dinode from disk. */
+void ocfs2_refresh_inode_from_lvb(struct inode *inode)
+{
+	ocfs_inode_private *oip = OCFS_I(inode);
+	/* ip_lockres is embedded; take its address */
+	ocfs2_lock_res *lockres = &oip->ip_lockres;
+	ocfs2_meta_lvb *lvb     = (ocfs2_meta_lvb *) lockres->lr_meta.l_lksb.lvb;
+
+	/* We're safe here without the lockres lock... */
+	spin_lock(&oip->ip_lock);
+	oip->ip_clusters = lvb->lvb_iclusters;
+	inode->i_uid     = lvb->lvb_iuid;
+	inode->i_gid     = lvb->lvb_igid;
+	inode->i_size    = lvb->lvb_isize;
+	inode->i_mode    = lvb->lvb_imode;
+	inode->i_nlink   = lvb->lvb_inlink;
+	/* 'osb' was undeclared; the block size comes from the sb */
+	inode->i_blocks  = (inode->i_size + inode->i_sb->s_blocksize - 1) 
+		>> inode->i_sb->s_blocksize_bits;
+	OCFS_SET_INODE_TIME(inode, i_atime, lvb->lvb_iatime);
+	OCFS_SET_INODE_TIME(inode, i_ctime, lvb->lvb_ictime);
+	OCFS_SET_INODE_TIME(inode, i_mtime, lvb->lvb_imtime);
+	spin_unlock(&oip->ip_lock);
+}
+
+/* Called after we refresh our inode; only has any effect if we hold an
+ * EX lock. Seeds the LVB with the initial values for our change set. */
+static void ocfs2_reset_meta_lvb_values(struct inode *inode)
+{
+	ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_lockres;
+	ocfs2_lock *lock = &lockres->lr_meta;
+	ocfs2_meta_lvb *lvb = (ocfs2_meta_lvb *) lock->l_lksb.lvb;
+	u32 i_clusters;
+
+	spin_lock(&OCFS_I(inode)->ip_lock);
+	i_clusters = OCFS_I(inode)->ip_clusters;
+	spin_unlock(&OCFS_I(inode)->ip_lock);
+
+	/* lockres is a pointer ('.' was used); field is lvb_trunc_clusters */
+	spin_lock(&lockres->lr_lock);
+	if (lock->l_level == LKM_EXMODE)
+		lvb->lvb_trunc_clusters = i_clusters;
+	spin_unlock(&lockres->lr_lock);
+}
+
+/* Adjust LVB and local sequence numbers before a downconvert from
+ * 'l_level' to 'new_level'. Caller is expected to hold lr_lock.
+ * NOTE(review): the extra l_local_seq++ in the EX->NL and PR branches
+ * deliberately desynchronizes our cached sequence so the LVB is treated
+ * as untrustable afterwards -- presumably intentional; confirm. */
+void __ocfs2_lvb_on_downconvert(ocfs2_lock *lock, dlm_lock_type new_level)
+{
+	ocfs2_lvb *lvb = (ocfs2_lvb *) lock->l_lksb.lvb;
+
+	if (lock->l_level == LKM_EXMODE) {
+		lvb->lvb_seq++;
+		/* Overflow? */
+		if (!lvb->lvb_seq)
+			lvb->lvb_seq = 1;
+		lock->l_local_seq = lvb->lvb_seq;
+		if (new_level == LKM_NLMODE)
+			lock->l_local_seq++;
+	} else if (lock->l_level == LKM_PRMODE) {
+		if (lvb->lvb_seq)
+			lock->l_local_seq++;
+	}
+}
+
+/* may or may not return a bh if it went to disk. Refreshes the in-core
+ * inode (from the LVB when trustable, otherwise from disk) if the meta
+ * lock is flagged NEEDS_REFRESH. Only one task performs the refresh;
+ * others wait on lr_refreshing. */
+int ocfs2_meta_lock_update(struct inode *inode, struct buffer_head **bh)
+{
+	int status = 0;
+	u32 trustable_clusters = 0;
+	/* 'osb' and 'oip' were used below but never declared */
+	ocfs_super *osb = OCFS2_SB(inode->i_sb);
+	ocfs_inode_private *oip = OCFS_I(inode);
+	ocfs2_lock_res *lockres;
+	ocfs2_lock *lock;
+	ocfs2_dinode *fe;
+
+	lockres = &oip->ip_lockres;
+	lock = &lockres->lr_meta;
+
+refresh_check:
+	spin_lock(&lockres->lr_lock);
+	if (!(lock->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
+		spin_unlock(&lockres->lr_lock);
+		goto bail;
+	}
+
+	if (lock->l_flags & OCFS2_LOCK_REFRESHING) {
+		spin_unlock(&lockres->lr_lock);
+		if (signal_pending(current)) {
+			status = -EINTR;
+			goto bail;
+		}
+		ocfs2_wait_on_refreshing_lock(lockres, lock);
+		goto refresh_check;	/* was 'goto refresh_check:' */
+	}
+
+	/* Ok, I'll be the one to refresh this lock. */
+	lock->l_flags |= OCFS2_LOCK_REFRESHING;
+	spin_unlock(&lockres->lr_lock);
+
+	/* we don't want to use the LVB for bitmap files as the
+	 * used/set bit union is not currently sent over the wire. */
+	if (!(oip->ip_flags & OCFS_INODE_BITMAP) &&
+	    ocfs2_lvb_is_trustable(lock)) {
+		/* yay, fastpath! */
+		ocfs2_meta_lvb_get_values(inode, &trustable_clusters);
+		ocfs2_refresh_inode_from_lvb(inode);
+	} else {
+		/* Boo, we have to go to disk. */
+		status = ocfs_read_block(osb, oip->ip_blkno, bh,
+					 OCFS_BH_CACHED, inode);
+		if (status < 0) {
+			LOG_ERROR_STATUS(status);
+			/* must still drop REFRESHING or waiters hang */
+			goto clear_refreshing;
+		}
+		fe = (ocfs2_dinode *) (*bh)->b_data;
+
+		/* This is a good chance to make sure we're not
+		 * locking an invalid object. */
+		OCFS_ASSERT(IS_VALID_FILE_ENTRY(fe));
+		OCFS_ASSERT(inode->i_generation == 
+			    le32_to_cpu(fe->i_generation));
+		if ((fe->i_dtime) || (!(fe->i_flags & OCFS2_VALID_FL)))
+			BUG();
+
+		ocfs_refresh_inode(inode, fe);
+	}
+
+	printk("inode %llu, I can only trust %u clusters\n",
+	       oip->ip_blkno, trustable_clusters);
+
+	ocfs2_extent_map_trunc(inode, trustable_clusters);
+
+	ocfs2_set_local_seq_from_lvb(&lockres->lr_meta);
+	ocfs2_reset_meta_lvb_values(inode);
+
+clear_refreshing:
+	spin_lock(&lockres->lr_lock);
+	lock->l_flags &= ~OCFS2_LOCK_REFRESHING;
+	if (!status)	/* only mark refreshed when we succeeded */
+		lock->l_flags &= ~OCFS2_LOCK_NEEDS_REFRESH;
+	spin_unlock(&lockres->lr_lock);
+
+	wake_up_all(&lockres->lr_refreshing);
+bail:
+	return status;
+}
+
+/* Take the cluster metadata lock (EX when 'ex'), refresh the cached
+ * inode if needed, and optionally return the inode's buffer head in
+ * *ret_bh (caller must brelse it). A journaled lock (handle != NULL)
+ * must be EX. */
+int ocfs2_meta_lock(struct inode *inode,
+		    ocfs_journal_handle *handle,
+		    struct buffer_head **ret_bh,
+		    int ex)
+{
+	int status, level;
+	ocfs2_lock *lock;
+	ocfs_super *osb = OCFS2_SB(inode->i_sb);
+	struct buffer_head *bh = NULL;
+
+	OCFS_ASSERT(inode);
+
+	if (handle && !ex)
+		BUG();
+
+	LOG_ENTRY();
+
+	/* we skip recovery wait on journal inodes as those can be
+	 * locked from ocfs_recover_node. */
+	if (!INODE_JOURNAL(inode)) {
+		status = ocfs2_wait_on_recovery(osb);
+		if (status < 0) {
+			LOG_ERROR_STATUS(status);
+			goto bail;
+		}
+	}
+
+	/* (an assignment to an undeclared 'lockres' was removed here) */
+	lock = &(OCFS_I(inode)->ip_lockres.lr_meta);
+	level = ex ? LKM_EXMODE : LKM_PRMODE;
+
+	status = ocfs2_cluster_lock(inode, OCFS_TYPE_META, lock, level);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	/* recheck recovery in case a node died while we slept in
+	 * ocfs2_cluster_lock. */
+	if (!INODE_JOURNAL(inode)) {
+		status = ocfs2_wait_on_recovery(osb);
+		if (status < 0) {
+			LOG_ERROR_STATUS(status);
+			goto bail;
+		}
+	}
+
+	status = ocfs2_meta_lock_update(inode, &bh);
+	if (status < 0) {
+		LOG_ERROR_STATUS(status);
+		goto bail;
+	}
+
+	if (ret_bh && !bh) {
+		/* caller wants a buffer head but we haven't read it yet. */
+		status = ocfs_read_block(osb, OCFS_I(inode)->ip_blkno, &bh,
+					 OCFS_BH_CACHED, inode);
+		if (status < 0) {
+			LOG_ERROR_STATUS(status);
+			goto bail;
+		}
+	}
+	if (ret_bh) {
+		*ret_bh = bh;
+		get_bh(*ret_bh);
+	}
+	if (handle) {
+		status = ocfs_handle_add_lock(handle, inode);
+		if (status < 0)
+			LOG_ERROR_STATUS(status);
+	}
+bail:
+	if (bh)
+		brelse(bh);
+
+	LOG_EXIT_STATUS(status);
+	return status;
+}
+
+/* Drop one hold on the metadata lock taken via ocfs2_meta_lock().
+ * Always returns 0. */
+int ocfs2_meta_unlock(struct inode *inode,
+		      int ex)
+{
+	ocfs2_lock *lock;
+	int level = ex ? LKM_EXMODE : LKM_PRMODE;
+
+	OCFS_ASSERT(inode);
+
+	lock = &(OCFS_I(inode)->ip_lockres.lr_meta);
+	ocfs2_dec_holders(lock, level);
+	return 0;
+}
+
+int ocfs2_dlm_init(ocfs_super *osb)
+{
+	/* TODO: attach to the dlm and set osb->dlm. Return 0 for now --
+	 * the original fell off the end of a non-void function, which
+	 * is undefined behavior when the value is used. */
+	return 0;
+}
+
+/* Unlock AST for ocfs2_drop_lock(): invalidate the level, clear BUSY
+ * and wake anyone waiting in ocfs2_wait_on_busy_lock(). */
+void ocfs2_unlock_ast(void *opaque, dlm_status status)
+{
+	ocfs2_lock *lock = opaque;
+	ocfs2_lock_res *lockres = lock->l_lockres;
+
+	if (status != DLM_NORMAL)
+		LOG_ERROR_ARGS("Dlm returns status %d\n", status);
+
+	spin_lock(&lockres->lr_lock);
+	lock->l_level = LKM_IVMODE;
+	lock->l_flags &= ~OCFS2_LOCK_BUSY;
+	spin_unlock(&lockres->lr_lock);
+
+	wake_up_all(&lockres->lr_busy);
+}
+
+/* Tear down one dlm lock: must have no holders. Issues dlmunlock() and
+ * waits (via the BUSY flag, cleared by ocfs2_unlock_ast) for it to
+ * finish. Returns 0 on success, -ENOENT on dlm error, -EINTR if a
+ * signal cut the wait short. */
+int ocfs2_drop_lock(ocfs_super *osb,
+		    ocfs2_lock_res *lockres,
+		    ocfs2_lock *lock)
+{
+	int ret = 0;
+	dlm_status status;
+
+	spin_lock(&lockres->lr_lock);
+	OCFS_ASSERT(!lock->l_ex_holders);
+	OCFS_ASSERT(!lock->l_ro_holders);
+
+	if (lock->l_flags & OCFS2_LOCK_BUSY)
+		printk("ocfs2: destroying busy lock!\n");
+	if (lock->l_flags & OCFS2_LOCK_BLOCKED)
+		printk("ocfs2: destroying blocked lock!\n");
+
+	if (!(lock->l_flags & OCFS2_LOCK_ATTACHED)) {
+		spin_unlock(&lockres->lr_lock);
+		goto bail;
+	}
+
+	lock->l_flags &= ~OCFS2_LOCK_ATTACHED;
+
+	/* is this necessary? (BUSY here stands in for "unlock in
+	 * flight" so the wait below has something to wait on) */
+	lock->l_flags |= OCFS2_LOCK_BUSY;
+	spin_unlock(&lockres->lr_lock);
+
+	status = dlmunlock(osb->dlm,
+			   &lock->l_lksb,
+			   LKM_VALBLK,
+			   ocfs2_unlock_ast,
+			   lock);
+	if (status != DLM_NORMAL) {
+		LOG_ERROR_ARGS("Dlm returns %d\n", status);
+		ret = -ENOENT;
+		goto bail;
+	}
+
+	ocfs2_wait_on_busy_lock(lockres, lock);
+	if (signal_pending(current)) {
+		printk("ocfs2_drop_lock: Signal caught!\n");
+		ret = -EINTR;
+	}
+bail:
+	LOG_EXIT_STATUS(ret);
+	return ret;
+}
+
+/* Drop both of the inode's cluster locks, attempting the second even if
+ * the first fails; the first error seen is returned. */
+int ocfs2_drop_inode_locks(struct inode *inode)
+{
+	int status, err;
+	/* ip_lockres is embedded; take its address. The locks are the
+	 * embedded lr_data/lr_meta members (the original passed an
+	 * undeclared 'lock'). */
+	ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_lockres;
+
+	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), lockres,
+			      &lockres->lr_data);
+	if (err < 0)
+		LOG_ERROR_STATUS(err);
+
+	status = err;
+	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), lockres,
+			      &lockres->lr_meta);
+	if (err < 0)
+		LOG_ERROR_STATUS(err);
+	if (err < 0 && !status)
+		status = err;
+
+	return status;
+}
+
+/* WARNING: This function lives in a world where the only three lock
+ * levels are EX, PR, and NL. It *will* have to be adjusted when more
+ * lock types are added.
+ *
+ * Returns the highest mode another node may hold at the same time as
+ * a local holder at 'level'. */
+dlm_lock_type ocfs2_highest_compat_lock_level(dlm_lock_type level)
+{
+	switch (level) {
+	case LKM_EXMODE:
+		return LKM_NLMODE;
+	case LKM_PRMODE:
+		return LKM_PRMODE;
+	default:
+		return LKM_EXMODE;
+	}
+}
+
+/* called with the spinlock held, and WILL drop it. Issues the dlm
+ * convert down to 'new_level', pushing the LVB when 'lvb' is set. */
+int __ocfs2_downconvert_lock(ocfs_super *osb,
+			     ocfs2_lock_res *lockres,	/* was 'ocf2_' typo */
+			     ocfs2_lock *lock,
+			     dlm_lock_type new_level,
+			     int lvb)
+{
+	int status, flags = LKM_CONVERT;
+	/* derive the lock type from which embedded lock this is; the
+	 * original referenced an undeclared 'type'. */
+	enum ocfs2_lock_type type = (lock == &lockres->lr_meta) ?
+		OCFS_TYPE_META : OCFS_TYPE_DATA;
+
+	OCFS_ASSERT(lock->l_blocking > LKM_NLMODE);
+	OCFS_ASSERT(lock->l_level > new_level);
+
+	lock->l_action = OCFS2_AST_DOWNCONVERT;
+	lock->l_requested = new_level;
+	lock->l_flags |= OCFS2_LOCK_BUSY;	/* field is l_flags */
+	spin_unlock(&lockres->lr_lock);
+
+	if (lvb)
+		flags |= LKM_VALBLK;
+
+	status = dlmlock(osb->dlm,
+			 new_level,
+			 flags,
+			 &lock->l_lksb,
+			 lock->l_name,
+			 ocfs2_lock_type_asts[type],
+			 ocfs2_lock_type_basts[type],
+			 lock);
+	if (status != DLM_NORMAL) {
+		LOG_ERROR_ARGS("Dlm returns %d\n", status);
+		status = -ENOENT;
+		ocfs2_recover_from_dlm_error(lockres, lock);
+	}
+
+	return status;
+}
+
+/* Vote-thread worker: downconvert the meta lock when nobody local holds
+ * it; otherwise bump *requeue so the inode is retried later. */
+int ocfs2_process_blocked_meta(struct inode *inode, int *requeue)
+{
+	ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_lockres;
+	ocfs2_lock *lock = &lockres->lr_meta;
+	dlm_lock_type new_level;
+	int set_lvb = 0;
+
+	spin_lock(&lockres->lr_lock);
+	if (!(lock->l_flags & OCFS2_LOCK_BLOCKED)) {
+		spin_unlock(&lockres->lr_lock);
+		return 0;
+	}
+
+	OCFS_ASSERT(!(lock->l_flags & OCFS2_LOCK_BUSY));
+	OCFS_ASSERT(lock->l_level == LKM_EXMODE || 
+		    lock->l_level == LKM_PRMODE);
+
+	if (!lock->l_ro_holders && !lock->l_ex_holders) {
+		new_level = ocfs2_highest_compat_lock_level(lock->l_blocking);
+		if (lock->l_level == LKM_EXMODE) {
+			__ocfs2_stuff_meta_lvb(inode);
+			set_lvb = 1;
+		}
+		__ocfs2_lvb_on_downconvert(lock, new_level);
+		/* drops lr_lock for us; osb argument was missing */
+		return __ocfs2_downconvert_lock(OCFS2_SB(inode->i_sb),
+						lockres, lock, new_level,
+						set_lvb);
+	}
+	if (lock->l_ex_holders)
+		ocfs_start_checkpoint(OCFS2_SB(inode->i_sb));
+
+	/* '*requeue++' incremented the pointer, not the counter */
+	(*requeue)++;
+	spin_unlock(&lockres->lr_lock);
+
+	return 0;
+}
+
+/* Vote-thread worker: flush/truncate pages and downconvert the data
+ * lock when no conflicting local holders remain; otherwise bump
+ * *requeue for a retry. */
+int ocfs2_process_blocked_data(struct inode *inode, int *requeue)
+{
+	ocfs2_lock_res *lockres = &OCFS_I(inode)->ip_lockres;
+	/* this is the DATA path; the original grabbed lr_meta */
+	ocfs2_lock *lock = &lockres->lr_data;
+	dlm_lock_type blocking;
+	dlm_lock_type new_level;
+
+	spin_lock(&lockres->lr_lock);
+	if (!(lock->l_flags & OCFS2_LOCK_BLOCKED)) {
+		spin_unlock(&lockres->lr_lock);
+		return 0;
+	}
+
+	OCFS_ASSERT(!(lock->l_flags & OCFS2_LOCK_BUSY));
+
+recheck:
+	/* if we're blocking an exclusive and we have *anyone* in I/O,
+	 * then requeue. */
+	if ((lock->l_blocking == LKM_EXMODE) 
+	    && (lock->l_ex_holders || lock->l_ro_holders)) {
+		spin_unlock(&lockres->lr_lock);
+		(*requeue)++;	/* was '*requeue++' */
+		return 0;
+	}
+	/* If it's a PR we're blocking, then only
+	 * requeue if we've got anyone doing write (ex) I/O */
+	if ((lock->l_blocking == LKM_PRMODE) 
+		   && lock->l_ex_holders) {
+		spin_unlock(&lockres->lr_lock);
+		(*requeue)++;	/* was '*requeue++' */
+		return 0;
+	}
+
+	/* if we get here, then we know that anyone doing incompatible
+	 * I/O is currently blocked. Save off a copy of what we're
+	 * blocking as it may change while we're not holding the spin
+	 * lock. */
+	blocking = lock->l_blocking;
+	spin_unlock(&lockres->lr_lock);
+
+	/* 3 cases: EX blocking EX
+	 *	    EX blocking RO
+	 *          RO blocking EX */
+	sync_mapping_buffers(inode->i_mapping);
+	if (blocking == LKM_EXMODE)
+		ocfs_truncate_inode_pages(inode, 0);
+
+	spin_lock(&lockres->lr_lock);
+	if (blocking != lock->l_blocking) {
+		/* If this changed underneath us, then we can't drop
+		 * it just yet. */
+		goto recheck;
+	}
+
+	new_level = ocfs2_highest_compat_lock_level(lock->l_blocking);
+	/* drops lr_lock for us; osb argument was missing */
+	return __ocfs2_downconvert_lock(OCFS2_SB(inode->i_sb), lockres,
+					lock, new_level, 0);
+}
+
+/* Handle both locks of an inode pulled off the blocked list, requeueing
+ * it if either lock still has local holders. */
+void ocfs2_process_blocked_inode(struct inode *inode)
+{
+	int status;
+	int requeue = 0;	/* was misspelled 'reqeue' */
+
+	/* At this point, we've been taken off the blocked_list */
+	status = ocfs2_process_blocked_meta(inode, &requeue);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+	status = ocfs2_process_blocked_data(inode, &requeue);
+	if (status < 0)
+		LOG_ERROR_STATUS(status);
+
+	if (requeue)
+		ocfs2_schedule_blocked_inode(inode);
+}
+
+/* Queue 'inode' for the vote thread. Holds an extra inode reference
+ * which is dropped by the iput in ocfs2_vote_thread_do_work. */
+void ocfs2_schedule_blocked_inode(struct inode *inode)
+{
+	ocfs_super *osb = OCFS2_SB(inode->i_sb);
+
+	atomic_inc(&inode->i_count);
+
+	spin_lock(&osb->vote_task_lock);
+	if (list_empty(&(OCFS_I(inode)->ip_blocked_list))) {
+		/* list_add_tail(new, head): the inode entry is the new
+		 * element -- the original had the arguments reversed.
+		 * Bump the count only when we actually queue so it
+		 * tracks the list length. */
+		list_add_tail(&(OCFS_I(inode)->ip_blocked_list),
+			      &osb->blocked_inode_list);
+		osb->blocked_inode_count++;
+	}
+	/* NOTE(review): when the inode was already queued we keep the
+	 * extra i_count reference with no matching iput -- confirm
+	 * whether callers expect that or an iput is needed here. */
+	spin_unlock(&osb->vote_task_lock);
+}
+
+/* Wake the vote thread immediately rather than letting it sleep out
+ * its timeout.
+ * NOTE(review): this sets 'vote_event_woken' but ocfs2_vote_thread
+ * polls 'wake_vote_task' -- one of the two names is stale; confirm
+ * against ocfs_super. */
+static inline void ocfs2_kick_vote_thread(ocfs_super *osb)
+{
+	atomic_set(&osb->vote_event_woken, 1);
+	wake_up(&osb->vote_event);
+}
+
+/* On-the-wire vote request. */
+typedef struct _ocfs2_vote_msg
+{
+	u32 m_req_node;		/* node asking for the vote */
+	u32 m_request;		/* enum ocfs2_vote_request value */
+	u64 m_blkno;		/* inode block number voted on */
+	u32 m_generation;	/* i_generation, to catch reuse */
+} ocfs2_vote_msg;
+
+/* A queued vote message awaiting processing by the vote thread. */
+typedef struct _ocfs2_vote_work {
+	struct list_head   w_list;	/* entry on osb->vote_list */
+	ocfs2_vote_msg     w_msg;	/* 'ocfs2_vote_msg_hdr' was never
+					 * declared; the embedded message
+					 * is an ocfs2_vote_msg */
+} ocfs2_vote_work;
+
+enum ocfs2_vote_request {
+	OCFS2_VOTE_REQ_INVALID = 0,	/* comma was missing */
+	OCFS2_VOTE_REQ_DELETE = 1,
+	OCFS2_VOTE_REQ_DENTRY,
+	OCFS2_VOTE_REQ_RENAME
+};
+
+/* Vote on another node's request to delete 'inode'. Returns 0 to vote
+ * yes (committing the delete locally first), -EBUSY to vote no. */
+static int ocfs2_process_delete_request(struct inode *inode)
+{
+	int response = -EBUSY;
+
+	LOG_TRACE_ARGS("DELETE vote on inode %lu, read "
+		       "lnk_cnt = %u\n", inode->i_ino, 
+		       inode->i_nlink);
+
+	/* force this as ours may be out of date. */
+	inode->i_nlink = 0;
+
+	spin_lock(&OCFS_I(inode)->ip_lock);
+	/* vote no if the file is still open. */
+	if (OCFS_I(inode)->ip_open_cnt > 0) {
+		LOG_TRACE_PROCESS_VOTE("open count = %u\n", 
+		       OCFS_I(inode)->ip_open_cnt);
+		spin_unlock(&OCFS_I(inode)->ip_lock);
+		goto done;
+	}
+	spin_unlock(&OCFS_I(inode)->ip_lock);
+
+	/* vote no if someone's extending it. */
+	spin_lock(&oin_num_ext_lock);
+	if (OCFS_I(inode)->ip_num_extends) {
+		spin_unlock(&oin_num_ext_lock);
+		LOG_TRACE_PROCESS_VOTE("extends pending\n");
+		goto done;
+	}
+	spin_unlock(&oin_num_ext_lock);
+
+	/* directories are a bit ugly... What if someone is sitting in
+	 * it? We want to make sure the inode is removed completely as
+	 * a result of the iput in process_vote. */
+	if (S_ISDIR(inode->i_mode) && (atomic_read(&inode->i_count) != 1)) {
+		LOG_TRACE_PROCESS_VOTE("i_count = %u\n", 
+		       atomic_read(&inode->i_count));
+		goto done;
+	}
+
+	/* If we get here, then we're voting 'yes', so commit the
+	 * delete on our side. */
+	response = 0;
+
+	spin_lock(&OCFS_I(inode)->ip_lock);
+	SET_INODE_DELETED(inode);
+	/* We set the SKIP_DELETE flag on the inode so we don't try to
+	 * delete it in delete_inode ourselves. */
+	OCFS_SET_FLAG(OCFS_I(inode)->ip_flags, 
+		      OCFS_INODE_SKIP_DELETE);
+	spin_unlock(&OCFS_I(inode)->ip_lock);
+
+	d_prune_aliases (inode);
+
+	/* TODO: How much of this is really necessary? */
+	sync_mapping_buffers(inode->i_mapping);
+	ocfs_truncate_inode_pages(inode, 0);
+	ocfs2_extent_map_trunc(inode, 0);
+
+done:
+	return response;
+}
+
+/* Handle a dentry (unlink) or rename vote: prune our dentries and, for
+ * plain dentry requests, drop link counts. Always votes yes (0). */
+static int ocfs2_process_dentry_request(struct inode *inode,
+					int rename)
+{
+	d_prune_aliases (inode);
+
+	/* for rename, we don't drop link counts */
+	if (!rename) {
+		if (S_ISDIR(inode->i_mode))
+			inode->i_nlink = 0;
+		else
+			inode->i_nlink--;
+	}
+
+	/* we always vote yes on this request type. */
+	return 0;
+}
+
+/* Dispatch one incoming vote message. A missing inode means we have
+ * nothing cached and implicitly vote yes.
+ * NOTE(review): the response is computed but never sent -- the respond:
+ * section is still a TODO. Also confirm ilookup()'s first argument;
+ * the VFS ilookup takes a super_block. */
+void ocfs2_process_vote(ocfs_super *osb,
+			ocfs2_vote_msg *msg)
+{
+	int vote_response = 0;
+	int rename = 0;
+	struct inode *inode = NULL;
+
+	inode = ilookup(osb, msg->m_blkno);
+	if (!inode)
+		goto respond;
+
+	OCFS_ASSERT(inode->i_generation == msg->m_generation);
+
+	switch (msg->m_request) {
+	case OCFS2_VOTE_REQ_DELETE:
+		vote_response = ocfs2_process_delete_request(inode);
+		break;
+	case OCFS2_VOTE_REQ_RENAME:
+		rename = 1;
+		/* fallthrough: rename is a dentry request that keeps
+		 * link counts */
+	case OCFS2_VOTE_REQ_DENTRY:
+		vote_response = ocfs2_process_dentry_request(inode, rename);
+		break;
+	default:
+		printk("ocfs2_process_vote: node %u, invalid request: %u\n",
+		       msg->m_req_node, msg->m_request);
+		vote_response = -EINVAL;
+	}
+
+respond:
+//vote response here...
+bail:
+	if (inode)
+		iput(inode);
+}
+
+/* One pass of the vote thread: drain the blocked-inode queue (count
+ * snapshotted so new arrivals wait for the next pass), then drain the
+ * incoming vote messages. */
+void ocfs2_vote_thread_do_work(ocfs_super *osb)
+{
+	struct inode *inode;
+	ocfs_inode_private *ip;
+	int processed;
+	ocfs2_vote_work *work;
+
+	spin_lock(&osb->vote_task_lock);
+	processed = osb->blocked_inode_count;
+	while (processed--) {
+		OCFS_ASSERT(!list_empty(&osb->blocked_inode_list));
+
+		/* list_entry wants the node pointer itself; the
+		 * original passed '&list->next' */
+		ip = list_entry(osb->blocked_inode_list.next,
+				ocfs_inode_private, ip_blocked_list);
+		list_del_init(&ip->ip_blocked_list);
+		osb->blocked_inode_count--;
+		spin_unlock(&osb->vote_task_lock);
+
+		inode = ip->ip_inode;
+		/* (a second 'processed--' here skipped every other
+		 * queued inode) */
+		ocfs2_process_blocked_inode(inode);
+
+		iput(inode);	/* ref taken in schedule_blocked_inode */
+
+		spin_lock(&osb->vote_task_lock);
+	}
+
+	while (osb->vote_count) {
+		OCFS_ASSERT(!list_empty(&osb->vote_list));
+		work = list_entry(osb->vote_list.next,
+				  ocfs2_vote_work, w_list);
+		/* the variable is 'work'; 'w' was never declared */
+		list_del(&work->w_list);
+		osb->vote_count--;
+		spin_unlock(&osb->vote_task_lock);
+
+		ocfs2_process_vote(osb, &work->w_msg);
+		kfree(work);
+
+		spin_lock(&osb->vote_task_lock);
+	}
+	spin_unlock(&osb->vote_task_lock);
+}
+
+/* Main loop of the vote daemon: wake on kick or timeout, process work,
+ * and exit once asked to AND both work queues are empty. */
+int ocfs2_vote_thread(void *arg)
+{
+	int status = 0;
+	ocfs_super *osb = arg;
+	char proc[16];
+
+	sprintf (proc, "ocfs2vote-%d", osb->osb_id);
+	ocfs_daemonize (proc, strlen(proc), 0);
+
+	spin_lock(&osb->vote_task_lock);
+	osb->vote_task = current;
+	init_completion (&osb->vote_event_complete);
+
+	while (1) {
+		spin_unlock(&osb->vote_task_lock);
+
+#define OCFS2_VOTE_THREAD_TIMEOUT (HZ >> 1)
+		/* the waitqueue is passed by name (it's a macro), and
+		 * the wake flag is 'vote_event_woken' to match
+		 * ocfs2_kick_vote_thread(). */
+		wait_event_interruptible_timeout(osb->vote_event,
+						 atomic_read(
+							 &osb->vote_event_woken),
+						 OCFS2_VOTE_THREAD_TIMEOUT);
+
+		atomic_set(&osb->vote_event_woken, 0);
+
+		/* ocfs2_vote_thread_do_work() returns void; it was
+		 * being assigned to 'status'. */
+		ocfs2_vote_thread_do_work(osb);
+
+		spin_lock(&osb->vote_task_lock);
+		/* exit only when asked AND both queues are drained;
+		 * the original tested '!list_empty' (i.e. non-empty)
+		 * and so could never break with pending-free queues. */
+		if (osb->vote_exit && 
+		    list_empty(&osb->blocked_inode_list) &&
+		    list_empty(&osb->vote_list))
+			break;
+	}
+
+	osb->vote_task = NULL;
+	spin_unlock(&osb->vote_task_lock);
+
+	complete(&osb->vote_event_complete);
+
+	return status;
+}

Added: branches/dlm-glue/src/dlmglue.h
===================================================================
--- branches/dlm-glue/src/dlmglue.h	2004-10-26 01:03:25 UTC (rev 1592)
+++ branches/dlm-glue/src/dlmglue.h	2004-10-26 01:06:23 UTC (rev 1593)
@@ -0,0 +1,162 @@
+/* -*- mode: c; c-basic-offset: 8; -*-
+ * vim: noexpandtab sw=8 ts=8 sts=0:
+ *
+ * middle.h
+ *
+ * description here
+ *
+ * Copyright (C) 2002, 2004 Oracle.  All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+
+
+#ifndef MIDDLE_H
+#define MIDDLE_H
+
+/* forward declaration -- the full definition appears later in this
+ * header. */
+struct _ocfs2_lock_res;
+
+/* which dlm operation an AST completion is finishing up. */
+enum ocfs2_ast_action {
+	OCFS2_AST_INVALID = 0, /* no dlm call in flight */
+	OCFS2_AST_ATTACH,      /* presumably a new lock attach -- confirm */
+	OCFS2_AST_CONVERT      /* presumably a level conversion -- confirm */
+};
+
+/* State for one dlm lock (meta or data) hanging off an inode's lock
+ * resource. NOTE(review): mutable fields here appear to be guarded by
+ * the containing lockres' lr_lock -- confirm. */
+typedef struct _ocfs2_lock {
+	struct _ocfs2_lock_res *l_lockres;    /* back pointer to our lockres */
+	int                     l_flags;      /* OCFS2_LOCK_* bits below */
+	char                   *l_name;       /* dlm lock name */
+	dlm_lock_type           l_level;      /* lock level, presumably what
+					       * we currently hold */
+	unsigned int            l_ro_holders; /* active read-only holders */
+	unsigned int            l_ex_holders; /* active exclusive holders */
+	dlm_lockstatus          l_lksb;       /* status block handed to the dlm */
+	u32                     l_local_seq;  /* our view of the lvb seq; see
+					       * the sequence rules below */
+
+	/* used from AST/BAST funcs. */
+	ocfs2_ast_action        l_action;     /* which operation the AST ends */
+	dlm_lock_type           l_requested;  /* level we asked the dlm for */
+	dlm_lock_type           l_blocking;   /* level a BAST is blocking on */
+} ocfs2_lock;
+
+/* values for ocfs2_lock.l_flags */
+#define OCFS2_LOCK_ATTACHED      (0x00000001) /* have we initialized
+					       * the lvb */
+#define OCFS2_LOCK_BUSY          (0x00000002) /* we are currently in
+					       * dlm_lock */
+#define OCFS2_LOCK_BLOCKED       (0x00000004) /* blocked waiting to
+					       * downconvert */
+#define OCFS2_LOCK_NEEDS_REFRESH (0x00000008) /* presumably: cached state must
+					       * be revalidated before use --
+					       * confirm */
+#define OCFS2_LOCK_REFRESHING    (0x00000010) /* presumably: a refresh is in
+					       * progress (see lr_refreshing)
+					       * -- confirm */
+
+
+/* Per-inode cluster lock state: one meta data lock and one data lock,
+ * plus the wait queues that threads sleep on while those locks are
+ * busy, blocked or refreshing. */
+typedef struct _ocfs2_lock_res {
+	/* do i need this or can't i just use container_of? */
+	struct inode     *lr_inode;
+	/* actually, should probably just use ip_lock. */
+	spinlock_t        lr_lock;  /* guards the embedded ocfs2_lock state */
+
+	ocfs2_lock        lr_meta;  /* covers inode meta data */
+	ocfs2_lock        lr_data;  /* covers file data */
+//	ocfs2_lock        lr_file;
+
+	/* should i just make these two a single wait queue? */
+	wait_queue_head_t lr_busy;
+	wait_queue_head_t lr_blocked;
+	wait_queue_head_t lr_refreshing;
+} ocfs2_lock_res;
+
+/*
+ * LVB Sequence number rules:
+ * local seq and lvb seq are initialized to zero.
+ *
+ * Note that the lvb is basically invalid until the 1st EX downconvert
+ * as he's the only guy that can set it valid. This is ok though as PR
+ * holders would have to do an I/O under lock anyway.
+ *
+ * NL->PR:
+ * NL->EX:
+ * If LVB is valid:
+ *   if local seq == lvb seq, then we are up to date with the contents.
+ *   otherwise, we take the slow path to get up to date and then set our
+ *   local seq to the lvb seq.
+ *
+ * PR->NL: 
+ * If LVB is valid:
+ *   We increment our local seq. -- this allows up to
+ *   one set of changes to the lvb before we consider ourselves
+ *   invalid.
+ *
+ * PR->EX:
+ *   Do nothing.
+ *
+ * EX->NL:
+ * EX->PR:
+ * Set the LVB as valid.
+ * Populate the LVB contents (this is lock type specific)
+ * Increment the LVB seq.
+ * Set my local seq to the LVB seq.
+ * if (EX->NL)
+ *   do an additional increment of my local seq.
+ */
+/* common lvb header -- just the sequence number described above. */
+typedef struct _ocfs2_lvb {
+	u32 lvb_seq;
+} ocfs2_lvb;
+/* lvb contents for the meta data lock: a snapshot of inode fields so
+ * other nodes can refresh without a disk read -- TODO(review): confirm
+ * that is the intent. */
+typedef struct _ocfs2_meta_lvb {
+	ocfs2_lvb lvb;                /* common header; presumably must stay
+				       * first so the lvb can be read as an
+				       * ocfs2_lvb -- confirm */
+	u32       lvb_trunc_clusters; /* only ever lowered; see
+				       * ocfs2_lvb_set_trunc_clusters() */
+	u32       lvb_iclusters;      /* i_clusters */
+	u32       lvb_iuid;           /* i_uid */
+	u32       lvb_igid;           /* i_gid */
+	u64       lvb_isize;          /* i_size */
+	u16       lvb_imode;          /* i_mode */
+	u16       lvb_inlink;         /* i_nlink */
+	u64       lvb_iatime;         /* i_atime */
+	u64       lvb_ictime;         /* i_ctime */
+	u64       lvb_imtime;         /* i_mtime */
+} ocfs2_meta_lvb;
+
+/* set up the dlm for this super block. */
+int ocfs2_dlm_init(ocfs_super *osb);
+/* init / teardown of a per-inode lock resource. */
+int ocfs2_lock_res_init(struct inode *inode,
+			ocfs2_lock_res *res);
+void ocfs2_lock_res_free(ocfs2_lock_res *res);
+/* drop all cluster locks held on behalf of inode. */
+int ocfs2_drop_inode_locks(struct inode *inode);
+/* data lock: write != 0 presumably requests an exclusive level --
+ * confirm against the implementation in dlmglue.c. */
+int ocfs2_data_lock(struct inode *inode,
+		    int write);
+int ocfs2_data_unlock(struct inode *inode,
+		      int write);
+/* meta data lock: ex != 0 for exclusive. NOTE(review): ret_bh looks
+ * like it returns the inode's buffer head -- confirm ownership and
+ * whether handle may be NULL. */
+int ocfs2_meta_lock(struct inode *inode,
+		    ocfs_journal_handle *handle,
+		    struct buffer_head **ret_bh,
+		    int ex);
+int ocfs2_meta_unlock(struct inode *inode,
+		      int ex);
+
+/* Record a truncate down to trunc_clusters in the meta data lvb so
+ * other nodes can pick it up. Only ever lowers the stored value.
+ * Caller must hold the meta lock at EX (asserted below). */
+static inline void ocfs2_lvb_set_trunc_clusters(struct inode *inode,
+						unsigned int trunc_clusters)
+{
+	ocfs2_lock_res *res = &OCFS_I(inode)->ip_lockres;
+	ocfs2_meta_lvb *mlvb;
+
+	spin_lock(&res->lr_lock);
+	OCFS_ASSERT(res->lr_meta.l_level == LKM_EXMODE);
+
+	mlvb = res->lr_meta.l_lksb.lvb;
+	if (mlvb->lvb_trunc_clusters > trunc_clusters)
+		mlvb->lvb_trunc_clusters = trunc_clusters;
+	spin_unlock(&res->lr_lock);
+}
+
+#endif



More information about the Ocfs2-commits mailing list