[Ocfs2-commits] mfasheh commits r1959 - trunk/fs/ocfs2/dlm

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Mon Mar 7 18:55:02 CST 2005


Author: mfasheh
Signed-off-by: khackel
Date: 2005-03-07 18:55:01 -0600 (Mon, 07 Mar 2005)
New Revision: 1959

Modified:
   trunk/fs/ocfs2/dlm/dlmlock.c
   trunk/fs/ocfs2/dlm/dlmmaster.c
   trunk/fs/ocfs2/dlm/dlmmod.c
   trunk/fs/ocfs2/dlm/dlmmod.h
   trunk/fs/ocfs2/dlm/dlmthread.c
   trunk/fs/ocfs2/dlm/dlmunlock.c
Log:
* implement a 'purge' list of unused lock resources
  
* change dlmthread to wake up periodically and run the purge list

* teach all paths (dlmlock, dlmunlock, dlmthread) which may add or remove a 
  lock from a resource how to add it to / remove it from the purge list.

* have dlmlock take a ref on the resource it's dealing with (required for   
  this purge list stuff to work)      

* while we're there, clean up the exit path -- this made dropping our ref   
  a lot easier.

* move the code for not shuffling non-local locks into dlm_kick_thread so   
  that they never get queued in the first place

Signed-off-by: khackel



Modified: trunk/fs/ocfs2/dlm/dlmlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmlock.c	2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmlock.c	2005-03-08 00:55:01 UTC (rev 1959)
@@ -130,6 +130,7 @@
 		if (dlm_do_ast(dlm, res, lock) < 0)
 			dlmprintk0("eek\n");
 
+	dlm_lockres_calc_usage(dlm, res);
 	dlm_kick_thread(dlm, res);
 
 	return status;
@@ -153,6 +154,7 @@
 
 	spin_lock(&res->spinlock);
 	if (res->state & DLM_LOCK_RES_RECOVERING) {
+		spin_unlock(&res->spinlock);
 		status = DLM_RECOVERING;
 		goto bail;
 	}
@@ -175,8 +177,11 @@
 		list_del(&lock->list);
 		lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
 	}
+	spin_unlock(&res->spinlock);
+
+	dlm_lockres_calc_usage(dlm, res);
 bail:
-	spin_unlock(&res->spinlock);
+
 	wake_up(&res->wq);
 	return status;
 }

Modified: trunk/fs/ocfs2/dlm/dlmmaster.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmaster.c	2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmmaster.c	2005-03-08 00:55:01 UTC (rev 1959)
@@ -385,6 +385,7 @@
 	BUG_ON(!list_empty(&res->blocked));
 	BUG_ON(!list_empty(&res->dirty));
 	BUG_ON(!list_empty(&res->recovering));
+	BUG_ON(!list_empty(&res->purge));
 
 	kfree(res->lockname.name);
 
@@ -396,11 +397,27 @@
 	kref_get(&res->refs);
 }
 
+dlm_lock_resource *dlm_lockres_grab(dlm_ctxt *dlm,
+				    dlm_lock_resource *res)
+{
+	spin_lock(&dlm->spinlock);
+	__dlm_lockres_get(res);
+	spin_unlock(&dlm->spinlock);
+
+	return res;
+}
+
+void __dlm_lockres_put(dlm_ctxt *dlm,
+		       dlm_lock_resource *res)
+{
+	kref_put(&res->refs, dlm_lockres_release);
+}
+
 void dlm_lockres_put(dlm_ctxt *dlm,
 		     dlm_lock_resource *res)
 {
 	spin_lock(&dlm->spinlock);
-	kref_put(&res->refs, dlm_lockres_release);
+	__dlm_lockres_put(dlm, res);
 	spin_unlock(&dlm->spinlock);
 }
 
@@ -429,6 +446,7 @@
 	INIT_LIST_HEAD(&res->blocked);
 	INIT_LIST_HEAD(&res->dirty);
 	INIT_LIST_HEAD(&res->recovering);
+	INIT_LIST_HEAD(&res->purge);
 
 	kref_init(&res->refs, dlm_lockres_release);
 
@@ -439,6 +457,8 @@
 	
 	res->state = DLM_LOCK_RES_IN_PROGRESS;
 
+	res->last_used = 0;
+
 	memset(res->lvb, 0, DLM_LVB_LEN);
 }
 

