[Ocfs2-commits] zab commits r2676 - in branches/locking-changes/fs/ocfs2: . cluster

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Thu Nov 3 16:39:02 CST 2005


Author: zab
Signed-off-by: mfasheh
Date: 2005-11-03 16:39:00 -0600 (Thu, 03 Nov 2005)
New Revision: 2676

Modified:
   branches/locking-changes/fs/ocfs2/aops.c
   branches/locking-changes/fs/ocfs2/cluster/masklog.h
   branches/locking-changes/fs/ocfs2/dlmglue.c
   branches/locking-changes/fs/ocfs2/dlmglue.h
Log:
o Work around the page lock and dlm lock order inversion.  aops unlock their
  locked page before blocking on the vote thread.  When they wake up they
  return AOP_TRUNCATED_PAGE which tells the dlm to try calling again with
  a new locked page.

Signed-off-by: mfasheh


Modified: branches/locking-changes/fs/ocfs2/aops.c
===================================================================
--- branches/locking-changes/fs/ocfs2/aops.c	2005-11-02 00:09:16 UTC (rev 2675)
+++ branches/locking-changes/fs/ocfs2/aops.c	2005-11-03 22:39:00 UTC (rev 2676)
@@ -198,8 +198,10 @@
 
 	mlog_entry("(0x%p, %lu)\n", file, (page ? page->index : 0));
 
-	ret = ocfs2_meta_lock(inode, NULL, NULL, 0);
-	if (ret < 0) {
+	ret = ocfs2_meta_lock_with_page(inode, NULL, NULL, 0, page);
+	if (ret != 0) {
+		if (ret == AOP_TRUNCATED_PAGE)
+			unlock = 0;
 		mlog_errno(ret);
 		goto out;
 	}
@@ -226,8 +228,10 @@
 		goto out_alloc;
 	}
 
-	ret = ocfs2_data_lock(inode, 0);
-	if (ret < 0) {
+	ret = ocfs2_data_lock_with_page(inode, 0, page);
+	if (ret != 0) {
+		if (ret == AOP_TRUNCATED_PAGE)
+			unlock = 0;
 		mlog_errno(ret);
 		goto out_alloc;
 	}
@@ -283,8 +287,8 @@
 
 	mlog_entry("(0x%p, 0x%p, %u, %u)\n", file, page, from, to);
 
-	ret = ocfs2_meta_lock(inode, NULL, NULL, 0);
-	if (ret < 0) {
+	ret = ocfs2_meta_lock_with_page(inode, NULL, NULL, 0, page);
+	if (ret != 0) {
 		mlog_errno(ret);
 		goto out;
 	}
@@ -397,14 +401,14 @@
 		locklevel = 1;
 	}
 
-	ret = ocfs2_meta_lock(inode, NULL, &di_bh, locklevel);
-	if (ret < 0) {
+	ret = ocfs2_meta_lock_with_page(inode, NULL, &di_bh, locklevel, page);
+	if (ret != 0) {
 		mlog_errno(ret);
 		goto out;
 	}
 
-	ret = ocfs2_data_lock(inode, 1);
-	if (ret < 0) {
+	ret = ocfs2_data_lock_with_page(inode, 1, page);
+	if (ret != 0) {
 		mlog_errno(ret);
 		goto out_unlock_meta;
 	}

Modified: branches/locking-changes/fs/ocfs2/cluster/masklog.h
===================================================================
--- branches/locking-changes/fs/ocfs2/cluster/masklog.h	2005-11-02 00:09:16 UTC (rev 2675)
+++ branches/locking-changes/fs/ocfs2/cluster/masklog.h	2005-11-03 22:39:00 UTC (rev 2676)
@@ -198,8 +198,10 @@
 } while (0)
 
 #define mlog_errno(st) do {						\
-	if ((st) != -ERESTARTSYS && (st) != -EINTR)			\
-		mlog(ML_ERROR, "status = %lld\n", (long long)(st));	\
+	int _st = (st);							\
+	if (_st != -ERESTARTSYS && _st != -EINTR &&			\
+	    _st != AOP_TRUNCATED_PAGE)					\
+		mlog(ML_ERROR, "status = %lld\n", (long long)_st);	\
 } while (0)
 
 #define mlog_entry(fmt, args...) do {					\

Modified: branches/locking-changes/fs/ocfs2/dlmglue.c
===================================================================
--- branches/locking-changes/fs/ocfs2/dlmglue.c	2005-11-02 00:09:16 UTC (rev 2675)
+++ branches/locking-changes/fs/ocfs2/dlmglue.c	2005-11-03 22:39:00 UTC (rev 2676)
@@ -30,6 +30,7 @@
 #include <linux/smp_lock.h>
 #include <linux/crc32.h>
 #include <linux/kthread.h>
+#include <linux/pagemap.h>
 
 #include <cluster/heartbeat.h>
 #include <cluster/nodemanager.h>
@@ -175,10 +176,6 @@
 			     int dlm_flags);
 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
 						     int wanted);
