[Ocfs2-tools-devel] [PATCH 1/2] The io_cache implementation

Joel Becker Joel.Becker at oracle.com
Mon Mar 26 21:22:53 PDT 2007


This is a simple caching implementation for iochannel in libocfs2.  When
initialized, it uses an rbtree to look up existing blocks and an lru to
cache unseen blocks.

The cache must be explicitly initialized by a caller.  This allows some
libocfs2 programs to ensure direct read and write.  Initialization
merely takes the size of the cache in blocks or bytes.  io_set_blksize()
must have already been called.

The io_read_block() and io_write_block() calls now use the cache if it
is available.  Writes are write-through - the cache will be updated
*and* the new data will be synchronized to disk.

Index: libocfs2/include/ocfs2.h
===================================================================
--- libocfs2/include/ocfs2.h	(.../trunk)	(revision 1329)
+++ libocfs2/include/ocfs2.h	(.../branches/iocache)	(revision 1329)
@@ -264,6 +264,9 @@ errcode_t io_read_block(io_channel *chan
 			char *data);
 errcode_t io_write_block(io_channel *channel, int64_t blkno, int count,
 			 const char *data);
+errcode_t io_init_cache(io_channel *channel, size_t nr_blocks);
+errcode_t io_init_cache_size(io_channel *channel, size_t bytes);
+void io_destroy_cache(io_channel *channel);
 
 errcode_t ocfs2_read_super(ocfs2_filesys *fs, uint64_t superblock, char *sb);
 errcode_t ocfs2_write_super(ocfs2_filesys *fs);
Index: libocfs2/unix_io.c
===================================================================
--- libocfs2/unix_io.c	(.../trunk)	(revision 1329)
+++ libocfs2/unix_io.c	(.../branches/iocache)	(revision 1329)
@@ -30,6 +30,7 @@
 #define _LARGEFILE64_SOURCE
 #define _GNU_SOURCE /* Because libc really doesn't want us using O_DIRECT? */
 
+#include <assert.h>
 #include <string.h>
 #include <sys/types.h>
 #include <unistd.h>
@@ -45,14 +46,344 @@
 #include "ocfs2.h"
 
 
+/*
+ * The cache looks up blocks in two ways:
+ *
+ * 1) If it needs a new block, it gets one off of ic->ic_lru.  The blocks
+ *    attach to that list via icb->icb_list.
+ *
+ * 2) If it wants to look up an existing block, it gets it from
+ *    ic->ic_lookup.  The blocks are attached vai icb->icb_node.
+ */
+struct io_cache_block {
+	struct rb_node icb_node;
+	struct list_head icb_list;
+	uint64_t icb_blkno;
+	char *icb_buf;
+};
+
+struct io_cache {
+	size_t ic_nr_blocks;
+	struct list_head ic_lru;
+	struct rb_root ic_lookup;
+
+	/* Housekeeping */
+	struct io_cache_block *ic_metadata_buffer;
+	char *ic_data_buffer;
+};
+
 struct _io_channel {
 	char *io_name;
 	int io_blksize;
 	int io_flags;
 	int io_error;
 	int io_fd;
+	struct io_cache *io_cache;
 };
 