Modified: trunk/fs/ocfs2/dlm/dlmmod.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.c	2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmmod.c	2005-03-08 00:55:01 UTC (rev 1959)
@@ -211,7 +211,7 @@
 		   dlm_bastlockfunc_t *bast)
 {
 	dlm_status status;
-	dlm_lock_resource *res;
+	dlm_lock_resource *res = NULL;
 	dlm_lock *lock = NULL;
 	int convert = 0, recovery = 0;
 
@@ -220,21 +220,21 @@
 
 	status = DLM_BADPARAM;
 	if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE)
-		goto error_status;
+		goto error;
 
 	if (flags & ~LKM_VALID_FLAGS)
-		goto error_status;
+		goto error;
 
 	convert = (flags & LKM_CONVERT);
 	recovery = (flags & LKM_RECOVERY);
 
 	if (recovery && (!dlm_is_recovery_lock(name, strlen(name)) ||
 		 convert) ) {
-		goto error_status;
+		goto error;
 	}
 	if (convert && (flags & LKM_LOCAL)) {
 		dlmprintk0("strange LOCAL convert request!\n");
-		goto error_status;
+		goto error;
 	}
 
 	if (convert) {
@@ -242,7 +242,7 @@
 
 		/* if converting, must pass in a valid dlm_lock */
 		if (!lksb->lockid || !lksb->lockid->lockres)
-			goto error_status;
+			goto error;
 		lock = lksb->lockid;
 
 		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are 
@@ -258,9 +258,9 @@
 			dlmprintk("      orig args: lksb=%p, ast=%p, bast=%p, "
 				  "astdata=%p\n", lock->lksb, lock->ast, 
 				  lock->bast, lock->astdata);
-			goto error_status;
+			goto error;
 		}
-		res = lock->lockres;
+		res = dlm_lockres_grab(dlm, lock->lockres);
 		down_read(&dlm->recovery_sem);
 
 		if (res->owner == dlm->group_index)
@@ -337,22 +337,23 @@
 		}
 	}
 
+up_error:
 	if (!recovery)
 		up_read(&dlm->recovery_sem);
-	return status;
 
-up_error:
-	if (!recovery)
-		up_read(&dlm->recovery_sem);
 error:
-	if (lock && !convert) {
-		kfree(lock);
-		lksb->lockid = NULL;
+	if (status != DLM_NORMAL) {
+		if (lock && !convert) {
+			kfree(lock);
+			lksb->lockid = NULL;
+		}
+		// this is kind of unnecessary
+		lksb->status = status;
 	}
 
-error_status:
-	// this is kind of unnecessary
-	lksb->status = status;
+	if (res)
+		dlm_lockres_put(dlm, res);
+
 	return status;
 }
 EXPORT_SYMBOL(dlmlock);
@@ -369,7 +370,7 @@
 	dlm_lock_resource *res;
 	dlm_lock *lock = NULL;
 	int call_ast = 0;
-	
+
 	dlmprintk0("\n");
 
 	if (!lksb)
@@ -387,8 +388,8 @@
 		return DLM_BADPARAM;
 
 	lock = lksb->lockid;
-	res = lock->lockres;
-	
+	res = dlm_lockres_grab(dlm, lock->lockres);
+
 	DLM_ASSERT(lock);
 	DLM_ASSERT(res);
 	dlmprintk("lock=%p res=%p\n", lock, res);
@@ -415,11 +416,22 @@
 		dlmprintk("kicking the thread\n");
 		dlm_kick_thread(dlm, res);
 	}
+
+	dlm_lockres_calc_usage(dlm, res);
+	dlm_lockres_put(dlm, res);
+
 	dlmprintk("returning status=%d!\n", status);
 	return status;
 }
 EXPORT_SYMBOL(dlmunlock);
 
