[Ocfs2-commits] mfasheh commits r2227 - trunk/fs/ocfs2

svn-commits at oss.oracle.com svn-commits at oss.oracle.com
Thu May 12 15:42:59 CDT 2005


Author: mfasheh
Signed-off-by: jlbec
Date: 2005-05-12 15:42:57 -0500 (Thu, 12 May 2005)
New Revision: 2227

Modified:
   trunk/fs/ocfs2/dlmglue.c
   trunk/fs/ocfs2/inode.c
   trunk/fs/ocfs2/journal.c
   trunk/fs/ocfs2/journal.h
   trunk/fs/ocfs2/namei.c
   trunk/fs/ocfs2/ocfs.h
   trunk/fs/ocfs2/ocfs2_fs.h
   trunk/fs/ocfs2/super.c
   trunk/fs/ocfs2/vote.c
Log:
* Use per-node orphan dirs. This should speed up multi-node parallel deletes.

* Instead of kmallocing a buffer for a full-length ocfs2 file name, we use the
  hexified version of the inode number as the name in the orphan dirs. This
  avoids some kmalloc calls in the unlink / delete_inode paths.

Signed-off-by: jlbec



Modified: trunk/fs/ocfs2/dlmglue.c
===================================================================
--- trunk/fs/ocfs2/dlmglue.c	2005-05-11 23:33:01 UTC (rev 2226)
+++ trunk/fs/ocfs2/dlmglue.c	2005-05-12 20:42:57 UTC (rev 2227)
@@ -1647,10 +1647,13 @@
 		bh = si->si_bh;
 		status = ocfs_read_block(osb, bh->b_blocknr, &bh, 0,
 					 si->si_inode);
+		if (status == 0)
+			ocfs2_update_slot_info(si);
+
+		ocfs2_complete_lock_res_refresh(lockres, status);
+
 		if (status < 0)
 			mlog_errno(status);
-
-		ocfs2_complete_lock_res_refresh(lockres, status);
 	}
 bail:
 	mlog_exit(status);

Modified: trunk/fs/ocfs2/inode.c
===================================================================
--- trunk/fs/ocfs2/inode.c	2005-05-11 23:33:01 UTC (rev 2226)
+++ trunk/fs/ocfs2/inode.c	2005-05-12 20:42:57 UTC (rev 2227)
@@ -301,6 +301,7 @@
 
 	OCFS_I(inode)->ip_clusters = fe->i_clusters;
 	OCFS_I(inode)->ip_inode = inode;
+	OCFS_I(inode)->ip_orphaned_slot = OCFS_INVALID_NODE_NUM;
 
 	if (create_ino)
 		inode->i_ino = ino_from_blkno(inode->i_sb, fe->i_blkno);
@@ -501,11 +502,12 @@
  */
 void ocfs_delete_inode(struct inode *inode)
 {
+	int status = 0;
+	int orphaned_slot;
 	struct inode *orphan_dir_inode = NULL;
 	struct inode *inode_alloc_inode = NULL;
 	ocfs_journal_handle *handle = NULL;
 	ocfs_super *osb = OCFS_SB(inode->i_sb);
-	int status = 0;
 	struct buffer_head *orphan_dir_bh = NULL;
 	struct buffer_head *inode_alloc_bh = NULL;
 	struct buffer_head *fe_bh = NULL;
@@ -577,6 +579,28 @@
 		goto bail;
 	}
 
+	spin_lock(&OCFS_I(inode)->ip_lock);
+	orphaned_slot = OCFS_I(inode)->ip_orphaned_slot;
+	spin_unlock(&OCFS_I(inode)->ip_lock);
+
+	if (orphaned_slot == OCFS_INVALID_NODE_NUM) {
+		/* Nobody knew which slot this inode was orphaned
+		 * into. This may happen during node death and
+		 * recovery knows how to clean it up so we can safely
+		 * ignore this inode for now on. */
+		mlog(0, "Nobody knew where inode %"MLFu64" was orphaned!\n",
+		     OCFS_I(inode)->ip_blkno);
+
+		/* XXX: Is this really necessary? */
+		spin_lock(&OCFS_I(inode)->ip_lock);
+		SET_INODE_DELETED(inode);
+		spin_unlock(&OCFS_I(inode)->ip_lock);
+		goto bail;
+	}
+
+	mlog(0, "Inode %"MLFu64" is ok to wipe from orphan dir slot %d\n",
+	     OCFS_I(inode)->ip_blkno, orphaned_slot);
+
 	fe = (ocfs2_dinode *) fe_bh->b_data;
 	if (!(fe->i_flags & OCFS2_ORPHANED_FL)) {
 		/* for lack of a better error? */
@@ -608,7 +632,7 @@
 
 	orphan_dir_inode = ocfs_get_system_file_inode(osb, 
 						      ORPHAN_DIR_SYSTEM_INODE, 
-						      -1);
+						      orphaned_slot);
 	if (!orphan_dir_inode) {
 		status = -EEXIST;
 		mlog_errno(status);

Modified: trunk/fs/ocfs2/journal.c
===================================================================
--- trunk/fs/ocfs2/journal.c	2005-05-11 23:33:01 UTC (rev 2226)
+++ trunk/fs/ocfs2/journal.c	2005-05-12 20:42:57 UTC (rev 2227)
@@ -54,8 +54,7 @@
 
 static int ocfs_force_read_journal(struct inode *inode);
 static int ocfs_recover_node(ocfs_super *osb,
-			     int node_num,
-			     ocfs2_dinode **la_copy);
+			     int node_num);
 static int __ocfs_recovery_thread(void *arg);
 static int ocfs_commit_cache (ocfs_super * osb);
 static int ocfs_wait_on_mount(ocfs_super *osb);