-static int ocfs2_cluster_lock(ocfs2_super *osb,
-			      struct ocfs2_lock_res *lockres,
-			      int level,
-			      int lkm_flags);
 static void ocfs2_cluster_unlock(ocfs2_super *osb,
 				 struct ocfs2_lock_res *lockres,
 				 int level);
@@ -868,10 +865,33 @@
 	mw->mw_goal = goal;
 }
 
+/* returns 0 if the mw that was removed was already satisfied, -EBUSY
+ * if the mask still hadn't reached its goal */
+static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
+				      struct ocfs2_mask_waiter *mw)
+{
+	unsigned long flags;
+	int ret = 0;
+
+	spin_lock_irqsave(&lockres->l_lock, flags);
+	if (!list_empty(&mw->mw_item)) {
+		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
+			ret = -EBUSY;
+
+		list_del_init(&mw->mw_item);
+		init_completion(&mw->mw_complete);
+	}
+	spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+	return ret;
+
+}
+
 static int ocfs2_cluster_lock(ocfs2_super *osb,
 			      struct ocfs2_lock_res *lockres,
 			      int level,
-			      int lkm_flags)
+			      int lkm_flags,
+			      int arg_flags)
 {
 	struct ocfs2_mask_waiter mw;
 	enum dlm_status status;
@@ -986,6 +1006,22 @@
 unlock:
 	spin_unlock_irqrestore(&lockres->l_lock, flags);
 out:
+	/*
+	 * This is helping work around a lock inversion between the page lock
+	 * and dlm locks.  One path holds the page lock while calling aops
+	 * which block acquiring dlm locks.  The voting thread holds dlm
+	 * locks while acquiring page locks while down converting data locks.
+	 * This block is helping an aop path notice the inversion and back
+	 * off to unlock its page lock before trying the dlm lock again.
+	 */
+	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
+	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
+		wait = 0;
+		if (lockres_remove_mask_waiter(lockres, &mw))
+			ret = -EAGAIN;
+		else
+			goto again;
+	}
 	if (wait) {
 		ret = ocfs2_wait_for_mask(&mw);
 		if (ret == 0)
@@ -1093,7 +1129,8 @@
 
 	level = write ? LKM_EXMODE : LKM_PRMODE;
 
-	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0);
+	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
+				    0);
 	if (status < 0)
 		mlog_errno(status);
 
@@ -1117,8 +1154,9 @@
 	mlog_exit_void();
 }
 
-int ocfs2_data_lock(struct inode *inode,
-		    int write)
+int ocfs2_data_lock_full(struct inode *inode,
+			 int write,
+			 int arg_flags)
 {
 	int status = 0, level;
 	struct ocfs2_lock_res *lockres;
@@ -1145,7 +1183,8 @@
 
 	level = write ? LKM_EXMODE : LKM_PRMODE;
 
-	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0);
+	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
+				    0, arg_flags);
 	if (status < 0)
 		mlog_errno(status);
 
@@ -1154,6 +1193,24 @@
 	return status;
 }
 
+/* see ocfs2_meta_lock_with_page() */
+int ocfs2_data_lock_with_page(struct inode *inode,
+			      int write,
+			      struct page *page)
+{
+	int ret;
+
+	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
+	if (ret == -EAGAIN) {
+		unlock_page(page);
+		if (ocfs2_data_lock(inode, write) == 0)
+			ocfs2_data_unlock(inode, write);
+		ret = AOP_TRUNCATED_PAGE;
+	}
+
+	return ret;
+}
+
 static void ocfs2_vote_on_unlock(ocfs2_super *osb,
 				 struct ocfs2_lock_res *lockres)
 {
@@ -1536,7 +1593,7 @@
 	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
 		dlm_flags |= LKM_NOQUEUE;
 
-	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags);
+	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
 	if (status < 0) {
 		if (status != -EAGAIN && status != -EIOCBRETRY)
 			mlog_errno(status);
@@ -1597,6 +1654,47 @@
 	return status;
 }
 
