[Ocfs2-commits] zab commits r2676 - in
branches/locking-changes/fs/ocfs2: . cluster
svn-commits at oss.oracle.com
svn-commits at oss.oracle.com
Thu Nov 3 16:39:02 CST 2005
Author: zab
Signed-off-by: mfasheh
Date: 2005-11-03 16:39:00 -0600 (Thu, 03 Nov 2005)
New Revision: 2676
Modified:
branches/locking-changes/fs/ocfs2/aops.c
branches/locking-changes/fs/ocfs2/cluster/masklog.h
branches/locking-changes/fs/ocfs2/dlmglue.c
branches/locking-changes/fs/ocfs2/dlmglue.h
Log:
o Work around the page lock and dlm lock order inversion. aops unlock their
locked page before blocking on the vote thread. When they wake up they
return AOP_TRUNCATED_PAGE which tells the dlm to try calling again with
a new locked page.
Signed-off-by: mfasheh
Modified: branches/locking-changes/fs/ocfs2/aops.c
===================================================================
--- branches/locking-changes/fs/ocfs2/aops.c 2005-11-02 00:09:16 UTC (rev 2675)
+++ branches/locking-changes/fs/ocfs2/aops.c 2005-11-03 22:39:00 UTC (rev 2676)
@@ -198,8 +198,10 @@
mlog_entry("(0x%p, %lu)\n", file, (page ? page->index : 0));
- ret = ocfs2_meta_lock(inode, NULL, NULL, 0);
- if (ret < 0) {
+ ret = ocfs2_meta_lock_with_page(inode, NULL, NULL, 0, page);
+ if (ret != 0) {
+ if (ret == AOP_TRUNCATED_PAGE)
+ unlock = 0;
mlog_errno(ret);
goto out;
}
@@ -226,8 +228,10 @@
goto out_alloc;
}
- ret = ocfs2_data_lock(inode, 0);
- if (ret < 0) {
+ ret = ocfs2_data_lock_with_page(inode, 0, page);
+ if (ret != 0) {
+ if (ret == AOP_TRUNCATED_PAGE)
+ unlock = 0;
mlog_errno(ret);
goto out_alloc;
}
@@ -283,8 +287,8 @@
mlog_entry("(0x%p, 0x%p, %u, %u)\n", file, page, from, to);
- ret = ocfs2_meta_lock(inode, NULL, NULL, 0);
- if (ret < 0) {
+ ret = ocfs2_meta_lock_with_page(inode, NULL, NULL, 0, page);
+ if (ret != 0) {
mlog_errno(ret);
goto out;
}
@@ -397,14 +401,14 @@
locklevel = 1;
}
- ret = ocfs2_meta_lock(inode, NULL, &di_bh, locklevel);
- if (ret < 0) {
+ ret = ocfs2_meta_lock_with_page(inode, NULL, &di_bh, locklevel, page);
+ if (ret != 0) {
mlog_errno(ret);
goto out;
}
- ret = ocfs2_data_lock(inode, 1);
- if (ret < 0) {
+ ret = ocfs2_data_lock_with_page(inode, 1, page);
+ if (ret != 0) {
mlog_errno(ret);
goto out_unlock_meta;
}
Modified: branches/locking-changes/fs/ocfs2/cluster/masklog.h
===================================================================
--- branches/locking-changes/fs/ocfs2/cluster/masklog.h 2005-11-02 00:09:16 UTC (rev 2675)
+++ branches/locking-changes/fs/ocfs2/cluster/masklog.h 2005-11-03 22:39:00 UTC (rev 2676)
@@ -198,8 +198,10 @@
} while (0)
#define mlog_errno(st) do { \
- if ((st) != -ERESTARTSYS && (st) != -EINTR) \
- mlog(ML_ERROR, "status = %lld\n", (long long)(st)); \
+ int _st = (st); \
+ if (_st != -ERESTARTSYS && _st != -EINTR && \
+ _st != AOP_TRUNCATED_PAGE) \
+ mlog(ML_ERROR, "status = %lld\n", (long long)_st); \
} while (0)
#define mlog_entry(fmt, args...) do { \
Modified: branches/locking-changes/fs/ocfs2/dlmglue.c
===================================================================
--- branches/locking-changes/fs/ocfs2/dlmglue.c 2005-11-02 00:09:16 UTC (rev 2675)
+++ branches/locking-changes/fs/ocfs2/dlmglue.c 2005-11-03 22:39:00 UTC (rev 2676)
@@ -30,6 +30,7 @@
#include <linux/smp_lock.h>
#include <linux/crc32.h>
#include <linux/kthread.h>
+#include <linux/pagemap.h>
#include <cluster/heartbeat.h>
#include <cluster/nodemanager.h>
@@ -175,10 +176,6 @@
int dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
int wanted);
-static int ocfs2_cluster_lock(ocfs2_super *osb,
- struct ocfs2_lock_res *lockres,
- int level,
- int lkm_flags);
static void ocfs2_cluster_unlock(ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int level);
@@ -868,10 +865,33 @@
mw->mw_goal = goal;
}
+/* returns 0 if the mw that was removed was already satisfied, -EBUSY
+ * if the mask still hadn't reached its goal */
+static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
+ struct ocfs2_mask_waiter *mw)
+{
+ unsigned long flags;
+ int ret = 0;
+
+ spin_lock_irqsave(&lockres->l_lock, flags);
+ if (!list_empty(&mw->mw_item)) {
+ if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
+ ret = -EBUSY;
+
+ list_del_init(&mw->mw_item);
+ init_completion(&mw->mw_complete);
+ }
+ spin_unlock_irqrestore(&lockres->l_lock, flags);
+
+ return ret;
+
+}
+
static int ocfs2_cluster_lock(ocfs2_super *osb,
struct ocfs2_lock_res *lockres,
int level,
- int lkm_flags)
+ int lkm_flags,
+ int arg_flags)
{
struct ocfs2_mask_waiter mw;
enum dlm_status status;
@@ -986,6 +1006,22 @@
unlock:
spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
+ /*
+ * This is helping work around a lock inversion between the page lock
+ * and dlm locks. One path holds the page lock while calling aops
+ * which block acquiring dlm locks. The voting thread holds dlm
+ * locks while acquiring page locks while down converting data locks.
+ * This block is helping an aop path notice the inversion and back
+ * off to unlock its page lock before trying the dlm lock again.
+ */
+ if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
+ mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
+ wait = 0;
+ if (lockres_remove_mask_waiter(lockres, &mw))
+ ret = -EAGAIN;
+ else
+ goto again;
+ }
if (wait) {
ret = ocfs2_wait_for_mask(&mw);
if (ret == 0)
@@ -1093,7 +1129,8 @@
level = write ? LKM_EXMODE : LKM_PRMODE;
- status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0);
+ status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
+ 0);
if (status < 0)
mlog_errno(status);
@@ -1117,8 +1154,9 @@
mlog_exit_void();
}
-int ocfs2_data_lock(struct inode *inode,
- int write)
+int ocfs2_data_lock_full(struct inode *inode,
+ int write,
+ int arg_flags)
{
int status = 0, level;
struct ocfs2_lock_res *lockres;
@@ -1145,7 +1183,8 @@
level = write ? LKM_EXMODE : LKM_PRMODE;
- status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0);
+ status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
+ 0, arg_flags);
if (status < 0)
mlog_errno(status);
@@ -1154,6 +1193,24 @@
return status;
}
+/* see ocfs2_meta_lock_with_page() */
+int ocfs2_data_lock_with_page(struct inode *inode,
+ int write,
+ struct page *page)
+{
+ int ret;
+
+ ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
+ if (ret == -EAGAIN) {
+ unlock_page(page);
+ if (ocfs2_data_lock(inode, write) == 0)
+ ocfs2_data_unlock(inode, write);
+ ret = AOP_TRUNCATED_PAGE;
+ }
+
+ return ret;
+}
+
static void ocfs2_vote_on_unlock(ocfs2_super *osb,
struct ocfs2_lock_res *lockres)
{
@@ -1536,7 +1593,7 @@
if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
dlm_flags |= LKM_NOQUEUE;
- status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags);
+ status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
if (status < 0) {
if (status != -EAGAIN && status != -EIOCBRETRY)
mlog_errno(status);
@@ -1597,6 +1654,47 @@
return status;
}
+/*
+ * This is working around a lock inversion between tasks acquiring DLM locks
+ * while holding a page lock and the vote thread which blocks dlm lock acquiry
+ * while acquiring page locks.
+ *
+ * ** These _with_page variants are only intended to be called from aop
+ * methods that hold page locks and return a very specific *positive* error
+ * code that aop methods pass up to the VFS -- test for errors with != 0. **
+ *
+ * The DLM is called such that it returns -EAGAIN if it would have blocked
+ * waiting for the vote thread. In that case we unlock our page so the vote
+ * thread can make progress. Once we've done this we have to return
+ * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
+ * into the VFS, which will then immediately retry the aop call.
+ *
+ * We do a blocking lock and immediate unlock before returning, though, so that
+ * the lock has a great chance of being cached on this node by the time the VFS
+ * calls back to retry the aop. This has a potential to livelock as nodes
+ * ping locks back and forth, but that's a risk we're willing to take to avoid
+ * the lock inversion simply.
+ */
+int ocfs2_meta_lock_with_page(struct inode *inode,
+ ocfs2_journal_handle *handle,
+ struct buffer_head **ret_bh,
+ int ex,
+ struct page *page)
+{
+ int ret;
+
+ ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
+ OCFS2_LOCK_NONBLOCK);
+ if (ret == -EAGAIN) {
+ unlock_page(page);
+ if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
+ ocfs2_meta_unlock(inode, ex);
+ ret = AOP_TRUNCATED_PAGE;
+ }
+
+ return ret;
+}
+
void ocfs2_meta_unlock(struct inode *inode,
int ex)
{
@@ -1629,7 +1727,7 @@
if (ocfs2_is_hard_readonly(osb))
return -EROFS;
- status = ocfs2_cluster_lock(osb, lockres, level, 0);
+ status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
if (status < 0) {
mlog_errno(status);
goto bail;
@@ -1678,7 +1776,7 @@
if (ocfs2_is_hard_readonly(osb))
return -EROFS;
- status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0);
+ status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
if (status < 0)
mlog_errno(status);
Modified: branches/locking-changes/fs/ocfs2/dlmglue.h
===================================================================
--- branches/locking-changes/fs/ocfs2/dlmglue.h 2005-11-02 00:09:16 UTC (rev 2675)
+++ branches/locking-changes/fs/ocfs2/dlmglue.h 2005-11-03 22:39:00 UTC (rev 2676)
@@ -43,6 +43,14 @@
__be32 lvb_reserved[3];
};
+/* ocfs2_meta_lock_full() and ocfs2_data_lock_full() 'arg_flags' flags */
+/* don't wait on recovery. */
+#define OCFS2_META_LOCK_RECOVERY (0x01)
+/* Instruct the dlm not to queue ourselves on the other node. */
+#define OCFS2_META_LOCK_NOQUEUE (0x02)
+/* don't block waiting for the vote thread, instead return -EAGAIN */
+#define OCFS2_LOCK_NONBLOCK (0x04)
+
int ocfs2_dlm_init(ocfs2_super *osb);
void ocfs2_dlm_shutdown(ocfs2_super *osb);
void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res);
@@ -52,21 +60,27 @@
void ocfs2_lock_res_free(struct ocfs2_lock_res *res);
int ocfs2_create_new_inode_locks(struct inode *inode);
int ocfs2_drop_inode_locks(struct inode *inode);
-int ocfs2_data_lock(struct inode *inode,
- int write);
+int ocfs2_data_lock_full(struct inode *inode,
+ int write,
+ int arg_flags);
+#define ocfs2_data_lock(inode, write) ocfs2_data_lock_full(inode, write, 0)
+int ocfs2_data_lock_with_page(struct inode *inode,
+ int write,
+ struct page *page);
void ocfs2_data_unlock(struct inode *inode,
int write);
int ocfs2_rw_lock(struct inode *inode, int write);
void ocfs2_rw_unlock(struct inode *inode, int write);
-/* don't wait on recovery. */
-#define OCFS2_META_LOCK_RECOVERY (0x01)
-/* Instruct the dlm not to queue ourselves on the other node. */
-#define OCFS2_META_LOCK_NOQUEUE (0x02)
int ocfs2_meta_lock_full(struct inode *inode,
ocfs2_journal_handle *handle,
struct buffer_head **ret_bh,
int ex,
int arg_flags);
+int ocfs2_meta_lock_with_page(struct inode *inode,
+ ocfs2_journal_handle *handle,
+ struct buffer_head **ret_bh,
+ int ex,
+ struct page *page);
/* 99% of the time we don't want to supply any additional flags --
* those are for very specific cases only. */
#define ocfs2_meta_lock(i, h, b, e) ocfs2_meta_lock_full(i, h, b, e, 0)
More information about the Ocfs2-commits
mailing list