@@ -67,6 +66,8 @@
 				     int dirty);
 static int ocfs2_trylock_journal(ocfs_super *osb,
 				 int slot_num);
+static int ocfs_recover_orphans(ocfs_super *osb,
+				int slot);
 static int ocfs_commit_thread(void *arg);
 
 /* 
@@ -842,8 +843,9 @@
 }
 
 struct ocfs2_la_recovery_item {
-	struct list_head lri_list;
-	ocfs2_dinode *lri_dinode;
+	struct list_head	lri_list;
+	int			lri_slot;
+	ocfs2_dinode		*lri_dinode;
 };
 
 /* Does the second half of the recovery process. By this point, the
@@ -858,9 +860,10 @@
  */
 void ocfs2_complete_recovery(void *data)
 {
-	int ret, cleanup_orphans;
+	int ret;
 	ocfs_super *osb = data;
 	ocfs_journal *journal = osb->journal;
+	ocfs2_dinode *la_dinode;
 	struct ocfs2_la_recovery_item *item;
 	struct list_head *p, *n;
 	LIST_HEAD(tmp_la_list);
@@ -877,29 +880,26 @@
 		item = list_entry(p, struct ocfs2_la_recovery_item, lri_list);
 		list_del_init(&item->lri_list);
 
-		mlog(0, "Clean up local alloc %"MLFu64"\n",
-		     item->lri_dinode->i_blkno);
+		mlog(0, "Complete recovery for slot %d\n", item->lri_slot);
 
-		ret = ocfs_complete_local_alloc_recovery(osb,
-							 item->lri_dinode);
+		la_dinode = item->lri_dinode;
+		if (la_dinode) {
+			mlog(0, "Clean up local alloc %"MLFu64"\n",
+			     la_dinode->i_blkno);
 
-		kfree(item->lri_dinode);
-		kfree(item);
+			ret = ocfs_complete_local_alloc_recovery(osb,
+								 la_dinode);
+			if (ret < 0)
+				mlog_errno(ret);
 
-		if (ret < 0)
-			mlog_errno(ret);
-	}
+			kfree(la_dinode);
+		}
 
-	spin_lock(&journal->j_lock);
-	cleanup_orphans = journal->j_cleanup_orphans;
-	journal->j_cleanup_orphans = 0;
-	spin_unlock(&journal->j_lock);
-
-	if (cleanup_orphans) {
-		mlog(0, "Cleanup the orphan dir\n");
-		ret = ocfs_recover_orphans(osb);
+		ret = ocfs_recover_orphans(osb, item->lri_slot);
 		if (ret < 0)
 			mlog_errno(ret);
+
+		kfree(item);
 	}
 
 	mlog(0, "Recovery completion\n");
@@ -909,7 +909,8 @@
 
 /* NOTE: This function always eats the reference to la_dinode, either
  * manually on error, or by passing it to ocfs2_complete_recovery */
-static void ocfs2_queue_local_alloc_cleanup(ocfs_journal *journal,
+static void ocfs2_queue_recovery_completion(ocfs_journal *journal,
+					    int slot_num,
 					    ocfs2_dinode *la_dinode)
 {
 	struct ocfs2_la_recovery_item *item;
@@ -919,7 +920,8 @@
 		/* Though we wish to avoid it, we are in fact safe in
 		 * skipping local alloc cleanup as fsck.ocfs2 is more
 		 * than capable of reclaiming unused space. */
-		kfree(la_dinode);
+		if (la_dinode)
+			kfree(la_dinode);
 
 		mlog_errno(-ENOMEM);
 		return;
@@ -927,6 +929,7 @@
 
 	INIT_LIST_HEAD(&item->lri_list);
 	item->lri_dinode = la_dinode;
+	item->lri_slot = slot_num;
 
 	spin_lock(&journal->j_lock);
 	list_add_tail(&item->lri_list, &journal->j_la_cleanups);
@@ -934,18 +937,6 @@
 	spin_unlock(&journal->j_lock);
 }
 
-static void ocfs2_queue_orphan_dir_cleanup(ocfs_journal *journal)
-{
-	spin_lock(&journal->j_lock);
-	if (!journal->j_cleanup_orphans) {
-		/* No need to schedule again if someone's already
-		 * doing this. */
-		journal->j_cleanup_orphans = 1;
-		schedule_work(&journal->j_recovery_work);
-	}
-	spin_unlock(&journal->j_lock);
-}
-
 /* Called by the mount code to queue recovery the last part of
  * recovery for it's own slot. */
 void ocfs2_complete_mount_recovery(ocfs_super *osb)
@@ -953,21 +944,18 @@
 	ocfs_journal *journal = osb->journal;
 
 	if (osb->dirty) {
-		ocfs2_queue_local_alloc_cleanup(journal,
+		ocfs2_queue_recovery_completion(journal,
+						osb->slot_num,
 						osb->local_alloc_copy);
 		osb->local_alloc_copy = NULL;
-
-		ocfs2_queue_orphan_dir_cleanup(journal);
-
 		osb->dirty = 0;
 	}
 }
 
 static int __ocfs_recovery_thread(void *arg)
 {
-	int status, node_num, recovered;
+	int status, node_num;
 	ocfs_super *osb = arg;
-	ocfs2_dinode *la_dinode_cp;
 
 	mlog_entry_void();
 
@@ -985,7 +973,6 @@
 		goto bail;
 	}
 
-	recovered = 0;
 	while(!ocfs_node_map_is_empty(osb, &osb->recovery_map)) {
 		node_num = ocfs_node_map_first_set_bit(osb,
 						       &osb->recovery_map);
@@ -994,8 +981,7 @@
 			break;
 		}
 
-		la_dinode_cp = NULL;
-		status = ocfs_recover_node(osb, node_num, &la_dinode_cp);
+		status = ocfs_recover_node(osb, node_num);
 		if (status < 0) {
 			mlog(ML_ERROR, "Error %d recovering node %d on device "
 			     "(%u,%u)!\n", status, node_num,
@@ -1005,21 +991,13 @@
 		}
 
 		ocfs_recovery_map_clear(osb, node_num);
-
-		if (la_dinode_cp) {
-			mlog(0, "queueing local alloc cleanup for node %d\n",
-			     node_num);
-			ocfs2_queue_local_alloc_cleanup(osb->journal,
-							la_dinode_cp);
-		}
-		recovered++;
 	}
 	ocfs2_super_unlock(osb, 1);
 
-	/* Lets not fire off orphan dir cleanup unless we actually had
-	 * to recover a node. */
-	if (recovered)
-		ocfs2_queue_orphan_dir_cleanup(osb->journal);
+	/* We always run recovery on our own orphan dir - the dead
+	 * node(s) may have voted "no" on an inode delete earlier. A
+	 * revote is therefore required. */
+	ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL);
 
 bail:
 	down(&osb->recovery_lock);
