[Ocfs2-tools-devel] I/O Cache for libocfs2

Joel Becker Joel.Becker at oracle.com
Fri Aug 24 17:19:23 PDT 2007


Here is an I/O cache backing unix_io.c in libocfs2.  It's a simple read
cache.  Writes are write-through and synchronous.

The user specifies the cache size in bytes or blocks.  When they call
io_read_block() or io_write_block(), the block is looked up in the
cache.  If it exists, a read merely copies the data to the user buffer,
while a write copies the user data to the cache buffer, then syncs the
buffer to disk (write-through).

If the block doesn't exist in the cache, the least recently used cache
block is stolen and filled the same way: from disk for a read, or from
the user's data (then written through to disk) for a write.

The cache is not automatically created during ocfs2_open().  A process
that wants caching needs to enable it after taking the appropriate
locking steps.

-- 

Index: libocfs2/include/ocfs2.h
===================================================================
--- libocfs2/include/ocfs2.h	(revision 1406)
+++ libocfs2/include/ocfs2.h	(working copy)
@@ -265,6 +265,9 @@ errcode_t io_read_block(io_channel *chan
 			char *data);
 errcode_t io_write_block(io_channel *channel, int64_t blkno, int count,
 			 const char *data);
+errcode_t io_init_cache(io_channel *channel, size_t nr_blocks);
+errcode_t io_init_cache_size(io_channel *channel, size_t bytes);
+void io_destroy_cache(io_channel *channel);
 
 errcode_t ocfs2_read_super(ocfs2_filesys *fs, uint64_t superblock, char *sb);
 errcode_t ocfs2_write_super(ocfs2_filesys *fs);
Index: libocfs2/unix_io.c
===================================================================
--- libocfs2/unix_io.c	(revision 1406)
+++ libocfs2/unix_io.c	(working copy)
@@ -30,6 +30,7 @@
 #define _LARGEFILE64_SOURCE
 #define _GNU_SOURCE /* Because libc really doesn't want us using O_DIRECT? */
 
+#include <assert.h>
 #include <string.h>
 #include <sys/types.h>
 #include <unistd.h>
@@ -45,14 +46,358 @@
 #include "ocfs2.h"
 
 
+/*
+ * The cache looks up blocks in two ways:
+ *
+ * 1) If it needs a new block, it gets one off of ic->ic_lru.  The blocks
+ *    attach to that list via icb->icb_list.
+ *
+ * 2) If it wants to look up an existing block, it gets it from
+ *    ic->ic_lookup.  The blocks are attached via icb->icb_node.
+ */
+struct io_cache_block {
+	struct rb_node icb_node;	/* Entry in ic->ic_lookup */
+	struct list_head icb_list;	/* Entry in ic->ic_lru */
+	uint64_t icb_blkno;		/* Block number the buffer holds */
+	char *icb_buf;			/* This block's slice of ic_data_buffer */
+};
+
+struct io_cache {
+	size_t ic_nr_blocks;		/* Number of cache blocks allocated */
+	struct list_head ic_lru;	/* Head is stolen first, tail is newest */
+	struct rb_root ic_lookup;	/* Cached blocks keyed by icb_blkno */
+
+	/* Housekeeping: the two allocations that back every cache block */
+	struct io_cache_block *ic_metadata_buffer;
+	char *ic_data_buffer;
+};
+
 struct _io_channel {
 	char *io_name;
 	int io_blksize;
 	int io_flags;
 	int io_error;
 	int io_fd;
+	struct io_cache *io_cache;	/* NULL means uncached I/O */
 };
 