+void __dlm_unhash_lock(dlm_ctxt *dlm,
+		       dlm_lock_resource *lockres)
+{
+	list_del_init(&lockres->list);
+	__dlm_lockres_put(dlm, lockres);
+}
+
 void __dlm_insert_lock(dlm_ctxt *dlm,
 		       dlm_lock_resource *res)
 {
@@ -710,6 +722,8 @@
 	INIT_LIST_HEAD(&dlm->dirty_list);
 	INIT_LIST_HEAD(&dlm->reco.resources);
 	INIT_LIST_HEAD(&dlm->reco.received);
+	INIT_LIST_HEAD(&dlm->purge_list);
+
 	dlm->dlm_thread_task = NULL;
 	init_waitqueue_head(&dlm->dlm_thread_wq);
 	INIT_LIST_HEAD(&dlm->master_list);

Modified: trunk/fs/ocfs2/dlm/dlmmod.h
===================================================================
--- trunk/fs/ocfs2/dlm/dlmmod.h	2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmmod.h	2005-03-08 00:55:01 UTC (rev 1959)
@@ -205,6 +205,8 @@
 	struct list_head list;
 	struct list_head *resources;
 	struct list_head dirty_list;
+	struct list_head purge_list;
+	unsigned int purge_count;
 	spinlock_t spinlock;
 	struct rw_semaphore recovery_sem;
 	char *name;
@@ -239,6 +241,8 @@
 #define DLM_LOCK_RES_DIRTY                0x00000008
 #define DLM_LOCK_RES_IN_PROGRESS          0x00000010 
 
+#define DLM_PURGE_INTERVAL_MS   (8 * 1000)
+
 typedef struct _dlm_lock_resource
 {
 	/* WARNING: Please see the comment in dlm_init_lockres before
@@ -249,11 +253,17 @@
 	/* please keep these next 3 in this order 
 	 * some funcs want to iterate over all lists */
 	struct list_head granted;
-	struct list_head converting; 
+	struct list_head converting;
 	struct list_head blocked;
 
 	struct list_head dirty;
 	struct list_head recovering; // dlm_recovery_ctxt.resources list
+
+	/* unused lock resources have their last_used stamped and are
+	 * put on a list for the dlm thread to run. */
+	struct list_head purge;
+	unsigned long    last_used;
+
 	spinlock_t spinlock;
 	wait_queue_head_t wq;
 	u8  owner;              //node which owns the lock resource, or unknown
@@ -593,21 +603,26 @@
 dlm_status dlmconvert_remote(dlm_ctxt *dlm, dlm_lock_resource *res, 
 			     dlm_lock *lock, int flags, int type);
 
-dlm_status dlmunlock(dlm_ctxt *dlm, dlm_lockstatus *lksb, int flags, 
+dlm_status dlmunlock(dlm_ctxt *dlm, dlm_lockstatus *lksb, int flags,
 		     dlm_astunlockfunc_t *unlockast, void *data);
-dlm_status dlmunlock_common(dlm_ctxt *dlm, dlm_lock_resource *res, 
-			    dlm_lock *lock, dlm_lockstatus *lksb, 
+dlm_status dlmunlock_common(dlm_ctxt *dlm, dlm_lock_resource *res,
+			    dlm_lock *lock, dlm_lockstatus *lksb,
 			    int flags, int *call_ast, int master_node);
-static inline dlm_status dlmunlock_master(dlm_ctxt *dlm, dlm_lock_resource *res,
-			    dlm_lock *lock, dlm_lockstatus *lksb, 
-			    int flags, int *call_ast)
+static inline dlm_status dlmunlock_master(dlm_ctxt *dlm,
+					  dlm_lock_resource *res,
+					  dlm_lock *lock,
+					  dlm_lockstatus *lksb,
+					  int flags,
+					  int *call_ast)
 {
 	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 1);
 }
 
-static inline dlm_status dlmunlock_remote(dlm_ctxt *dlm, dlm_lock_resource *res,
-			    dlm_lock *lock, dlm_lockstatus *lksb, 
-			    int flags, int *call_ast)
+static inline dlm_status dlmunlock_remote(dlm_ctxt *dlm,
+					  dlm_lock_resource *res,
+					  dlm_lock *lock,
+					  dlm_lockstatus *lksb,
+					  int flags, int *call_ast)
 {
 	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);
 }
@@ -620,9 +635,18 @@
 void dlm_put(dlm_ctxt *dlm);
 dlm_ctxt *dlm_grab(dlm_ctxt *dlm);
 
+void dlm_lockres_calc_usage(dlm_ctxt *dlm,
+			    dlm_lock_resource *res);
 void __dlm_lockres_get(dlm_lock_resource *res);
+
+dlm_lock_resource *dlm_lockres_grab(dlm_ctxt *dlm,
+				    dlm_lock_resource *res);
+void __dlm_lockres_put(dlm_ctxt *dlm,
+		       dlm_lock_resource *res);
 void dlm_lockres_put(dlm_ctxt *dlm,
 		     dlm_lock_resource *res);
+void __dlm_unhash_lock(dlm_ctxt *dlm,
+		       dlm_lock_resource *res);
 void __dlm_insert_lock(dlm_ctxt *dlm,
 		       dlm_lock_resource *res);
 dlm_lock_resource * __dlm_lookup_lock(dlm_ctxt *dlm,

Modified: trunk/fs/ocfs2/dlm/dlmthread.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmthread.c	2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmthread.c	2005-03-08 00:55:01 UTC (rev 1959)
@@ -61,10 +61,127 @@
 
 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->group_index)
 
-/*
- * DLM THREAD
- */
+static int __dlm_lockres_unused(dlm_lock_resource *res)
+{
+	if (list_empty(&res->granted) &&
+	    list_empty(&res->converting) &&
+	    list_empty(&res->blocked) &&
+	    list_empty(&res->dirty))
+		return 1;
+	return 0;
+}
 
+/* Call whenever you may have added or deleted something from one of
+ * the lockres queue's. This will figure out whether it belongs on the
+ * unused list or not and does the appropriate thing. */
+static void __dlm_lockres_calc_usage(dlm_ctxt *dlm,
+				     dlm_lock_resource *res)
+{
+	assert_spin_locked(&dlm->spinlock);
+	assert_spin_locked(&res->spinlock);
+
+	if (__dlm_lockres_unused(res)) {
+		dlmprintk("putting lockres %.*s from purge list\n",
+			  res->lockname.len, res->lockname.name);
+
+		res->last_used = jiffies;
+		list_add_tail(&res->purge,
+			      &dlm->purge_list);
+		dlm->purge_count++;
+	} else if (!list_empty(&res->purge)) {
+		dlmprintk("removing lockres %.*s from purge list\n",
+			  res->lockname.len, res->lockname.name);
+
+		list_del_init(&res->purge);
+		dlm->purge_count--;
+	}
+}
+
+void dlm_lockres_calc_usage(dlm_ctxt *dlm,
+			    dlm_lock_resource *res)
+{
+	spin_lock(&dlm->spinlock);
+	spin_lock(&res->spinlock);
+
+	__dlm_lockres_calc_usage(dlm, res);
+
+	spin_unlock(&res->spinlock);
+	spin_unlock(&dlm->spinlock);
+}
+
+/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
+ * to do migration, but will re-acquire before exit. */
+static void dlm_purge_lockres(dlm_ctxt *dlm,
+			      dlm_lock_resource *lockres)
+{
+	int master;
+
+	/* Since we can't migrate locks yet, for now we only handle
+	 * non locally mastered locks. */
+	spin_lock(&lockres->spinlock);
+	master = lockres->owner == dlm->group_index;
+	spin_unlock(&lockres->spinlock);
+
+	dlmprintk("purging lockres %.*s, master = %d\n", lockres->lockname.len,
+		  lockres->lockname.name, master);
+
+	/* Non master is the easy case -- no migration required, just
+	 * quit. */
+	if (!master)
+		__dlm_unhash_lock(dlm, lockres);
+
+	/* TODO: Wheee! Migrate lockres here! */
+}
+
+static void dlm_run_purge_list(dlm_ctxt *dlm)
+{
+	unsigned int run_max, unused;
+	unsigned long purge_jiffies;
+	dlm_lock_resource *lockres;
+
+	spin_lock(&dlm->spinlock);
+	run_max = dlm->purge_count;
+
+	while(run_max && !list_empty(&dlm->purge_list)) {
+		run_max--;
+
+		lockres = list_entry(dlm->purge_list.next,
+				     dlm_lock_resource, purge);
+		list_del_init(&lockres->purge);
+		dlm->purge_count--;
+
+		/* Status of the lockres *might* change so double
+		 * check. If the lockres is unused, holding the dlm
+		 * spinlock will prevent people from getting and more
+		 * refs on it -- there's no need to keep the lockres
+		 * spinlock. */
+		spin_lock(&lockres->spinlock);
+		unused = __dlm_lockres_unused(lockres);
+		spin_unlock(&lockres->spinlock);
+
+		if (!unused)
+			continue;
+
+		purge_jiffies = lockres->last_used + 
+			msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
+
+		/* Make sure that we want to be processing this guy at
+		 * this time. */
+		if (time_after(purge_jiffies, jiffies)) {
+			/* re-add to be processed at a later date. */
+			list_add_tail(&lockres->purge, &dlm->purge_list);
+			dlm->purge_count++;
+			continue;
+		}
+
+		/* This may drop and reacquire the dlm spinlock if it
+		 * has to do migration. */
+		dlm_purge_lockres(dlm, lockres);
+	}
+
+	spin_unlock(&dlm->spinlock);
+}
+
 void dlm_shuffle_lists(dlm_ctxt *dlm, dlm_lock_resource *res)
 {
 	dlm_lock *lock, *target;
@@ -232,17 +349,20 @@
 	spin_unlock(&res->spinlock);
 }
 
-
 /* must have NO locks when calling this */
 void dlm_kick_thread(dlm_ctxt *dlm, dlm_lock_resource *res)
 {
 	if (res) {
 		spin_lock(&dlm->spinlock);
 		spin_lock(&res->spinlock);
-		if (!(res->state & DLM_LOCK_RES_DIRTY)) {
+
+		/* don't shuffle secondary queues */
+		if ((res->owner == dlm->group_index) &&
+		    !(res->state & DLM_LOCK_RES_DIRTY)) {
 			list_add_tail(&res->dirty, &dlm->dirty_list);
 			res->state |= DLM_LOCK_RES_DIRTY;
 		}
+
 		spin_unlock(&res->spinlock);
 		spin_unlock(&dlm->spinlock);
 	}
@@ -286,36 +406,45 @@
 	return empty;
 }
 
+#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
+
 static int dlm_thread(void *data)
 {
 	struct list_head *iter, *tmpiter;
 	dlm_lock_resource *res;
 	dlm_ctxt *dlm = data;
+	unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
 
 	dlmprintk("dlm thread running for %s...\n", dlm->name);
 
 	while (!kthread_should_stop()) {
 
+		dlm_run_purge_list(dlm);
+
 		down_read(&dlm->recovery_sem);
 		spin_lock(&dlm->spinlock);
 		list_for_each_safe(iter, tmpiter, &dlm->dirty_list) {
 			res = list_entry(iter, dlm_lock_resource, dirty);
-			/* don't shuffle secondary queues */
-			if (res->owner != dlm->group_index)
-				continue;
+
 			spin_lock(&res->spinlock);
-			list_del(&res->dirty);
+			list_del_init(&res->dirty);
 			res->state &= ~DLM_LOCK_RES_DIRTY;
+			BUG_ON(res->owner != dlm->group_index);
 			spin_unlock(&res->spinlock);
 
 			dlm_shuffle_lists(dlm, res);
+
+			spin_lock(&res->spinlock);
+			__dlm_lockres_calc_usage(dlm, res);
+			spin_unlock(&res->spinlock);
 		}
 		spin_unlock(&dlm->spinlock);
 		up_read(&dlm->recovery_sem);
 
-		wait_event_interruptible(dlm->dlm_thread_wq,
-					 !dlm_dirty_list_empty(dlm) ||
-					 kthread_should_stop());
+		wait_event_interruptible_timeout(dlm->dlm_thread_wq,
+						 !dlm_dirty_list_empty(dlm) ||
+						 kthread_should_stop(),
+						 timeout);
 	}
 
 	dlmprintk0("quitting DLM thread\n");

Modified: trunk/fs/ocfs2/dlm/dlmunlock.c
===================================================================
--- trunk/fs/ocfs2/dlm/dlmunlock.c	2005-03-07 23:29:15 UTC (rev 1958)
+++ trunk/fs/ocfs2/dlm/dlmunlock.c	2005-03-08 00:55:01 UTC (rev 1959)
@@ -146,13 +146,12 @@
 		spin_lock(&res->spinlock);
 		spin_lock(&lock->spinlock);
 	}
-	
+
 	if (actions & DLM_UNLOCK_REMOVE_LOCK)
 		list_del(&lock->list);
 	if (actions & DLM_UNLOCK_REGRANT_LOCK)
 		list_add_tail(&lock->list, &res->granted);
 
-
 leave:
 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
 	spin_unlock(&lock->spinlock);



More information about the Ocfs2-commits mailing list