@@ -1205,18 +1183,16 @@
  * by the caller.
  */
 static int ocfs_recover_node(ocfs_super *osb,
-			     int node_num,
-			     ocfs2_dinode **la_copy)
+			     int node_num)
 {
 	int status = 0;
 	int slot_num;
 	ocfs2_slot_info *si = osb->slot_info;
+	ocfs2_dinode *la_copy = NULL;
 
 	mlog_entry("(node_num=%d, osb->node_num = %d)\n",
 		       node_num, osb->node_num);
 
-	*la_copy = NULL;
-
 	mlog(0, "ocfs2_recover_node: checking node %d\n", node_num);
 
 	/* Should not ever be called to recover ourselves -- in that
@@ -1224,7 +1200,6 @@
 	if (osb->node_num == node_num)
 		BUG();
 
-	ocfs2_update_slot_info(si);
 	slot_num = ocfs2_node_num_to_slot(si, node_num);
 	if (slot_num == OCFS_INVALID_NODE_NUM) {
 		status = 0;
@@ -1243,7 +1218,7 @@
 	}
 
 	/* Stamp a clean local alloc file AFTER recovering the journal... */
-	status = ocfs_begin_local_alloc_recovery(osb, slot_num, la_copy);
+	status = ocfs_begin_local_alloc_recovery(osb, slot_num, &la_copy);
 	if (status < 0) {
 		mlog_errno(status);
 		goto done;
@@ -1256,6 +1231,9 @@
 	if (status < 0)
 		mlog_errno(status);
 
+	/* This will gobble the memory pointed to by la_copy */
+	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy);
+
 	status = 0;
 done:
 
@@ -1349,7 +1327,8 @@
 	return status;
 }
 
-int ocfs_recover_orphans(ocfs_super *osb)
+static int ocfs_recover_orphans(ocfs_super *osb,
+				int slot)
 {
 	int status = 0;
 	int have_disk_lock = 0;
@@ -1361,9 +1340,11 @@
 	struct ocfs2_dir_entry *de;
 	struct super_block *sb = osb->sb;
 
+	mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
+
 	orphan_dir_inode = ocfs_get_system_file_inode(osb, 
 						      ORPHAN_DIR_SYSTEM_INODE, 
-						      -1);
+						      slot);
 	if  (!orphan_dir_inode) {
 		status = -ENOENT;
 		mlog_errno(status);
@@ -1457,7 +1438,13 @@
 		mlog(0, "iput orphan %"MLFu64"\n", OCFS_I(inode)->ip_blkno);
 
 		iter = OCFS_I(inode)->ip_next_orphan;
+
+		spin_lock(&OCFS_I(inode)->ip_lock);
+		OCFS_I(inode)->ip_orphaned_slot = slot;
+		spin_unlock(&OCFS_I(inode)->ip_lock);
+
 		iput(inode);
+
 		inode = iter;
 	}
 

Modified: trunk/fs/ocfs2/journal.h
===================================================================
--- trunk/fs/ocfs2/journal.h	2005-05-11 23:33:01 UTC (rev 2226)
+++ trunk/fs/ocfs2/journal.h	2005-05-12 20:42:57 UTC (rev 2227)
@@ -65,7 +65,6 @@
 	wait_queue_head_t         j_checkpointed;
 
 	spinlock_t                j_lock;
-	unsigned int              j_cleanup_orphans;
 	struct list_head          j_la_cleanups;
 	struct work_struct        j_recovery_work;
 };
@@ -179,9 +178,6 @@
 /* Exported only for the journal struct init code in super.c. Do not call. */
 void ocfs2_complete_recovery(void *data);
 