+static errcode_t unix_io_read_block(io_channel *channel, int64_t blkno,
+				    int count, char *data)
+{
+	int ret;
+	ssize_t size, tot, rd;
+	uint64_t location;
+
+	/* Uncached read from io_fd.  Negative means count is in bytes */
+	size = (count < 0) ? -count : count * channel->io_blksize;
+	location = blkno * channel->io_blksize;
+
+	tot = 0;
+	while (tot < size) {	/* Keep going after partial reads */
+		rd = pread64(channel->io_fd, data + tot,
+			     size - tot, location + tot);
+		ret = OCFS2_ET_IO;
+		if (rd < 0) {
+			channel->io_error = errno;
+			goto out;
+		}
+
+		if (!rd) 	/* Nothing more to read: fails with OCFS2_ET_IO */
+			goto out;
+
+		tot += rd;
+	}
+
+	ret = 0;
+
+out:
+	if (!ret && tot != size) {	/* NOTE(review): looks unreachable */
+		ret = OCFS2_ET_SHORT_READ;
+		memset(data + tot, 0, size - tot);
+	}
+
+	return ret;
+}
+
+static errcode_t unix_io_write_block(io_channel *channel, int64_t blkno,
+				     int count, const char *data)
+{
+	int ret;
+	ssize_t size, tot, wr;
+	uint64_t location;
+
+	/* Uncached write to io_fd.  Negative means count is in bytes */
+	size = (count < 0) ? -count : count * channel->io_blksize;
+	location = blkno * channel->io_blksize;
+
+	tot = 0;
+	while (tot < size) {	/* Keep going after partial writes */
+		wr = pwrite64(channel->io_fd, data + tot,
+ 			      size - tot, location + tot);
+		ret = OCFS2_ET_IO;
+		if (wr < 0) {
+			channel->io_error = errno;
+			goto out;
+		}
+
+		if (!wr) 	/* No progress: fails with OCFS2_ET_IO */
+			goto out;
+
+		tot += wr;
+	}
+
+	ret = 0;
+out:
+	if (!ret && (tot != size))	/* NOTE(review): looks unreachable */
+		ret = OCFS2_ET_SHORT_WRITE;
+
+	return ret;
+}
+
+
+
+/*
+ * See if the rbtree has a block for the given block number.
+ *
+ * Returns the cached block whose icb_blkno equals blkno, or NULL when
+ * the lookup tree holds no such block.
+ */
+static struct io_cache_block *io_cache_lookup(struct io_cache *ic,
+					      uint64_t blkno)
+{
+	struct rb_node *p = ic->ic_lookup.rb_node;
+	struct io_cache_block *icb;
+
+	while (p) {	/* Binary search keyed on icb_blkno */
+		icb = rb_entry(p, struct io_cache_block, icb_node);
+		if (blkno < icb->icb_blkno) {
+			p = p->rb_left;
+		} else if (blkno > icb->icb_blkno) {
+			p = p->rb_right;
+		} else
+			return icb;
+	}
+
+	return NULL;
+}
+
+static void io_cache_insert(struct io_cache *ic,
+			    struct io_cache_block *insert_icb)
+{
+	struct rb_node **p = &ic->ic_lookup.rb_node;
+	struct rb_node *parent = NULL;
+	struct io_cache_block *icb = NULL;
+
+	while (*p) {	/* Find insert_icb's slot, ordered by icb_blkno */
+		parent = *p;
+		icb = rb_entry(parent, struct io_cache_block, icb_node);
+		if (insert_icb->icb_blkno < icb->icb_blkno) {
+			p = &(*p)->rb_left;
+			icb = NULL;
+		} else if (insert_icb->icb_blkno > icb->icb_blkno) {
+			p = &(*p)->rb_right;
+			icb = NULL;
+		} else
+			assert(0);  /* We erased it, remember? */
+	}
+
+	rb_link_node(&insert_icb->icb_node, parent, p);
+	rb_insert_color(&insert_icb->icb_node, &ic->ic_lookup);	/* Rebalance */
+}
+
+static void io_cache_seen(struct io_cache *ic, struct io_cache_block *icb)
+{
+	/* Move to the tail of the LRU; stealing takes from the head */
+	list_del(&icb->icb_list);
+	list_add_tail(&icb->icb_list, &ic->ic_lru);
+}
+
+static void io_cache_disconnect(struct io_cache *ic,
+				struct io_cache_block *icb)
+{
+	/*
+	 * Erase icb if it is linked.  The root is linked with no rb_parent.
+	 */
+	if (icb->icb_node.rb_parent ||
+	    ic->ic_lookup.rb_node == &icb->icb_node) {
+		rb_erase(&icb->icb_node, &ic->ic_lookup);
+		memset(&icb->icb_node, 0, sizeof(struct rb_node));
+	}
+}
+
+static struct io_cache_block *io_cache_pop_lru(struct io_cache *ic)
+{
+	/* The head of ic_lru is the victim; it stays on the list. */
+	struct io_cache_block *victim =
+		list_entry(ic->ic_lru.next, struct io_cache_block, icb_list);
+
+	io_cache_disconnect(ic, victim);
+	return victim;
+}
+
+static errcode_t io_cache_read_one_block(io_channel *channel, int64_t blkno,
+					 char *data)
+{
+	errcode_t ret = 0;	/* A cache hit does no I/O and cannot fail */
+	struct io_cache *ic = channel->io_cache;
+	struct io_cache_block *icb = io_cache_lookup(ic, blkno);
+
+	/* Copy one block to data, reading it into the cache on a miss. */
+	if (icb)
+		goto found;
+
+	/* Ok, this blkno isn't in the cache.  Steal something. */
+	icb = io_cache_pop_lru(ic);
+
+	/*
+	 * If the read fails, we leave the block at the head of the LRU
+	 * (first to be stolen again) and out of the lookup tree.
+	 */
+	ret = unix_io_read_block(channel, blkno, 1, icb->icb_buf);
+	if (ret)
+		goto out;
+
+	icb->icb_blkno = blkno;
+	io_cache_insert(ic, icb);
+
+found:
+	memcpy(data, icb->icb_buf, channel->io_blksize);
+	io_cache_seen(ic, icb);
+
+out:
+	return ret;
+}
+
+static errcode_t io_cache_read_block(io_channel *channel, int64_t blkno,
+				     int count, char *data)
+{
+	int i;
+	errcode_t ret = 0;
+
+	if (count < 0)	/* Byte counts bypass the block-granular cache */
+		return unix_io_read_block(channel, blkno, count, data);
+	for (i = 0; i < count; i++, blkno++, data += channel->io_blksize) {
+		ret = io_cache_read_one_block(channel, blkno, data);
+		if (ret)
+			break;
+	}
+	return ret;
+}
+
+static errcode_t io_cache_write_one_block(io_channel *channel,
+					  int64_t blkno, const char *data)
+{
+	errcode_t ret;
+	struct io_cache *ic = channel->io_cache;
+	struct io_cache_block *icb;
+
+	icb = io_cache_lookup(ic, blkno);
+	if (icb)
+		goto found;
+
+	/* Ok, this blkno isn't in the cache.  Steal something. */
+	icb = io_cache_pop_lru(ic);
+
+	icb->icb_blkno = blkno;
+	io_cache_insert(ic, icb);	/* Buffer is filled at found: */
+
+found:
+	memcpy(icb->icb_buf, data, channel->io_blksize);
+	io_cache_seen(ic, icb);
+	/* Write-through: the cached copy goes to disk right away */
+	ret = unix_io_write_block(channel, blkno, 1, icb->icb_buf);
+	if (ret)	/* Write failed: drop the block from the lookup tree */
+		io_cache_disconnect(ic, icb);
+
+	return ret;
+}
+
+static errcode_t io_cache_write_block(io_channel *channel, int64_t blkno,
+				      int count, const char *data)
+
+{
+	int i;
+	errcode_t ret = 0;
+	/* NOTE(review): a negative (byte) count runs zero iterations here */
+	for (i = 0; i < count; i++, blkno++, data += channel->io_blksize) {
+		ret = io_cache_write_one_block(channel, blkno, data);
+		if (ret)	/* Stop at the first block that fails */
+			break;
+	}
+
+	return ret;
+}
+
+static void io_free_cache(struct io_cache *ic)
+{
+	if (!ic)
+		return;		/* No cache, nothing to release */
+	if (ic->ic_data_buffer)
+		ocfs2_free(&ic->ic_data_buffer);
+	if (ic->ic_metadata_buffer)
+		ocfs2_free(&ic->ic_metadata_buffer);
+	ocfs2_free(&ic);	/* Only after its buffers are gone */
+}
+
+void io_destroy_cache(io_channel *channel)
+{
+	if (!channel->io_cache)
+		return;
+	io_free_cache(channel->io_cache);
+	channel->io_cache = NULL;	/* I/O goes back to uncached */
+}
+
+errcode_t io_init_cache(io_channel *channel, size_t nr_blocks)
+{
+	size_t i;
+	struct io_cache *ic = NULL;	/* Lets the error path free safely */
+	struct io_cache_block *icb_list;
+	errcode_t ret;
+
+	if (!nr_blocks)	/* An empty LRU has nothing to steal: stay uncached */
+		return 0;
+	/* Build the cache: header, data buffer, then per-block metadata */
+	ret = ocfs2_malloc0(sizeof(struct io_cache), &ic);
+	if (ret)
+		goto out;
+
+	ic->ic_nr_blocks = nr_blocks;
+	ic->ic_lookup = RB_ROOT;
+	INIT_LIST_HEAD(&ic->ic_lru);
+
+	ret = ocfs2_malloc_blocks(channel, nr_blocks, &ic->ic_data_buffer);
+	if (ret)
+		goto out;
+
+	ret = ocfs2_malloc0(sizeof(struct io_cache_block) * nr_blocks,
+			    &ic->ic_metadata_buffer);
+	if (ret)
+		goto out;
+
+	icb_list = ic->ic_metadata_buffer;
+	for (i = 0; i < nr_blocks; i++) {
+		icb_list[i].icb_buf =
+			ic->ic_data_buffer + i * channel->io_blksize;
+		list_add_tail(&icb_list[i].icb_list, &ic->ic_lru);
+	}
+
+	channel->io_cache = ic;
+
+out:
+	if (ret)
+		io_free_cache(ic);
+	return ret;
+}
+
+errcode_t io_init_cache_size(io_channel *channel, size_t bytes)
+{
+	size_t blksize = channel->io_blksize;
+	/* Round up so a partial trailing block still gets a cache slot. */
+	size_t blocks = (bytes + blksize - 1) / blksize;
+
+	return io_init_cache(channel, blocks);
+}
+
 
 static errcode_t io_validate_o_direct(io_channel *channel)
 {
@@ -68,7 +413,7 @@ static errcode_t io_validate_o_direct(io
 		if (ret)
 			break;
 
-		ret = io_read_block(channel, 0, 1, blk);
+		ret = unix_io_read_block(channel, 0, 1, blk);
 		ocfs2_free(&blk);
 		if (!ret)
 			break;
@@ -212,74 +557,19 @@ int io_get_blksize(io_channel *channel)
 errcode_t io_read_block(io_channel *channel, int64_t blkno, int count,
 			char *data)
 {
-	int ret;
-	ssize_t size, tot, rd;
-	uint64_t location;
-
-	/* -ative means count is in bytes */
-	size = (count < 0) ? -count : count * channel->io_blksize;
-	location = blkno * channel->io_blksize;
-
-	tot = 0;
-	while (tot < size) {
-		rd = pread64(channel->io_fd, data + tot,
-			     size - tot, location + tot);
-		ret = OCFS2_ET_IO;
-		if (rd < 0) {
-			channel->io_error = errno;
-			goto out;
-		}
-
-		if (!rd) 
-			goto out;
-
-		tot += rd;
-	}
-
-	ret = 0;
-
-out:
-	if (!ret && tot != size) {
-		ret = OCFS2_ET_SHORT_READ;
-		memset(data + tot, 0, size - tot);
-	}
-
-	return ret;
+	/* Without a cache on this channel, read straight from the device. */
+	if (!channel->io_cache)
+		return unix_io_read_block(channel, blkno, count, data);
+	return io_cache_read_block(channel, blkno, count, data);
 }
 
 errcode_t io_write_block(io_channel *channel, int64_t blkno, int count,
 			 const char *data)
 {
-	int ret;
-	ssize_t size, tot, wr;
-	uint64_t location;
-
-	/* -ative means count is in bytes */
-	size = (count < 0) ? -count : count * channel->io_blksize;
-	location = blkno * channel->io_blksize;
-
-	tot = 0;
-	while (tot < size) {
-		wr = pwrite64(channel->io_fd, data + tot,
- 			      size - tot, location + tot);
-		ret = OCFS2_ET_IO;
-		if (wr < 0) {
-			channel->io_error = errno;
-			goto out;
-		}
-
-		if (!wr) 
-			goto out;
-
-		tot += wr;
-	}
-
-	ret = 0;
-out:
-	if (!ret && (tot != size))
-		ret = OCFS2_ET_SHORT_WRITE;
-
-	return ret;
+	/* Without a cache on this channel, write straight to the device. */
+	if (!channel->io_cache)
+		return unix_io_write_block(channel, blkno, count, data);
+	return io_cache_write_block(channel, blkno, count, data);
 }
 
 



More information about the Ocfs2-tools-devel mailing list