+static errcode_t unix_io_read_block(io_channel *channel, int64_t blkno,
+				    int count, char *data)
+{
+	int ret;
+	ssize_t size, tot, rd;
+	uint64_t location;
+
+	/* -ative means count is in bytes */
+	size = (count < 0) ? -count : count * channel->io_blksize;
+	location = blkno * channel->io_blksize;
+
+	tot = 0;
+	while (tot < size) {
+		rd = pread64(channel->io_fd, data + tot,
+			     size - tot, location + tot);
+		ret = OCFS2_ET_IO;
+		if (rd < 0) {
+			channel->io_error = errno;
+			goto out;
+		}
+
+		if (!rd) 
+			goto out;
+
+		tot += rd;
+	}
+
+	ret = 0;
+
+out:
+	if (!ret && tot != size) {
+		ret = OCFS2_ET_SHORT_READ;
+		memset(data + tot, 0, size - tot);
+	}
+
+	return ret;
+}
+
+static errcode_t unix_io_write_block(io_channel *channel, int64_t blkno,
+				     int count, const char *data)
+{
+	int ret;
+	ssize_t size, tot, wr;
+	uint64_t location;
+
+	/* -ative means count is in bytes */
+	size = (count < 0) ? -count : count * channel->io_blksize;
+	location = blkno * channel->io_blksize;
+
+	tot = 0;
+	while (tot < size) {
+		wr = pwrite64(channel->io_fd, data + tot,
+ 			      size - tot, location + tot);
+		ret = OCFS2_ET_IO;
+		if (wr < 0) {
+			channel->io_error = errno;
+			goto out;
+		}
+
+		if (!wr) 
+			goto out;
+
+		tot += wr;
+	}
+
+	ret = 0;
+out:
+	if (!ret && (tot != size))
+		ret = OCFS2_ET_SHORT_WRITE;
+
+	return ret;
+}
+
+
+
+/*
+ * See if the rbtree has a block for the given block number.
+ *
+ * The rb_node garbage lets insertion share the search.  Trivial callers
+ * pass NULL.
+ */
+static struct io_cache_block *io_cache_lookup(struct io_cache *ic,
+					      uint64_t blkno)
+{
+	struct rb_node *p = ic->ic_lookup.rb_node;
+	struct io_cache_block *icb;
+
+	while (p) {
+		icb = rb_entry(p, struct io_cache_block, icb_node);
+		if (blkno < icb->icb_blkno) {
+			p = p->rb_left;
+		} else if (blkno > icb->icb_blkno) {
+			p = p->rb_right;
+		} else
+			return icb;
+	}
+
+	return NULL;
+}
+
+static void io_cache_insert(struct io_cache *ic,
+			    struct io_cache_block *insert_icb)
+{
+	struct rb_node **p = &ic->ic_lookup.rb_node;
+	struct rb_node *parent = NULL;
+	struct io_cache_block *icb = NULL;
+
+	while (*p) {
+		parent = *p;
+		icb = rb_entry(parent, struct io_cache_block, icb_node);
+		if (insert_icb->icb_blkno < icb->icb_blkno) {
+			p = &(*p)->rb_left;
+			icb = NULL;
+		} else if (insert_icb->icb_blkno > icb->icb_blkno) {
+			p = &(*p)->rb_right;
+			icb = NULL;
+		} else
+			assert(0);  /* We erased it, remember? */
+	}
+
+	rb_link_node(&insert_icb->icb_node, parent, p);
+	rb_insert_color(&insert_icb->icb_node, &ic->ic_lookup);
+}
+
+static void io_cache_seen(struct io_cache *ic, struct io_cache_block *icb)
+{
+	/* Move to the front of the LRU */
+	list_del(&icb->icb_list);
+	list_add_tail(&icb->icb_list, &ic->ic_lru);
+}
+
+static errcode_t io_cache_read_one_block(io_channel *channel, int64_t blkno,
+					 char *data)
+{
+	errcode_t ret;
+	struct io_cache *ic = channel->io_cache;
+	struct io_cache_block *icb;
+
+	icb = io_cache_lookup(ic, blkno);
+	if (icb)
+		goto found;
+
+	/*
+	 * Ok, this blkno isn't in the cache.  Steal something.
+	 */
+	icb = list_entry(ic->ic_lru.next, struct io_cache_block, icb_list);
+	if (icb->icb_node.rb_parent) {
+		rb_erase(&icb->icb_node, &ic->ic_lookup);
+		memset(&icb->icb_node, 0, sizeof(struct rb_node));
+	}
+
+	/*
+	 * If the read fails, we leave the block at the end of the LRU
+	 * and out of the lookup tree.
+	 */
+	ret = unix_io_read_block(channel, blkno, 1, icb->icb_buf);
+	if (ret)
+		goto out;
+
+	icb->icb_blkno = blkno;
+	io_cache_insert(ic, icb);
+
+found:
+	memcpy(data, icb->icb_buf, channel->io_blksize);
+	io_cache_seen(ic, icb);
+
+out:
+	return ret;
+}
+
+static errcode_t io_cache_read_block(io_channel *channel, int64_t blkno,
+				     int count, char *data)
+
+{
+	int i;
+	errcode_t ret = 0;
+
+	for (i = 0; i < count; i++, blkno++) {
+		ret = io_cache_read_one_block(channel, blkno, data);
+		if (ret)
+			break;
+	}
+
+	return ret;
+}
+
+static errcode_t io_cache_write_one_block(io_channel *channel,
+					  int64_t blkno, const char *data)
+{
+	errcode_t ret;
+	struct io_cache *ic = channel->io_cache;
+	struct io_cache_block *icb;
+
+	icb = io_cache_lookup(ic, blkno);
+	if (icb)
+		goto found;
+
+	/*
+	 * Ok, this blkno isn't in the cache.  Steal something.
+	 */
+	icb = list_entry(ic->ic_lru.next, struct io_cache_block, icb_list);
+	if (icb->icb_node.rb_parent) {
+		rb_erase(&icb->icb_node, &ic->ic_lookup);
+		memset(&icb->icb_node, 0, sizeof(struct rb_node));
+	}
+
+	icb->icb_blkno = blkno;
+	io_cache_insert(ic, icb);
+
+found:
+	memcpy(icb->icb_buf, data, channel->io_blksize);
+	io_cache_seen(ic, icb);
+
+	ret = unix_io_write_block(channel, blkno, 1, icb->icb_buf);
+	return ret;
+}
+
+static errcode_t io_cache_write_block(io_channel *channel, int64_t blkno,
+				      int count, const char *data)
+
+{
+	int i;
+	errcode_t ret = 0;
+
+	for (i = 0; i < count; i++, blkno++) {
+		ret = io_cache_write_one_block(channel, blkno, data);
+		if (ret)
+			break;
+	}
+
+	return ret;
+}
+
+static void io_free_cache(struct io_cache *ic)
+{
+	if (ic) {
+		if (ic->ic_data_buffer)
+			ocfs2_free(&ic->ic_data_buffer);
+		if (ic->ic_metadata_buffer)
+			ocfs2_free(&ic->ic_metadata_buffer);
+		ocfs2_free(&ic);
+	}
+}
+
+void io_destroy_cache(io_channel *channel)
+{
+	if (channel->io_cache) {
+		io_free_cache(channel->io_cache);
+		channel->io_cache = NULL;
+	}
+}
+
+errcode_t io_init_cache(io_channel *channel, size_t nr_blocks)
+{
+	int i;
+	struct io_cache *ic;
+	char *dbuf;
+	struct io_cache_block *icb_list;
+	errcode_t ret;
+
+	ret = ocfs2_malloc0(sizeof(struct io_cache), &ic);
+	if (ret)
+		goto out;
+
+	ic->ic_nr_blocks = nr_blocks;
+	ic->ic_lookup = RB_ROOT;
+	INIT_LIST_HEAD(&ic->ic_lru);
+
+	ret = ocfs2_malloc_blocks(channel, nr_blocks, &ic->ic_data_buffer);
+	if (ret)
+		goto out;
+
+	ret = ocfs2_malloc0(sizeof(struct io_cache_block) * nr_blocks,
+			    &ic->ic_metadata_buffer);
+	if (ret)
+		goto out;
+
+	icb_list = ic->ic_metadata_buffer;
+	dbuf = ic->ic_data_buffer;
+	for (i = 0; i < nr_blocks; i++) {
+		icb_list[i].icb_buf = dbuf;
+		dbuf += channel->io_blksize;
+		list_add_tail(&icb_list[i].icb_list, &ic->ic_lru);
+	}
+
+	channel->io_cache = ic;
+
+out:
+	if (ret)
+		io_free_cache(ic);
+
+	return ret;
+}
+
+errcode_t io_init_cache_size(io_channel *channel, size_t bytes)
+{
+	size_t blocks;
+
+	blocks = (bytes + (channel->io_blksize - 1)) / channel->io_blksize;
+
+	return io_init_cache(channel, blocks);
+}
+
 
 static errcode_t io_validate_o_direct(io_channel *channel)
 {
@@ -68,7 +399,7 @@ static errcode_t io_validate_o_direct(io
 		if (ret)
 			break;
 
-		ret = io_read_block(channel, 0, 1, blk);
+		ret = unix_io_read_block(channel, 0, 1, blk);
 		ocfs2_free(&blk);
 		if (!ret)
 			break;
@@ -212,74 +543,19 @@ int io_get_blksize(io_channel *channel)
 errcode_t io_read_block(io_channel *channel, int64_t blkno, int count,
 			char *data)
 {
-	int ret;
-	ssize_t size, tot, rd;
-	uint64_t location;
-
-	/* -ative means count is in bytes */
-	size = (count < 0) ? -count : count * channel->io_blksize;
-	location = blkno * channel->io_blksize;
-
-	tot = 0;
-	while (tot < size) {
-		rd = pread64(channel->io_fd, data + tot,
-			     size - tot, location + tot);
-		ret = OCFS2_ET_IO;
-		if (rd < 0) {
-			channel->io_error = errno;
-			goto out;
-		}
-
-		if (!rd) 
-			goto out;
-
-		tot += rd;
-	}
-
-	ret = 0;
-
-out:
-	if (!ret && tot != size) {
-		ret = OCFS2_ET_SHORT_READ;
-		memset(data + tot, 0, size - tot);
-	}
-
-	return ret;
+	if (channel->io_cache)
+		return io_cache_read_block(channel, blkno, count, data);
+	else
+		return unix_io_read_block(channel, blkno, count, data);
 }
 
 errcode_t io_write_block(io_channel *channel, int64_t blkno, int count,
 			 const char *data)
 {
-	int ret;
-	ssize_t size, tot, wr;
-	uint64_t location;
-
-	/* -ative means count is in bytes */
-	size = (count < 0) ? -count : count * channel->io_blksize;
-	location = blkno * channel->io_blksize;
-
-	tot = 0;
-	while (tot < size) {
-		wr = pwrite64(channel->io_fd, data + tot,
- 			      size - tot, location + tot);
-		ret = OCFS2_ET_IO;
-		if (wr < 0) {
-			channel->io_error = errno;
-			goto out;
-		}
-
-		if (!wr) 
-			goto out;
-
-		tot += wr;
-	}
-
-	ret = 0;
-out:
-	if (!ret && (tot != size))
-		ret = OCFS2_ET_SHORT_WRITE;
-
-	return ret;
+	if (channel->io_cache)
+		return io_cache_write_block(channel, blkno, count, data);
+	else
+		return unix_io_write_block(channel, blkno, count, data);
 }
 
 
-- 

"You can get more with a kind word and a gun than you can with
 a kind word alone."
         - Al Capone

Joel Becker
Principal Software Developer
Oracle
E-mail: joel.becker at oracle.com
Phone: (650) 506-8127



More information about the Ocfs2-tools-devel mailing list