-/* Needed to complete mount recovery */
-int ocfs_recover_orphans(ocfs_super *osb);
-
 /*
  *  Journal Control:
  *  Initialize, Load, Shutdown, Wipe a journal.

Modified: trunk/fs/ocfs2/namei.c
===================================================================
--- trunk/fs/ocfs2/namei.c	2005-05-11 23:33:01 UTC (rev 2226)
+++ trunk/fs/ocfs2/namei.c	2005-05-12 20:42:57 UTC (rev 2227)
@@ -106,7 +106,7 @@
 static int ocfs_prepare_orphan_dir(ocfs_super *osb, 
 				   ocfs_journal_handle *handle,
 				   struct inode *inode,
-				   char **ret_name,
+				   char *name,
 				   struct buffer_head **de_bh);
 
 static int ocfs_orphan_add(ocfs_super *osb, ocfs_journal_handle *handle,
@@ -129,6 +129,8 @@
 				inode, blkno, parent_fe_bh, insert_bh);
 }
 
+#define OCFS2_ORPHAN_NAMELEN (2 * sizeof(u64))
+
 /*
  * ocfs_lookup()
  *
@@ -763,7 +765,7 @@
 	ocfs_journal_handle *handle = NULL;
 	struct ocfs2_dir_entry *dirent = NULL;
 	struct buffer_head *dirent_bh = NULL;
-	char *orphan_name = NULL;
+	char orphan_name[OCFS2_ORPHAN_NAMELEN + 1];
 	struct buffer_head *orphan_entry_bh = NULL;
 
 	mlog_entry ("(0x%p, 0x%p, '%.*s')\n", dir, dentry,
@@ -831,7 +833,7 @@
 
 	if (S_ISDIR(inode->i_mode) || (inode->i_nlink == 1)) {
 		status = ocfs_prepare_orphan_dir(osb, handle, inode, 
-						 &orphan_name, 
+						 orphan_name, 
 						 &orphan_entry_bh);
 		if (status < 0) {
 			mlog_errno(status);
@@ -917,9 +919,6 @@
 	if (orphan_entry_bh)
 		brelse(orphan_entry_bh);
 
-	if (orphan_name)
-		kfree(orphan_name);
-
 	mlog_exit(status);
 
 	return status;
@@ -1000,7 +999,7 @@
 	struct inode *old_inode = old_dentry->d_inode;
 	struct inode *new_inode = new_dentry->d_inode;
 	ocfs2_dinode *newfe = NULL;
-	char *orphan_name = NULL;
+	char orphan_name[OCFS2_ORPHAN_NAMELEN + 1];
 	struct buffer_head *orphan_entry_bh = NULL;
 	struct buffer_head *newfe_bh = NULL;
 	struct buffer_head *insert_entry_bh = NULL;
@@ -1173,7 +1172,7 @@
 		if (S_ISDIR(new_inode->i_mode) || (new_inode->i_nlink == 1)) {
 			status = ocfs_prepare_orphan_dir(osb, handle, 
 							 new_inode, 
-							 &orphan_name,
+							 orphan_name,
 							 &orphan_entry_bh);
 			if (status < 0) {
 				mlog_errno(status);
@@ -1354,10 +1353,7 @@
 		brelse(orphan_entry_bh);
 	if (insert_entry_bh)
 		brelse(insert_entry_bh);
-	if (orphan_name)
-		kfree(orphan_name);
 
-
 	mlog_exit(status);
 
 	return status;
@@ -1966,63 +1962,56 @@
 	return ret;
 }
 
-static int ocfs_blkno_stringify(u64 blkno, char **retval)
+static int ocfs_blkno_stringify(u64 blkno, char *name)
 {
-	char *name = NULL;
-	int namelen;
+	int status, namelen;
 
 	mlog_entry_void();
 
-	*retval = NULL;
-	name = kmalloc(OCFS2_MAX_FILENAME_LEN + 1, GFP_KERNEL);
-	if (!name) {
-		namelen = -ENOMEM;
-		mlog_errno(namelen);
+	namelen = snprintf(name, OCFS2_ORPHAN_NAMELEN + 1, "%016"MLFx64,
+			   blkno);
+	if (namelen <= 0) {
+		if (namelen)
+			status = namelen;
+		else
+			status = -EINVAL;
+		mlog_errno(status);
 		goto bail;
 	}
-
-	namelen = snprintf(name, OCFS2_MAX_FILENAME_LEN + 1, "%"MLFu64, blkno);
-	if (namelen <= 0) {
-		kfree(name);
-		if (!namelen)
-			namelen = -EFAULT;
-		mlog_errno(namelen);
+	if (namelen != OCFS2_ORPHAN_NAMELEN) {
+		status = -EINVAL;
+		mlog_errno(status);
 		goto bail;
 	}
 
-	mlog(0, "built filename '%s' for orphan dir (len=%d)\n", name, namelen);
+	mlog(0, "built filename '%s' for orphan dir (len=%d)\n", name,
+	     namelen);
 
-	*retval = name;
+	status = 0;
 bail:
-	mlog_exit(namelen);
-	return namelen;
+	mlog_exit(status);
+	return status;
 }
 
 static int ocfs_prepare_orphan_dir(ocfs_super *osb, 
 				   ocfs_journal_handle *handle,
 				   struct inode *inode,
-				   char **ret_name,
+				   char *name,
 				   struct buffer_head **de_bh)
 {
 	struct inode *orphan_dir_inode = NULL;
 	struct buffer_head *orphan_dir_bh = NULL;
 	int status = 0;
-	char *name = NULL;
-	int namelen;
 
-	*ret_name = NULL;
-
-	/* create a unique name here. */
-	namelen = ocfs_blkno_stringify(OCFS_I(inode)->ip_blkno, &name);
-	if (namelen < 0) {
-		status = namelen;
+	status = ocfs_blkno_stringify(OCFS_I(inode)->ip_blkno, name);
+	if (status < 0) {
 		mlog_errno(status);
 		goto leave;
 	}
 
 	orphan_dir_inode = ocfs_get_system_file_inode(osb, 
 						      ORPHAN_DIR_SYSTEM_INODE, 
-						      -1);
+						      osb->slot_num);
 	if (!orphan_dir_inode) {
 		status = -ENOENT;
 		mlog_errno(status);
@@ -2037,21 +2026,17 @@
 	}
 
 	status = ocfs_prepare_dir_for_insert(osb, orphan_dir_inode, 
-					     orphan_dir_bh, name, namelen, 
-					     de_bh);
+					     orphan_dir_bh, name,
+					     OCFS2_ORPHAN_NAMELEN, de_bh);
 	if (status < 0) {
 		mlog_errno(status);
 		goto leave;
 	}
 
-	*ret_name = name;
 leave:
 	if (orphan_dir_inode)
 		iput(orphan_dir_inode);
 
-	if ((status < 0) && name)
-		kfree(name);
-
 	if (orphan_dir_bh)
 		brelse(orphan_dir_bh);
 
