[Ocfs2-devel] [PATCH 28/29] ocfs2: Implement quota recovery
Mark Fasheh
mfasheh at suse.com
Wed Nov 5 16:52:27 PST 2008
On Sat, Oct 25, 2008 at 12:08:21AM +0200, Jan Kara wrote:
> Implement functions for recovery after a crash. Functions just
> read local quota file and sync info to global quota file.
>
> Signed-off-by: Jan Kara <jack at suse.cz>
> ---
> fs/ocfs2/journal.c | 105 +++++++++---
> fs/ocfs2/journal.h | 1 +
> fs/ocfs2/ocfs2.h | 4 +-
> fs/ocfs2/quota.h | 21 +++
> fs/ocfs2/quota_global.c | 1 -
> fs/ocfs2/quota_local.c | 430 ++++++++++++++++++++++++++++++++++++++++++++++-
> 6 files changed, 530 insertions(+), 32 deletions(-)
>
> diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
> index f3d7c15..d928db9 100644
> --- a/fs/ocfs2/journal.c
> +++ b/fs/ocfs2/journal.c
> @@ -45,6 +45,7 @@
> #include "slot_map.h"
> #include "super.h"
> #include "sysfile.h"
> +#include "quota.h"
>
> #include "buffer_head_io.h"
>
> @@ -52,7 +53,7 @@ DEFINE_SPINLOCK(trans_inc_lock);
>
> static int ocfs2_force_read_journal(struct inode *inode);
> static int ocfs2_recover_node(struct ocfs2_super *osb,
> - int node_num);
> + int node_num, int slot_num);
> static int __ocfs2_recovery_thread(void *arg);
> static int ocfs2_commit_cache(struct ocfs2_super *osb);
> static int ocfs2_wait_on_mount(struct ocfs2_super *osb);
> @@ -877,6 +878,7 @@ struct ocfs2_la_recovery_item {
> int lri_slot;
> struct ocfs2_dinode *lri_la_dinode;
> struct ocfs2_dinode *lri_tl_dinode;
> + struct ocfs2_quota_recovery *lri_qrec;
> };
>
> /* Does the second half of the recovery process. By this point, the
> @@ -897,6 +899,7 @@ void ocfs2_complete_recovery(struct work_struct *work)
> struct ocfs2_super *osb = journal->j_osb;
> struct ocfs2_dinode *la_dinode, *tl_dinode;
> struct ocfs2_la_recovery_item *item, *n;
> + struct ocfs2_quota_recovery *qrec;
> LIST_HEAD(tmp_la_list);
>
> mlog_entry_void();
> @@ -942,6 +945,16 @@ void ocfs2_complete_recovery(struct work_struct *work)
> if (ret < 0)
> mlog_errno(ret);
>
> + qrec = item->lri_qrec;
> + if (qrec) {
> + mlog(0, "Recovering quota files");
> + ret = ocfs2_finish_quota_recovery(osb, qrec,
> + item->lri_slot);
> + if (ret < 0)
> + mlog_errno(ret);
> + /* Recovery info is already freed now */
> + }
> +
> kfree(item);
> }
>
> @@ -955,7 +968,8 @@ void ocfs2_complete_recovery(struct work_struct *work)
> static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
> int slot_num,
> struct ocfs2_dinode *la_dinode,
> - struct ocfs2_dinode *tl_dinode)
> + struct ocfs2_dinode *tl_dinode,
> + struct ocfs2_quota_recovery *qrec)
> {
> struct ocfs2_la_recovery_item *item;
>
> @@ -970,6 +984,9 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
> if (tl_dinode)
> kfree(tl_dinode);
>
> + if (qrec)
> + ocfs2_free_quota_recovery(qrec);
> +
> mlog_errno(-ENOMEM);
> return;
> }
> @@ -978,6 +995,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
> item->lri_la_dinode = la_dinode;
> item->lri_slot = slot_num;
> item->lri_tl_dinode = tl_dinode;
> + item->lri_qrec = qrec;
>
> spin_lock(&journal->j_lock);
> list_add_tail(&item->lri_list, &journal->j_la_cleanups);
> @@ -997,6 +1015,7 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
> ocfs2_queue_recovery_completion(journal,
> osb->slot_num,
> osb->local_alloc_copy,
> + NULL,
> NULL);
> ocfs2_schedule_truncate_log_flush(osb, 0);
>
> @@ -1005,11 +1024,26 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
> }
> }
>
> +void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
> +{
> + if (osb->quota_rec) {
> + ocfs2_queue_recovery_completion(osb->journal,
> + osb->slot_num,
> + NULL,
> + NULL,
> + osb->quota_rec);
> + osb->quota_rec = NULL;
> + }
> +}
> +
> static int __ocfs2_recovery_thread(void *arg)
> {
> - int status, node_num;
> + int status, node_num, slot_num;
> struct ocfs2_super *osb = arg;
> struct ocfs2_recovery_map *rm = osb->recovery_map;
> + int *rm_quota = NULL;
> + int rm_quota_used = 0, i;
> + struct ocfs2_quota_recovery *qrec;
>
> mlog_entry_void();
>
> @@ -1018,6 +1052,11 @@ static int __ocfs2_recovery_thread(void *arg)
> goto bail;
> }
>
> + rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS);
> + if (!rm_quota) {
> + status = -ENOMEM;
> + goto bail;
> + }
> restart:
> status = ocfs2_super_lock(osb, 1);
> if (status < 0) {
> @@ -1031,8 +1070,28 @@ restart:
> * clear it until ocfs2_recover_node() has succeeded. */
> node_num = rm->rm_entries[0];
> spin_unlock(&osb->osb_lock);
> -
> - status = ocfs2_recover_node(osb, node_num);
> + mlog(0, "checking node %d\n", node_num);
> + slot_num = ocfs2_node_num_to_slot(osb, node_num);
> + if (slot_num == -ENOENT) {
> + status = 0;
> + mlog(0, "no slot for this node, so no recovery"
> + "required.\n");
> + goto skip_recovery;
> + }
> + mlog(0, "node %d was using slot %d\n", node_num, slot_num);
> +
> + /* It is a bit subtle with quota recovery. We cannot do it
> + * immediately because we have to obtain cluster locks from
> + * quota files and we also don't want to just skip it because
> + * then quota usage would be out of sync until some node takes
> + * the slot. So we remember which nodes need quota recovery
> + * and when everything else is done, we recover quotas. */
> + for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
> + if (i == rm_quota_used)
> + rm_quota[rm_quota_used++] = slot_num;
> +
> + status = ocfs2_recover_node(osb, node_num, slot_num);
> +skip_recovery:
> if (!status) {
> ocfs2_recovery_map_clear(osb, node_num);
> } else {
> @@ -1056,11 +1115,22 @@ restart:
>
> ocfs2_super_unlock(osb, 1);
>
> + /* Now it is right time to recover quotas... */
> + for (i = 0; i < rm_quota_used; i++) {
> + qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
> + if (IS_ERR(qrec)) {
> + status = PTR_ERR(qrec);
> + mlog_errno(status);
> + }
> + ocfs2_queue_recovery_completion(osb->journal, rm_quota[i],
> + NULL, NULL, qrec);
> + }
I think we want to do this *within* the super block cluster lock. What if a
node mounts and then crashes (with quota file changes in its journal) in
between ocfs2_super_unlock(osb, 1) and the call in
ocfs2_begin_quota_recovery()? We'd get the cluster lock, but the quota file
info would be stale because we'd have an unrecovered slot.
Also, we call ocfs2_queue_recovery_completion() here even when quota isn't
enabled on the file system. Perhaps ocfs2_begin_quota_recovery() should
return NULL in that case and we can skip the recovery_completion. Otherwise
I think we're just running it against an empty qrec, which seems like a bug
too.
> +/* Load information we need for quota recovery into memory */
> +struct ocfs2_quota_recovery *ocfs2_begin_quota_recovery(
> + struct ocfs2_super *osb,
> + int slot_num)
> +{
> + unsigned int feature[MAXQUOTAS] = { OCFS2_FEATURE_RO_COMPAT_USRQUOTA,
> + OCFS2_FEATURE_RO_COMPAT_GRPQUOTA};
> + unsigned int ino[MAXQUOTAS] = { LOCAL_USER_QUOTA_SYSTEM_INODE,
> + LOCAL_GROUP_QUOTA_SYSTEM_INODE };
> + struct super_block *sb = osb->sb;
> + struct ocfs2_local_disk_dqinfo *ldinfo;
> + struct inode *lqinode;
> + struct buffer_head *bh;
> + int type;
> + int status;
> + struct ocfs2_quota_recovery *rec;
> +
> + mlog(ML_NOTICE, "Beginning quota recovery in slot %u\n", slot_num);
> + rec = ocfs2_alloc_quota_recovery();
> + if (!rec)
> + return ERR_PTR(-ENOMEM);
> + /* First init... */
> +
> + for (type = 0; type < MAXQUOTAS; type++) {
> + if (!OCFS2_HAS_RO_COMPAT_FEATURE(sb, feature[type]))
> + continue;
> + lqinode = ocfs2_get_system_file_inode(osb, ino[type], slot_num);
> + if (!lqinode) {
> + status = -ENOENT;
> + goto out;
> + }
Can we add a comment here saying that we've already recovered the journal, so local
quota file metadata is up to date and can be trusted?
> + status = ocfs2_inode_lock_full(lqinode, NULL, 1,
> + OCFS2_META_LOCK_NOQUEUE);
I think you also want to add OCFS2_META_LOCK_RECOVERY here, otherwise we'll
hang if another node dies before we get the lock.
> + /* Someone else is holding the lock? Then he must be
> + * doing the recovery. Just skip the file... */
> + if (status == -EAGAIN) {
> + mlog(ML_NOTICE, "skipping quota recovery for slot %d "
> + "because quota file is locked.\n", slot_num);
> + status = 0;
> + goto out_put;
> + } else if (status < 0) {
> + mlog_errno(status);
> + goto out_put;
> + }
> + /* Now read local header */
> + bh = ocfs2_read_quota_block(lqinode, 0, &status);
> + if (!bh) {
> + mlog_errno(status);
> + mlog(ML_ERROR, "failed to read quota file info header "
> + "(slot=%d type=%d)\n", slot_num, type);
> + goto out_lock;
> + }
> + ldinfo = (struct ocfs2_local_disk_dqinfo *)(bh->b_data +
> + OCFS2_LOCAL_INFO_OFF);
> + status = ocfs2_recovery_load_quota(lqinode, ldinfo, type,
> + &rec->r_list[type]);
> + brelse(bh);
> +out_lock:
> + ocfs2_inode_unlock(lqinode, 1);
> +out_put:
> + iput(lqinode);
> + if (status < 0)
> + break;
> + }
> +out:
> + if (status < 0) {
> + ocfs2_free_quota_recovery(rec);
> + rec = ERR_PTR(status);
> + }
> + return rec;
> +}
> +
> +/* Sync changes in local quota file into global quota file and
> + * reinitialize local quota file.
> + * The function expects local quota file to be already locked and
> + * dqonoff_mutex locked. */
> +static int ocfs2_recover_local_quota_file(struct inode *lqinode,
> + int type,
> + struct ocfs2_quota_recovery *rec)
> +{
> + struct super_block *sb = lqinode->i_sb;
> + struct ocfs2_mem_dqinfo *oinfo = sb_dqinfo(sb, type)->dqi_priv;
> + struct ocfs2_local_disk_chunk *dchunk;
> + struct ocfs2_local_disk_dqblk *dqblk;
> + struct dquot *dquot;
> + handle_t *handle;
> + struct buffer_head *hbh = NULL, *qbh = NULL;
> + int status = 0;
> + int bit, chunk;
> + struct ocfs2_recovery_chunk *rchunk, *next;
> + qsize_t spacechange, inodechange;
> +
> + mlog_entry("ino=%lu type=%u", (unsigned long)lqinode->i_ino, type);
> +
> + status = ocfs2_lock_global_qf(oinfo, 1);
> + if (status < 0)
> + goto out;
> +
> + list_for_each_entry_safe(rchunk, next, &(rec->r_list[type]), rc_list) {
> + chunk = rchunk->rc_chunk;
> + hbh = ocfs2_read_quota_block(lqinode,
> + ol_quota_chunk_block(sb, chunk),
> + &status);
> + if (!hbh) {
> + mlog_errno(status);
> + break;
> + }
> + dchunk = (struct ocfs2_local_disk_chunk *)hbh->b_data;
> + for_each_bit(bit, rchunk->rc_bitmap, ol_chunk_entries(sb)) {
> + qbh = ocfs2_read_quota_block(lqinode,
> + ol_dqblk_block(sb, chunk, bit),
> + &status);
> + if (!qbh) {
> + mlog_errno(status);
> + break;
> + }
> + dqblk = (struct ocfs2_local_disk_dqblk *)(qbh->b_data +
> + ol_dqblk_block_off(sb, chunk, bit));
> + dquot = dqget(sb, le64_to_cpu(dqblk->dqb_id), type);
> + if (!dquot) {
> + status = -EIO;
> + mlog(ML_ERROR, "Failed to get quota structure "
> + "for id %u, type %d. Cannot finish quota "
> + "file recovery.\n",
> + (unsigned)le64_to_cpu(dqblk->dqb_id),
> + type);
> + goto out_put_bh;
> + }
> + handle = ocfs2_start_trans(OCFS2_SB(sb),
> + OCFS2_QSYNC_CREDITS);
> + if (IS_ERR(handle)) {
> + status = PTR_ERR(handle);
> + mlog_errno(status);
> + goto out_put_dquot;
> + }
> + mutex_lock(&sb_dqopt(sb)->dqio_mutex);
> + spin_lock(&dq_data_lock);
> + /* Add usage from quota entry into quota changes
> + * of our node. Auxiliary variables are important
> + * due to signedness */
> + spacechange = le64_to_cpu(dqblk->dqb_spacemod);
> + inodechange = le64_to_cpu(dqblk->dqb_inodemod);
> + dquot->dq_dqb.dqb_curspace += spacechange;
> + dquot->dq_dqb.dqb_curinodes += inodechange;
> + spin_unlock(&dq_data_lock);
> + /* We want to drop reference held by the crashed
> + * node. Since we have our own reference we know
> + * global structure actually won't be freed. */
> + status = ocfs2_global_release_dquot(dquot);
> + if (status < 0) {
> + mlog_errno(status);
> + goto out_commit;
> + }
> + /* Release local quota file entry */
> + status = ocfs2_journal_access(handle, lqinode,
> + qbh, OCFS2_JOURNAL_ACCESS_WRITE);
> + if (status < 0) {
> + mlog_errno(status);
> + goto out_commit;
> + }
> + lock_buffer(qbh);
> + WARN_ON(!ocfs2_test_bit(bit, dchunk->dqc_bitmap));
> + ocfs2_clear_bit(bit, dchunk->dqc_bitmap);
> + le32_add_cpu(&dchunk->dqc_free, 1);
> + unlock_buffer(qbh);
> + status = ocfs2_journal_dirty(handle, qbh);
> + if (status < 0)
> + mlog_errno(status);
> +out_commit:
> + mutex_unlock(&sb_dqopt(sb)->dqio_mutex);
> + ocfs2_commit_trans(OCFS2_SB(sb), handle);
> +out_put_dquot:
> + dqput(dquot);
> +out_put_bh:
> + brelse(qbh);
> + if (status < 0)
> + break;
> + }
> + brelse(hbh);
> + list_del(&rchunk->rc_list);
> + kfree(rchunk->rc_bitmap);
> + kfree(rchunk);
> + if (status < 0)
> + break;
> + }
> + ocfs2_unlock_global_qf(oinfo, 1);
> +out:
> + if (status < 0)
> + free_recovery_list(&(rec->r_list[type]));
> + mlog_exit(status);
> + return status;
> +}
> +
> +/* Recover local quota files for given node different from us */
> +int ocfs2_finish_quota_recovery(struct ocfs2_super *osb,
> + struct ocfs2_quota_recovery *rec,
> + int slot_num)
> +{
> + unsigned int ino[MAXQUOTAS] = { LOCAL_USER_QUOTA_SYSTEM_INODE,
> + LOCAL_GROUP_QUOTA_SYSTEM_INODE };
> + struct super_block *sb = osb->sb;
> + struct ocfs2_local_disk_dqinfo *ldinfo;
> + struct buffer_head *bh;
> + handle_t *handle;
> + int type;
> + int status = 0;
> + struct inode *lqinode;
> + unsigned int flags;
> +
> + mlog(ML_NOTICE, "Finishing quota recovery in slot %u\n", slot_num);
> + mutex_lock(&sb_dqopt(sb)->dqonoff_mutex);
> + for (type = 0; type < MAXQUOTAS; type++) {
> + if (list_empty(&(rec->r_list[type])))
> + continue;
> + mlog(0, "Recovering quota in slot %d\n", slot_num);
> + lqinode = ocfs2_get_system_file_inode(osb, ino[type], slot_num);
> + if (!lqinode) {
> + status = -ENOENT;
> + goto out;
> + }
> + status = ocfs2_inode_lock_full(lqinode, NULL, 1,
> + OCFS2_META_LOCK_NOQUEUE);
> + /* Someone else is holding the lock? Then he must be
> + * doing the recovery. Just skip the file... */
Hmm, given that we have to do this for quota recovery completion, what's the
point of all the work we do in ocfs2_begin_quota_recovery()?
--Mark
--
Mark Fasheh
More information about the Ocfs2-devel
mailing list