+/*
+ * This is working around a lock inversion between tasks acquiring DLM locks
+ * while holding a page lock and the vote thread which blocks dlm lock acquiry
+ * while acquiring page locks.
+ *
+ * ** These _with_page variantes are only intended to be called from aop
+ * methods that hold page locks and return a very specific *positive* error
+ * code that aop methods pass up to the VFS -- test for errors with != 0. **
+ *
+ * The DLM is called such that it returns -EAGAIN if it would have blocked
+ * waiting for the vote thread.  In that case we unlock our page so the vote
+ * thread can make progress.  Once we've done this we have to return
+ * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
+ * into the VFS who will then immediately retry the aop call.
+ *
+ * We do a blocking lock and immediate unlock before returning, though, so that
+ * the lock has a great chance of being cached on this node by the time the VFS
+ * calls back to retry the aop.    This has a potential to livelock as nodes
+ * ping locks back and forth, but that's a risk we're willing to take to avoid
+ * the lock inversion simply.
+ */
+int ocfs2_meta_lock_with_page(struct inode *inode,
+			      ocfs2_journal_handle *handle,
+			      struct buffer_head **ret_bh,
+			      int ex,
+			      struct page *page)
+{
+	int ret;
+
+	ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
+				   OCFS2_LOCK_NONBLOCK);
+	if (ret == -EAGAIN) {
+		unlock_page(page);
+		if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
+			ocfs2_meta_unlock(inode, ex);
+		ret = AOP_TRUNCATED_PAGE;
+	}
+
+	return ret;
+}
+
 void ocfs2_meta_unlock(struct inode *inode,
 		       int ex)
 {
@@ -1629,7 +1727,7 @@
 	if (ocfs2_is_hard_readonly(osb))
 		return -EROFS;
 
-	status = ocfs2_cluster_lock(osb, lockres, level, 0);
+	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
 	if (status < 0) {
 		mlog_errno(status);
 		goto bail;
@@ -1678,7 +1776,7 @@
 	if (ocfs2_is_hard_readonly(osb))
 		return -EROFS;
 
-	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0);
+	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
 	if (status < 0)
 		mlog_errno(status);
 

Modified: branches/locking-changes/fs/ocfs2/dlmglue.h
===================================================================
--- branches/locking-changes/fs/ocfs2/dlmglue.h	2005-11-02 00:09:16 UTC (rev 2675)
+++ branches/locking-changes/fs/ocfs2/dlmglue.h	2005-11-03 22:39:00 UTC (rev 2676)
@@ -43,6 +43,14 @@
 	__be32       lvb_reserved[3];
 };
 
+/* ocfs2_meta_lock_full() and ocfs2_data_lock_full() 'arg_flags' flags */
+/* don't wait on recovery. */
+#define OCFS2_META_LOCK_RECOVERY	(0x01)
+/* Instruct the dlm not to queue ourselves on the other node. */
+#define OCFS2_META_LOCK_NOQUEUE		(0x02)
+/* don't block waiting for the vote thread, instead return -EAGAIN */
+#define OCFS2_LOCK_NONBLOCK		(0x04)
+
 int ocfs2_dlm_init(ocfs2_super *osb);
 void ocfs2_dlm_shutdown(ocfs2_super *osb);
 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res);
@@ -52,21 +60,27 @@
 void ocfs2_lock_res_free(struct ocfs2_lock_res *res);
 int ocfs2_create_new_inode_locks(struct inode *inode);
 int ocfs2_drop_inode_locks(struct inode *inode);
-int ocfs2_data_lock(struct inode *inode,
-		    int write);
+int ocfs2_data_lock_full(struct inode *inode,
+			 int write,
+			 int arg_flags);
+#define ocfs2_data_lock(inode, write) ocfs2_data_lock_full(inode, write, 0)
+int ocfs2_data_lock_with_page(struct inode *inode,
+			      int write,
+			      struct page *page);
 void ocfs2_data_unlock(struct inode *inode,
 		       int write);
 int ocfs2_rw_lock(struct inode *inode, int write);
 void ocfs2_rw_unlock(struct inode *inode, int write);
-/* don't wait on recovery. */
-#define OCFS2_META_LOCK_RECOVERY	(0x01)
-/* Instruct the dlm not to queue ourselves on the other node. */
-#define OCFS2_META_LOCK_NOQUEUE		(0x02)
 int ocfs2_meta_lock_full(struct inode *inode,
 			 ocfs2_journal_handle *handle,
 			 struct buffer_head **ret_bh,
 			 int ex,
 			 int arg_flags);
+int ocfs2_meta_lock_with_page(struct inode *inode,
+			      ocfs2_journal_handle *handle,
+			      struct buffer_head **ret_bh,
+			      int ex,
+			      struct page *page);
 /* 99% of the time we don't want to supply any additional flags --
  * those are for very specific cases only. */
 #define ocfs2_meta_lock(i, h, b, e) ocfs2_meta_lock_full(i, h, b, e, 0)



More information about the Ocfs2-commits mailing list