@@ -2070,16 +2055,13 @@
 	struct inode *orphan_dir_inode = NULL;
 	struct buffer_head *orphan_dir_bh = NULL;
 	int status = 0;
-	int namelen;
 	ocfs2_dinode *orphan_fe;
 
 	mlog_entry("(inode->i_ino = %lu)\n", inode->i_ino);
 
-	namelen = strlen(name);
-
 	orphan_dir_inode = ocfs_get_system_file_inode(osb, 
 						      ORPHAN_DIR_SYSTEM_INODE, 
-						      -1);
+						      osb->slot_num);
 	if (!orphan_dir_inode) {
 		status = -ENOENT;
 		mlog_errno(status);
@@ -2114,15 +2096,27 @@
 		goto leave;
 	}
 
-	status = __ocfs_add_entry(handle, orphan_dir_inode, name, namelen, 
-				  inode, OCFS_I(inode)->ip_blkno, 
-				  orphan_dir_bh, de_bh);
+	status = __ocfs_add_entry(handle, orphan_dir_inode, name,
+				  OCFS2_ORPHAN_NAMELEN, inode,
+				  OCFS_I(inode)->ip_blkno, orphan_dir_bh,
+				  de_bh);
 	if (status < 0) {
 		mlog_errno(status);
 		goto leave;
 	}
 
 	fe->i_flags |= OCFS2_ORPHANED_FL;
+
+	/* Record which orphan dir our inode now resides
+	 * in. delete_inode will use this to determine which orphan
+	 * dir to lock. */
+	spin_lock(&OCFS_I(inode)->ip_lock);
+	OCFS_I(inode)->ip_orphaned_slot = osb->slot_num;
+	spin_unlock(&OCFS_I(inode)->ip_lock);
+
+	mlog(0, "Inode %"MLFu64" orphaned in slot %d\n",
+	     OCFS_I(inode)->ip_blkno, osb->slot_num);
+
 leave:
 	if (orphan_dir_inode)
 		iput(orphan_dir_inode);
