[Ocfs2-devel] [PATCH 28/29] ocfs2: Implement quota recovery

Mark Fasheh mfasheh at suse.com
Wed Nov 5 16:52:27 PST 2008


On Sat, Oct 25, 2008 at 12:08:21AM +0200, Jan Kara wrote:
> Implement functions for recovery after a crash. Functions just
> read local quota file and sync info to global quota file.
> 
> Signed-off-by: Jan Kara <jack at suse.cz>
> ---
>  fs/ocfs2/journal.c      |  105 +++++++++---
>  fs/ocfs2/journal.h      |    1 +
>  fs/ocfs2/ocfs2.h        |    4 +-
>  fs/ocfs2/quota.h        |   21 +++
>  fs/ocfs2/quota_global.c |    1 -
>  fs/ocfs2/quota_local.c  |  430 ++++++++++++++++++++++++++++++++++++++++++++++-
>  6 files changed, 530 insertions(+), 32 deletions(-)
> 
> diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
> index f3d7c15..d928db9 100644
> --- a/fs/ocfs2/journal.c
> +++ b/fs/ocfs2/journal.c
> @@ -45,6 +45,7 @@
>  #include "slot_map.h"
>  #include "super.h"
>  #include "sysfile.h"
> +#include "quota.h"
>  
>  #include "buffer_head_io.h"
>  
> @@ -52,7 +53,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
>  
>  static int ocfs2_force_read_journal(struct inode *inode);
>  static int ocfs2_recover_node(struct ocfs2_super *osb,
> -			      int node_num);
> +			      int node_num, int slot_num);
>  static int __ocfs2_recovery_thread(void *arg);
>  static int ocfs2_commit_cache(struct ocfs2_super *osb);
>  static int ocfs2_wait_on_mount(struct ocfs2_super *osb);
> @@ -877,6 +878,7 @@ struct ocfs2_la_recovery_item {
>  	int			lri_slot;
>  	struct ocfs2_dinode	*lri_la_dinode;
>  	struct ocfs2_dinode	*lri_tl_dinode;
> +	struct ocfs2_quota_recovery *lri_qrec;
>  };
>  
>  /* Does the second half of the recovery process. By this point, the
> @@ -897,6 +899,7 @@ void ocfs2_complete_recovery(struct work_struct *work)
>  	struct ocfs2_super *osb = journal->j_osb;
>  	struct ocfs2_dinode *la_dinode, *tl_dinode;
>  	struct ocfs2_la_recovery_item *item, *n;
> +	struct ocfs2_quota_recovery *qrec;
>  	LIST_HEAD(tmp_la_list);
>  
>  	mlog_entry_void();
> @@ -942,6 +945,16 @@ void ocfs2_complete_recovery(struct work_struct *work)
>  		if (ret < 0)
>  			mlog_errno(ret);
>  
> +		qrec = item->lri_qrec;
> +		if (qrec) {
> +			mlog(0, "Recovering quota files");
> +			ret = ocfs2_finish_quota_recovery(osb, qrec,
> +							  item->lri_slot);
> +			if (ret < 0)
> +				mlog_errno(ret);
> +			/* Recovery info is already freed now */
> +		}
> +
>  		kfree(item);
>  	}
>  
> @@ -955,7 +968,8 @@ void ocfs2_complete_recovery(struct work_struct *work)
>  static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
>  					    int slot_num,
>  					    struct ocfs2_dinode *la_dinode,
> -					    struct ocfs2_dinode *tl_dinode)
> +					    struct ocfs2_dinode *tl_dinode,
> +					    struct ocfs2_quota_recovery *qrec)
>  {
>  	struct ocfs2_la_recovery_item *item;
>  
> @@ -970,6 +984,9 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
>  		if (tl_dinode)
>  			kfree(tl_dinode);
>  
> +		if (qrec)
> +			ocfs2_free_quota_recovery(qrec);
> +
>  		mlog_errno(-ENOMEM);
>  		return;
>  	}
> @@ -978,6 +995,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
>  	item->lri_la_dinode = la_dinode;
>  	item->lri_slot = slot_num;
>  	item->lri_tl_dinode = tl_dinode;
> +	item->lri_qrec = qrec;
>  
>  	spin_lock(&journal->j_lock);
>  	list_add_tail(&item->lri_list, &journal->j_la_cleanups);
> @@ -997,6 +1015,7 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
>  		ocfs2_queue_recovery_completion(journal,
>  						osb->slot_num,
>  						osb->local_alloc_copy,
> +						NULL,
>  						NULL);
>  		ocfs2_schedule_truncate_log_flush(osb, 0);
>  
> @@ -1005,11 +1024,26 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
>  	}
>  }
>  
> +void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
> +{
> +	if (osb->quota_rec) {
> +		ocfs2_queue_recovery_completion(osb->journal,
> +						osb->slot_num,
> +						NULL,
> +						NULL,
> +						osb->quota_rec);
> +		osb->quota_rec = NULL;
> +	}
> +}
> +
>  static int __ocfs2_recovery_thread(void *arg)
>  {
> -	int status, node_num;
> +	int status, node_num, slot_num;
>  	struct ocfs2_super *osb = arg;
>  	struct ocfs2_recovery_map *rm = osb->recovery_map;
> +	int *rm_quota = NULL;
> +	int rm_quota_used = 0, i;
> +	struct ocfs2_quota_recovery *qrec;
>  
>  	mlog_entry_void();
>  
> @@ -1018,6 +1052,11 @@ static int __ocfs2_recovery_thread(void *arg)
>  		goto bail;
>  	}
>  
> +	rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS);
> +	if (!rm_quota) {
> +		status = -ENOMEM;
> +		goto bail;
> +	}
>  restart:
>  	status = ocfs2_super_lock(osb, 1);
>  	if (status < 0) {
> @@ -1031,8 +1070,28 @@ restart:
>  		 * clear it until ocfs2_recover_node() has succeeded. */
>  		node_num = rm->rm_entries[0];
>  		spin_unlock(&osb->osb_lock);
> -
> -		status = ocfs2_recover_node(osb, node_num);
> +		mlog(0, "checking node %d\n", node_num);
> +		slot_num = ocfs2_node_num_to_slot(osb, node_num);
> +		if (slot_num == -ENOENT) {
> +			status = 0;
> +			mlog(0, "no slot for this node, so no recovery"
> +			     "required.\n");
> +			goto skip_recovery;
> +		}
> +		mlog(0, "node %d was using slot %d\n", node_num, slot_num);
> +
> +		/* It is a bit subtle with quota recovery. We cannot do it
> +		 * immediately because we have to obtain cluster locks from
> +		 * quota files and we also don't want to just skip it because
> +		 * then quota usage would be out of sync until some node takes
> +		 * the slot. So we remember which nodes need quota recovery
> +		 * and when everything else is done, we recover quotas. */
> +		for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
> +		if (i == rm_quota_used)
> +			rm_quota[rm_quota_used++] = slot_num;
> +
> +		status = ocfs2_recover_node(osb, node_num, slot_num);
> +skip_recovery:
>  		if (!status) {
>  			ocfs2_recovery_map_clear(osb, node_num);
>  		} else {
> @@ -1056,11 +1115,22 @@ restart:
>  
>  	ocfs2_super_unlock(osb, 1);
>  
> +	/* Now it is right time to recover quotas... */
> +	for (i = 0; i < rm_quota_used; i++) {
> +		qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
> +		if (IS_ERR(qrec)) {
> +			status = PTR_ERR(qrec);
> +			mlog_errno(status);
> +		}
> +		ocfs2_queue_recovery_completion(osb->journal, rm_quota[i],
> +						NULL, NULL, qrec);
> +	}

I think we want to do this *within* the super block cluster lock. What if a
node mounts and then crashes (with quota file changes in its journal) in
between ocfs2_super_unlock(osb, 1) and the call in
ocfs2_begin_quota_recovery()? We'd get the cluster lock, but the quota file
info would be stale because we'd have an unrecovered slot.


Also, we call ocfs2_queue_recovery_completion() here even when quota isn't
enabled on the file system. Perhaps ocfs2_begin_quota_recovery() should
return NULL in that case and we can skip the recovery_completion. Otherwise
I think we're just running it against an empty qrec, which seems like a bug
too.


> +/* Load information we need for quota recovery into memory */
> +struct ocfs2_quota_recovery *ocfs2_begin_quota_recovery(
> +						struct ocfs2_super *osb,
> +						int slot_num)
> +{
> +	unsigned int feature[MAXQUOTAS] = { OCFS2_FEATURE_RO_COMPAT_USRQUOTA,
> +					    OCFS2_FEATURE_RO_COMPAT_GRPQUOTA};
> +	unsigned int ino[MAXQUOTAS] = { LOCAL_USER_QUOTA_SYSTEM_INODE,
> +					LOCAL_GROUP_QUOTA_SYSTEM_INODE };
> +	struct super_block *sb = osb->sb;
> +	struct ocfs2_local_disk_dqinfo *ldinfo;
> +	struct inode *lqinode;
> +	struct buffer_head *bh;
> +	int type;
> +	int status;
> +	struct ocfs2_quota_recovery *rec;
> +
> +	mlog(ML_NOTICE, "Beginning quota recovery in slot %u\n", slot_num);
> +	rec = ocfs2_alloc_quota_recovery();
> +	if (!rec)
> +		return ERR_PTR(-ENOMEM);
> +	/* First init... */
> +
> +	for (type = 0; type < MAXQUOTAS; type++) {
> +		if (!OCFS2_HAS_RO_COMPAT_FEATURE(sb, feature[type]))
> +			continue;
> +		lqinode = ocfs2_get_system_file_inode(osb, ino[type], slot_num);
> +		if (!lqinode) {
> +			status = -ENOENT;
> +			goto out;
> +		}

Can we add a comment here saying that we've already recovered the journal, so local
quota file metadata is up to date and can be trusted?

> +		status = ocfs2_inode_lock_full(lqinode, NULL, 1,
> +						       OCFS2_META_LOCK_NOQUEUE);

I think you also want to add OCFS2_META_LOCK_RECOVERY here, otherwise we'll
hang if another node dies before we get the lock.


> +		/* Someone else is holding the lock? Then he must be
> +		 * doing the recovery. Just skip the file... */
> +		if (status == -EAGAIN) {
> +			mlog(ML_NOTICE, "skipping quota recovery for slot %d "
> +			     "because quota file is locked.\n", slot_num);
> +			status = 0;
> +			goto out_put;
> +		} else if (status < 0) {
> +			mlog_errno(status);
> +			goto out_put;
> +		}
> +		/* Now read local header */
> +		bh = ocfs2_read_quota_block(lqinode, 0, &status);
> +		if (!bh) {
> +			mlog_errno(status);
> +			mlog(ML_ERROR, "failed to read quota file info header "
> +				"(slot=%d type=%d)\n", slot_num, type);
> +			goto out_lock;
> +		}
> +		ldinfo = (struct ocfs2_local_disk_dqinfo *)(bh->b_data +
> +							OCFS2_LOCAL_INFO_OFF);
> +		status = ocfs2_recovery_load_quota(lqinode, ldinfo, type,
> +						   &rec->r_list[type]);
> +		brelse(bh);
> +out_lock:
> +		ocfs2_inode_unlock(lqinode, 1);
> +out_put:
> +		iput(lqinode);
> +		if (status < 0)
> +			break;
> +	}
> +out:
> +	if (status < 0) {
> +		ocfs2_free_quota_recovery(rec);
> +		rec = ERR_PTR(status);
> +	}
> +	return rec;
> +}
> +
> +/* Sync changes in local quota file into global quota file and
> + * reinitialize local quota file.
> + * The function expects local quota file to be already locked and
> + * dqonoff_mutex locked. */
> +static int ocfs2_recover_local_quota_file(struct inode *lqinode,
> +					  int type,
> +					  struct ocfs2_quota_recovery *rec)
> +{
> +	struct super_block *sb = lqinode->i_sb;
> +	struct ocfs2_mem_dqinfo *oinfo = sb_dqinfo(sb, type)->dqi_priv;
> +	struct ocfs2_local_disk_chunk *dchunk;
> +	struct ocfs2_local_disk_dqblk *dqblk;
> +	struct dquot *dquot;
> +	handle_t *handle;
> +	struct buffer_head *hbh = NULL, *qbh = NULL;
> +	int status = 0;
> +	int bit, chunk;
> +	struct ocfs2_recovery_chunk *rchunk, *next;
> +	qsize_t spacechange, inodechange;
> +
> +	mlog_entry("ino=%lu type=%u", (unsigned long)lqinode->i_ino, type);
> +
> +	status = ocfs2_lock_global_qf(oinfo, 1);
> +	if (status < 0)
> +		goto out;
> +
> +	list_for_each_entry_safe(rchunk, next, &(rec->r_list[type]), rc_list) {
> +		chunk = rchunk->rc_chunk;
> +		hbh = ocfs2_read_quota_block(lqinode,
> +					     ol_quota_chunk_block(sb, chunk),
> +					     &status);
> +		if (!hbh) {
> +			mlog_errno(status);
> +			break;
> +		}
> +		dchunk = (struct ocfs2_local_disk_chunk *)hbh->b_data;
> +		for_each_bit(bit, rchunk->rc_bitmap, ol_chunk_entries(sb)) {
> +			qbh = ocfs2_read_quota_block(lqinode,
> +						ol_dqblk_block(sb, chunk, bit),
> +						&status);
> +			if (!qbh) {
> +				mlog_errno(status);
> +				break;
> +			}
> +			dqblk = (struct ocfs2_local_disk_dqblk *)(qbh->b_data +
> +				ol_dqblk_block_off(sb, chunk, bit));
> +			dquot = dqget(sb, le64_to_cpu(dqblk->dqb_id), type);
> +			if (!dquot) {
> +				status = -EIO;
> +				mlog(ML_ERROR, "Failed to get quota structure "
> +				     "for id %u, type %d. Cannot finish quota "
> +				     "file recovery.\n",
> +				     (unsigned)le64_to_cpu(dqblk->dqb_id),
> +				     type);
> +				goto out_put_bh;
> +			}
> +			handle = ocfs2_start_trans(OCFS2_SB(sb),
> +						   OCFS2_QSYNC_CREDITS);
> +			if (IS_ERR(handle)) {
> +				status = PTR_ERR(handle);
> +				mlog_errno(status);
> +				goto out_put_dquot;
> +			}
> +			mutex_lock(&sb_dqopt(sb)->dqio_mutex);
> +			spin_lock(&dq_data_lock);
> +			/* Add usage from quota entry into quota changes
> +			 * of our node. Auxiliary variables are important
> +			 * due to signedness */
> +			spacechange = le64_to_cpu(dqblk->dqb_spacemod);
> +			inodechange = le64_to_cpu(dqblk->dqb_inodemod);
> +			dquot->dq_dqb.dqb_curspace += spacechange;
> +			dquot->dq_dqb.dqb_curinodes += inodechange;
> +			spin_unlock(&dq_data_lock);
> +			/* We want to drop reference held by the crashed
> +			 * node. Since we have our own reference we know
> +			 * global structure actually won't be freed. */
> +			status = ocfs2_global_release_dquot(dquot);
> +			if (status < 0) {
> +				mlog_errno(status);
> +				goto out_commit;
> +			}
> +			/* Release local quota file entry */
> +			status = ocfs2_journal_access(handle, lqinode,
> +					qbh, OCFS2_JOURNAL_ACCESS_WRITE);
> +			if (status < 0) {
> +				mlog_errno(status);
> +				goto out_commit;
> +			}
> +			lock_buffer(qbh);
> +			WARN_ON(!ocfs2_test_bit(bit, dchunk->dqc_bitmap));
> +			ocfs2_clear_bit(bit, dchunk->dqc_bitmap);
> +			le32_add_cpu(&dchunk->dqc_free, 1);
> +			unlock_buffer(qbh);
> +			status = ocfs2_journal_dirty(handle, qbh);
> +			if (status < 0)
> +				mlog_errno(status);
> +out_commit:
> +			mutex_unlock(&sb_dqopt(sb)->dqio_mutex);
> +			ocfs2_commit_trans(OCFS2_SB(sb), handle);
> +out_put_dquot:
> +			dqput(dquot);
> +out_put_bh:
> +			brelse(qbh);
> +			if (status < 0)
> +				break;
> +		}
> +		brelse(hbh);
> +		list_del(&rchunk->rc_list);
> +		kfree(rchunk->rc_bitmap);
> +		kfree(rchunk);
> +		if (status < 0)
> +			break;
> +	}
> +	ocfs2_unlock_global_qf(oinfo, 1);
> +out:
> +	if (status < 0)
> +		free_recovery_list(&(rec->r_list[type]));
> +	mlog_exit(status);
> +	return status;
> +}
> +
> +/* Recover local quota files for given node different from us */
> +int ocfs2_finish_quota_recovery(struct ocfs2_super *osb,
> +				struct ocfs2_quota_recovery *rec,
> +				int slot_num)
> +{
> +	unsigned int ino[MAXQUOTAS] = { LOCAL_USER_QUOTA_SYSTEM_INODE,
> +					LOCAL_GROUP_QUOTA_SYSTEM_INODE };
> +	struct super_block *sb = osb->sb;
> +	struct ocfs2_local_disk_dqinfo *ldinfo;
> +	struct buffer_head *bh;
> +	handle_t *handle;
> +	int type;
> +	int status = 0;
> +	struct inode *lqinode;
> +	unsigned int flags;
> +
> +	mlog(ML_NOTICE, "Finishing quota recovery in slot %u\n", slot_num);
> +	mutex_lock(&sb_dqopt(sb)->dqonoff_mutex);
> +	for (type = 0; type < MAXQUOTAS; type++) {
> +		if (list_empty(&(rec->r_list[type])))
> +			continue;
> +		mlog(0, "Recovering quota in slot %d\n", slot_num);
> +		lqinode = ocfs2_get_system_file_inode(osb, ino[type], slot_num);
> +		if (!lqinode) {
> +			status = -ENOENT;
> +			goto out;
> +		}
> +		status = ocfs2_inode_lock_full(lqinode, NULL, 1,
> +						       OCFS2_META_LOCK_NOQUEUE);
> +		/* Someone else is holding the lock? Then he must be
> +		 * doing the recovery. Just skip the file... */

Hmm, given that we have to do this for quota recovery completion, what's the
point of all the work we do in ocfs2_begin_quota_recovery()?

	--Mark

--
Mark Fasheh



More information about the Ocfs2-devel mailing list