@@ -2139,8 +2133,7 @@
 		    struct inode *orphan_dir_inode, struct inode *inode, 
 		    struct buffer_head *orphan_dir_bh)
 {
-	char *name = NULL;
-	int namelen;
+	char name[OCFS2_ORPHAN_NAMELEN + 1];
 	ocfs2_dinode *orphan_fe;
 	int status = 0;
 	struct buffer_head *target_de_bh = NULL;
@@ -2148,29 +2141,18 @@
 
 	mlog_entry_void();
 
-	name = kmalloc(OCFS2_MAX_FILENAME_LEN + 1, GFP_KERNEL);
-	if (!name) {
-		status = -ENOMEM;
+	status = ocfs_blkno_stringify(OCFS_I(inode)->ip_blkno, name);
+	if (status < 0) {
 		mlog_errno(status);
 		goto leave;
 	}
 
-	namelen = snprintf(name, OCFS2_MAX_FILENAME_LEN + 1, "%"MLFu64, 
-			   OCFS_I(inode)->ip_blkno);
-	if (namelen <= 0) {
-		if (namelen)
-			status = namelen;
-		else
-			status = -EINVAL;
-		mlog_errno(status);
-		goto leave;
-	}
+	mlog(0, "removing '%s' from orphan dir %"MLFu64" (namelen=%d)\n",
+	     name, OCFS_I(orphan_dir_inode)->ip_blkno, OCFS2_ORPHAN_NAMELEN);
 
-	mlog(0, "removing '%s' from orphan dir (len=%d)\n", name, namelen);
-
 	/* find it's spot in the orphan directory */
-	target_de_bh = ocfs_find_entry(name, namelen, orphan_dir_inode, 
-				       &target_de);
+	target_de_bh = ocfs_find_entry(name, OCFS2_ORPHAN_NAMELEN,
+				       orphan_dir_inode, &target_de);
 	if (!target_de_bh) {
 		status = -ENOENT;
 		mlog_errno(status);
@@ -2205,9 +2187,6 @@
 	}
 
 leave:
-	if (name)
-		kfree(name);
-
 	if (target_de_bh)
 		brelse(target_de_bh);
 		

Modified: trunk/fs/ocfs2/ocfs.h
===================================================================
--- trunk/fs/ocfs2/ocfs.h	2005-05-11 23:33:01 UTC (rev 2226)
+++ trunk/fs/ocfs2/ocfs.h	2005-05-12 20:42:57 UTC (rev 2227)
@@ -192,6 +192,7 @@
 	loff_t		ip_mmu_private;
 	struct ocfs2_extent_map ip_map;
 	struct list_head ip_io_markers;
+	int		ip_orphaned_slot;
 
 	struct semaphore  ip_io_sem;
 

Modified: trunk/fs/ocfs2/ocfs2_fs.h
===================================================================
--- trunk/fs/ocfs2/ocfs2_fs.h	2005-05-11 23:33:01 UTC (rev 2226)
+++ trunk/fs/ocfs2/ocfs2_fs.h	2005-05-12 20:42:57 UTC (rev 2227)
@@ -157,8 +157,8 @@
 #define OCFS2_FIRST_ONLINE_SYSTEM_INODE SLOT_MAP_SYSTEM_INODE
 	HEARTBEAT_SYSTEM_INODE,
 	GLOBAL_BITMAP_SYSTEM_INODE,
+#define OCFS2_LAST_GLOBAL_SYSTEM_INODE GLOBAL_BITMAP_SYSTEM_INODE
 	ORPHAN_DIR_SYSTEM_INODE,
-#define OCFS2_LAST_GLOBAL_SYSTEM_INODE ORPHAN_DIR_SYSTEM_INODE
 	EXTENT_ALLOC_SYSTEM_INODE,
 	INODE_ALLOC_SYSTEM_INODE,
 	JOURNAL_SYSTEM_INODE,
@@ -176,9 +176,9 @@
 	[SLOT_MAP_SYSTEM_INODE]			{ "slot_map", 0, S_IFREG | 0644 },
 	[HEARTBEAT_SYSTEM_INODE]		{ "heartbeat", OCFS2_HEARTBEAT_FL, S_IFREG | 0644 },
 	[GLOBAL_BITMAP_SYSTEM_INODE]		{ "global_bitmap", 0, S_IFREG | 0644 },
-	[ORPHAN_DIR_SYSTEM_INODE]		{ "orphan_dir", 0, S_IFDIR | 0755 },
 
 	/* Node-specific system inodes (one copy per node) */
+	[ORPHAN_DIR_SYSTEM_INODE]		{ "orphan_dir:%04d", 0, S_IFDIR | 0755 },
 	[EXTENT_ALLOC_SYSTEM_INODE]		{ "extent_alloc:%04d", OCFS2_BITMAP_FL | OCFS2_CHAIN_FL, S_IFREG | 0644 },
 	[INODE_ALLOC_SYSTEM_INODE]		{ "inode_alloc:%04d", OCFS2_BITMAP_FL | OCFS2_CHAIN_FL, S_IFREG | 0644 },
 	[JOURNAL_SYSTEM_INODE]			{ "journal:%04d", OCFS2_JOURNAL_FL, S_IFREG | 0644 },

Modified: trunk/fs/ocfs2/super.c
===================================================================
--- trunk/fs/ocfs2/super.c	2005-05-11 23:33:01 UTC (rev 2226)
+++ trunk/fs/ocfs2/super.c	2005-05-12 20:42:57 UTC (rev 2227)
@@ -1012,7 +1012,6 @@
 	init_waitqueue_head(&journal->j_checkpointed);
 	spin_lock_init(&journal->j_lock);
 	journal->j_trans_id = (unsigned long) 1;
-	journal->j_cleanup_orphans = 0;
 	INIT_LIST_HEAD(&journal->j_la_cleanups);
 	INIT_WORK(&journal->j_recovery_work, ocfs2_complete_recovery, osb);
 	journal->j_state = OCFS_JOURNAL_FREE;

Modified: trunk/fs/ocfs2/vote.c
===================================================================
--- trunk/fs/ocfs2/vote.c	2005-05-11 23:33:01 UTC (rev 2226)
+++ trunk/fs/ocfs2/vote.c	2005-05-12 20:42:57 UTC (rev 2227)
@@ -67,7 +67,7 @@
 typedef struct _ocfs2_vote_msg
 {
 	ocfs2_msg_hdr v_hdr;
-	/* may put stuff in here... */
+	s32 v_orphaned_slot;	/* Used during delete votes */
 } ocfs2_vote_msg;
 
 /* Responses are given these values to maintain backwards
@@ -80,6 +80,7 @@
 {
 	ocfs2_msg_hdr r_hdr;
 	s32 r_response;
+	s32 r_orphaned_slot;
 } ocfs2_response_msg;
 
 typedef struct _ocfs2_vote_work {
@@ -103,6 +104,13 @@
 		request < OCFS2_VOTE_REQ_LAST;
 }
 
+typedef void (*ocfs2_net_response_callback)(void *priv,
+					    ocfs2_response_msg *resp);
+struct ocfs2_net_response_cb {
+	ocfs2_net_response_callback	rc_cb;
+	void				*rc_priv;
+};
+
 typedef struct _ocfs2_net_wait_ctxt {
 	struct list_head   n_list;
 	u32                n_response_id;
@@ -112,15 +120,9 @@
 					* all nodes are go, < 0 on any
 					* negative response from any
 					* node or network error. */
+	struct ocfs2_net_response_cb *n_callback;
 } ocfs2_net_wait_ctxt;
 
-static void ocfs2_vote_thread_do_work(ocfs_super *osb);
-static void ocfs2_process_vote(ocfs_super *osb,
-			       ocfs2_vote_msg *msg);
-static int ocfs2_do_request_vote(ocfs_super *osb,
-				 u64 blkno,
-				 unsigned int generation,
-				 enum ocfs2_vote_request type);
 
 static void ocfs2_process_mount_request(ocfs_super *osb,
 					unsigned int node_num)
@@ -146,16 +148,52 @@
 	ocfs_node_map_set_bit(osb, &osb->umount_map, node_num);
 }
 
-static int ocfs2_process_delete_request(struct inode *inode)
+static int ocfs2_process_delete_request(struct inode *inode,
+					int *orphaned_slot)
 {
 	int response = OCFS2_RESPONSE_BUSY;
 
-	mlog(0, "DELETE vote on inode %lu, read lnk_cnt = %u\n", inode->i_ino, 
-	     inode->i_nlink);
+	mlog(0, "DELETE vote on inode %lu, read lnk_cnt = %u, slot = %d\n",
+	     inode->i_ino, inode->i_nlink, *orphaned_slot);
 
 	/* force this as ours may be out of date. */
 	inode->i_nlink = 0;
 
+	/* Whatever our vote response is, we want to make sure that
+	 * the orphaned slot is recorded properly on this node *and*
+	 * on the requesting node. Technically, if the requesting node
+	 * did not know which slot the inode is orphaned in but we
+	 * respond with BUSY he doesn't actually need the orphaned
+	 * slot, but it doesn't hurt to do it here anyway. */
+	if ((*orphaned_slot) != OCFS_INVALID_NODE_NUM) {
+		spin_lock(&OCFS_I(inode)->ip_lock);
+
+		mlog_bug_on_msg(OCFS_I(inode)->ip_orphaned_slot != 
+				OCFS_INVALID_NODE_NUM && 
+				OCFS_I(inode)->ip_orphaned_slot != 
+				(*orphaned_slot),
+				"Inode %"MLFu64": This node thinks it's "
+				"orphaned in slot %d, messaged it's in %d\n", 
+				OCFS_I(inode)->ip_blkno,
+				OCFS_I(inode)->ip_orphaned_slot,
+				*orphaned_slot);
+
+		mlog(0, "Setting orphaned slot for inode %"MLFu64" to %d\n",
+		     OCFS_I(inode)->ip_blkno, *orphaned_slot);
+
+		OCFS_I(inode)->ip_orphaned_slot = *orphaned_slot;
+		spin_unlock(&OCFS_I(inode)->ip_lock);
+	} else {
+		spin_lock(&OCFS_I(inode)->ip_lock);
+
+		mlog(0, "Sending back orphaned slot %d for inode %"MLFu64"\n",
+		     OCFS_I(inode)->ip_orphaned_slot,
+		     OCFS_I(inode)->ip_blkno);
+
+		*orphaned_slot = OCFS_I(inode)->ip_orphaned_slot;
+		spin_unlock(&OCFS_I(inode)->ip_lock);
+	}
+
 	spin_lock(&OCFS_I(inode)->ip_lock);
 	/* vote no if the file is still open. */
 	if (OCFS_I(inode)->ip_open_cnt > 0) {
@@ -216,7 +254,7 @@
 static void ocfs2_process_vote(ocfs_super *osb,
 			       ocfs2_vote_msg *msg)
 {
-	int net_status, vote_response;
+	int net_status, vote_response, orphaned_slot;
 	int rename = 0;
 	unsigned int node_num, generation;
 	u64 blkno;
@@ -230,10 +268,11 @@
 	blkno = be64_to_cpu(hdr->h_blkno);
 	generation = ntohl(hdr->h_generation);
 	node_num = ntohl(hdr->h_node_num);
+	orphaned_slot = ntohl(msg->v_orphaned_slot);
 
 	mlog(0, "processing vote: request = %u, blkno = %"MLFu64", "
-	     "generation = %u, node_num = %u\n", request, blkno,
-	     generation, node_num);
+	     "generation = %u, node_num = %u, orphaned_slot = %d\n", request,
+	     blkno, generation, node_num, orphaned_slot);
 
 	if (!ocfs2_is_valid_vote_request(request)) {
 		mlog(ML_ERROR, "Invalid vote request %d from node %u\n",
@@ -278,7 +317,8 @@
 
 	switch (request) {
 	case OCFS2_VOTE_REQ_DELETE:
-		vote_response = ocfs2_process_delete_request(inode);
+		vote_response = ocfs2_process_delete_request(inode,
+							     &orphaned_slot);
 		break;
 	case OCFS2_VOTE_REQ_RENAME:
 		rename = 1;
@@ -301,6 +341,7 @@
 	response.r_hdr.h_generation = hdr->h_generation;
 	response.r_hdr.h_node_num = htonl(osb->node_num);
 	response.r_response = htonl(vote_response);
+	response.r_orphaned_slot = htonl(orphaned_slot);
 
 	net_status = net_send_message(OCFS2_MESSAGE_TYPE_RESPONSE,
 				      osb->net_key,
@@ -433,6 +474,7 @@
 	init_waitqueue_head(&w->n_event);
 	ocfs_node_map_init(&w->n_node_map);
 	w->n_response_id = response_id;
+	w->n_callback = NULL;
 bail:
 	return w;
 }
@@ -498,7 +540,8 @@
 static int ocfs2_broadcast_vote(ocfs_super *osb,
 				ocfs2_vote_msg *request,
 				unsigned int response_id,
-				int *response)
+				int *response,
+				struct ocfs2_net_response_cb *callback)
 {
 	int status, i, remote_err;
 	ocfs2_net_wait_ctxt *w = NULL;
@@ -512,6 +555,7 @@
 		mlog_errno(status);
 		goto bail;
 	}
+	w->n_callback = callback;
 
 	/* we're pretty much ready to go at this point, and this fills
 	 * in n_response which we need anyway... */
@@ -575,7 +619,9 @@
 static int ocfs2_do_request_vote(ocfs_super *osb,
 				 u64 blkno,
 				 unsigned int generation,
-				 enum ocfs2_vote_request type)
+				 enum ocfs2_vote_request type,
+				 int orphaned_slot,
+				 struct ocfs2_net_response_cb *callback)
 {
 	int status, response;
 	unsigned int response_id;
@@ -600,8 +646,10 @@
 	hdr->h_blkno = cpu_to_be64(blkno);
 	hdr->h_generation = htonl(generation);
 	hdr->h_node_num = htonl((unsigned int) osb->node_num);
+	request->v_orphaned_slot = htonl(orphaned_slot);
 
-	status = ocfs2_broadcast_vote(osb, request, response_id, &response);
+	status = ocfs2_broadcast_vote(osb, request, response_id, &response,
+				      callback);
 	if (status < 0) {
 		mlog_errno(status);
 		goto bail;
@@ -616,7 +664,9 @@
 }
 
 static int ocfs2_request_vote(struct inode *inode,
-			      enum ocfs2_vote_request type)
+			      enum ocfs2_vote_request type,
+			      int orphaned_slot,
+			      struct ocfs2_net_response_cb *callback)
 {
 	int status;
 	ocfs_super *osb = OCFS2_SB(inode->i_sb);
@@ -641,26 +691,82 @@
 			status = ocfs2_do_request_vote(osb, 
 						       OCFS_I(inode)->ip_blkno,
 						       inode->i_generation,
-						       type);
+						       type,
+						       orphaned_slot,
+						       callback);
 
 		ocfs2_super_unlock(osb, 0);
 	}
 	return status;
 }
 
+static void ocfs2_delete_response_cb(void *priv,
+				     ocfs2_response_msg *resp)
+{
+	int orphaned_slot, node;
+	struct inode *inode = priv;
+
+	orphaned_slot = ntohl(resp->r_orphaned_slot);
+	node = ntohl(resp->r_hdr.h_node_num);
+	mlog(0, "node %d tells us that inode %"MLFu64" is orphaned in slot "
+	     "%d\n", node, OCFS_I(inode)->ip_blkno, orphaned_slot);
+
+	/* The other node may not actually know which slot the inode
+	 * is orphaned in. */
+	if (orphaned_slot == OCFS_INVALID_NODE_NUM)
+		return;
+
+	/* Ok, the responding node knows which slot this inode is
+	 * orphaned in. We verify that the information is correct and
+	 * then record this in the inode. ocfs_delete_inode will use
+	 * this information to determine which lock to take. */
+	spin_lock(&OCFS_I(inode)->ip_lock);
+	mlog_bug_on_msg(OCFS_I(inode)->ip_orphaned_slot != orphaned_slot &&
+			OCFS_I(inode)->ip_orphaned_slot 
+			!= OCFS_INVALID_NODE_NUM, "Inode %"MLFu64": Node %d "
+			"says it's orphaned in slot %d, we think it's in %d\n",
+			OCFS_I(inode)->ip_blkno, ntohl(resp->r_hdr.h_node_num),
+			orphaned_slot, OCFS_I(inode)->ip_orphaned_slot);
+
+	OCFS_I(inode)->ip_orphaned_slot = orphaned_slot;
+	spin_unlock(&OCFS_I(inode)->ip_lock);
+}
+
 int ocfs2_request_delete_vote(struct inode *inode)
 {
-	return ocfs2_request_vote(inode, OCFS2_VOTE_REQ_DELETE);
+	int orphaned_slot;
+	struct ocfs2_net_response_cb delete_cb;
+
+	spin_lock(&OCFS_I(inode)->ip_lock);
+	orphaned_slot = OCFS_I(inode)->ip_orphaned_slot;
+	spin_unlock(&OCFS_I(inode)->ip_lock);
+
+	delete_cb.rc_cb = ocfs2_delete_response_cb;
+	delete_cb.rc_priv = inode;
+
+	mlog(0, "Inode %"MLFu64", we start thinking orphaned slot is %d\n",
+	     OCFS_I(inode)->ip_blkno, orphaned_slot);
+
+	return ocfs2_request_vote(inode,
+				  OCFS2_VOTE_REQ_DELETE,
+				  orphaned_slot,
+				  &delete_cb);
 }
 
 int ocfs2_request_unlink_vote(struct inode *inode)
 {
-	return ocfs2_request_vote(inode, OCFS2_VOTE_REQ_UNLINK);
+	return ocfs2_request_vote(inode,
+				  OCFS2_VOTE_REQ_UNLINK,
+				  OCFS_INVALID_NODE_NUM,
+				  NULL);
 }
 
 int ocfs2_request_rename_vote(struct inode *inode)
 {
-	return ocfs2_request_vote(inode, OCFS2_VOTE_REQ_RENAME);
+	return ocfs2_request_vote(inode,
+				  OCFS2_VOTE_REQ_RENAME,
+				  OCFS_INVALID_NODE_NUM,
+				  NULL);
 }
 
 int ocfs2_request_mount_vote(ocfs_super *osb)
@@ -677,7 +783,8 @@
 			return 0;
 
 		status = ocfs2_do_request_vote(osb, 0ULL, 0,
-					       OCFS2_VOTE_REQ_MOUNT);
+					       OCFS2_VOTE_REQ_MOUNT,
+					       OCFS_INVALID_NODE_NUM, NULL);
 	}
 	return status;
 }
@@ -696,7 +803,8 @@
 			return 0;
 
 		status = ocfs2_do_request_vote(osb, 0ULL, 0,
-					       OCFS2_VOTE_REQ_UMOUNT);
+					       OCFS2_VOTE_REQ_UMOUNT,
+					       OCFS_INVALID_NODE_NUM, NULL);
 	}
 	return status;
 }
@@ -748,6 +856,7 @@
 	ocfs_super *osb = data;
 	ocfs2_response_msg *resp;
 	ocfs2_net_wait_ctxt * w;
+	struct ocfs2_net_response_cb *resp_cb;
 
 	resp = (ocfs2_response_msg *) msg->buf;
 
@@ -769,6 +878,7 @@
 		mlog(0, "request not found!\n");
 		goto bail;
 	}
+	resp_cb = w->n_callback;
 
 	if (response_status && (!w->n_response)) {
 		/* we only really need one negative response so don't
@@ -776,6 +886,14 @@
 		w->n_response = response_status;
 	}
 
+	if (resp_cb) {
+		spin_unlock(&osb->net_response_lock);
+
+		resp_cb->rc_cb(resp_cb->rc_priv, resp);
+
+		spin_lock(&osb->net_response_lock);
+	}
+
 	__ocfs2_mark_node_responded(osb, w, node_num);
 bail:
 	spin_unlock(&osb->net_response_lock);
@@ -809,6 +927,7 @@
 	     be64_to_cpu(work->w_msg.v_hdr.h_blkno));
 	mlog(0, "h_generation = %u\n", ntohl(work->w_msg.v_hdr.h_generation));
 	mlog(0, "h_node_num = %u\n", ntohl(work->w_msg.v_hdr.h_node_num));
+	mlog(0, "v_orphaned_slot = %d\n", ntohl(work->w_msg.v_orphaned_slot));
 
 	spin_lock(&osb->vote_task_lock);
 	list_add_tail(&work->w_list, &osb->vote_list);



More information about the Ocfs2-commits mailing list