WARNING - OLD ARCHIVES

This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
   
 
 
Xen 
 
Home Products Support Community News
 
   
 

xen-devel

[Xen-devel] [PATCH 6 of 9] Remus tapdisk proxy

To: xen-devel@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-devel] [PATCH 6 of 9] Remus tapdisk proxy
From: Brendan Cully <brendan@xxxxxxxxx>
Date: Wed, 13 May 2009 17:19:34 -0700
Cc: andy@xxxxxxxxx
Delivery-date: Wed, 13 May 2009 17:27:52 -0700
Envelope-to: www-data@xxxxxxxxxxxxxxxxxxx
In-reply-to: <patchbomb.1242260368@xxxxxxxxxxxxxxxxxxxxxxxxxxx>
List-help: <mailto:xen-devel-request@lists.xensource.com?subject=help>
List-id: Xen developer discussion <xen-devel.lists.xensource.com>
List-post: <mailto:xen-devel@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/mailman/listinfo/xen-devel>, <mailto:xen-devel-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/mailman/listinfo/xen-devel>, <mailto:xen-devel-request@lists.xensource.com?subject=unsubscribe>
References: <patchbomb.1242260368@xxxxxxxxxxxxxxxxxxxxxxxxxxx>
Sender: xen-devel-bounces@xxxxxxxxxxxxxxxxxxx
User-agent: Mercurial-patchbomb/1f0f01bc86a5
# HG changeset patch
# User Brendan Cully <brendan@xxxxxxxxx>
# Date 1240355510 25200
# Node ID 03fd0c9729f3d87e7803afb170dfc3cdff184998
# Parent  42fddb3a8edeb80339618b7f758dc2959cf97115
Remus tapdisk proxy.
This proxy forwards local disk writes to a backup server, where they are
buffered until the local disk receives a checkpoint signal from the remus
control tools, which it forwards to the backup to cause buffered writes
to be flushed.

Configuration is of the form
tap:remus:<backup host>:<backup port>:<underlying tapdisk string>

The first write to a disk protected by this proxy will block until the
Remus control tools have been activated, which create the backup
disk proxy.

Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>

diff --git a/tools/blktap/drivers/Makefile b/tools/blktap/drivers/Makefile
--- a/tools/blktap/drivers/Makefile
+++ b/tools/blktap/drivers/Makefile
@@ -11,6 +11,8 @@
 CFLAGS   += $(CFLAGS_libxenctrl)
 CFLAGS   += $(CFLAGS_libxenstore)
 CFLAGS   += -I $(LIBAIO_DIR)
+# for hashtable_itr.h in gcc 4.2
+CFLAGS   += -fgnu89-inline
 CFLAGS   += -D_GNU_SOURCE
 
 ifeq ($(shell . ./check_gcrypt $(CC)),yes)
@@ -22,7 +24,8 @@
 endif
 
 LDFLAGS_blktapctrl := $(LDFLAGS_libxenctrl) $(LDFLAGS_libxenstore) -L../lib 
-lblktap
-LDFLAGS_img := $(LIBAIO_DIR)/libaio.a $(CRYPT_LIB) -lpthread -lz
+# hashtable.c uses ceilf from libm
+LDFLAGS_img := $(LIBAIO_DIR)/libaio.a $(CRYPT_LIB) -lpthread -lm -lz
 
 BLK-OBJS-y  := block-aio.o
 BLK-OBJS-y  += block-sync.o
@@ -30,6 +33,7 @@
 BLK-OBJS-y  += block-ram.o
 BLK-OBJS-y  += block-qcow.o
 BLK-OBJS-y  += block-qcow2.o
+BLK-OBJS-y  += block-remus.o hashtable.o hashtable_itr.o hashtable_utility.o
 BLK-OBJS-y  += aes.o
 BLK-OBJS-y  += tapaio.o
 BLK-OBJS-$(CONFIG_Linux) += blk_linux.o
diff --git a/tools/blktap/drivers/block-remus.c 
b/tools/blktap/drivers/block-remus.c
new file mode 100644
--- /dev/null
+++ b/tools/blktap/drivers/block-remus.c
@@ -0,0 +1,1686 @@
+/* block-remus.c
+ *
+ * This disk sends all writes to a backup via a network interface before
+ * passing them to an underlying device.
+ * The backup is a bit more complicated:
+ *  1. It applies all incoming writes to a ramdisk.
+ *  2. When a checkpoint request arrives, it moves the ramdisk to
+ *     a committing state and uses a new ramdisk for subsequent writes.
+ *     It also acknowledges the request, to let the sender know it can
+ *     release output.
+ *  3. The ramdisk flushes its contents to the underlying driver.
+ *  4. At failover, the backup waits for the in-flight ramdisk (if any) to
+ *     drain before letting the domain be activated.
+ *
+ * The driver determines whether it is the client or server by attempting
+ * to bind to the replication address. If the address is not local,
+ * the driver acts as client.
+ *
+ * The following messages are defined for the replication stream:
+ * 1. write request
+ *    "wreq"      4
+ *    num_sectors 4
+ *    sector      8
+ *    buffer      (num_sectors * sector_size)
+ * 2. submit request (may be used as a barrier)
+ *    "sreq"      4
+ * 3. commit request
+ *    "creq"      4
+ * After a commit request, the client must wait for a completion message:
+ * 4. completion
+ *    "done"      4
+ */
+
+/* due to architectural choices in tapdisk, block-buffer is forced to
+ * reimplement some code which is meant to be private */
+#define TAPDISK
+#include "tapdisk.h"
+#include "hashtable.h"
+#include "hashtable_itr.h"
+#include "hashtable_utility.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/param.h>
+#include <sys/sysctl.h>
+#include <unistd.h>
+
+/* timeout for reads and writes in ms */
+#define NET_TIMEOUT 500
+#define RAMDISK_HASHSIZE 128
+
+#define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
+
+enum tdremus_mode {
+  mode_pass = 0,
+  mode_client,
+  mode_server
+};
+
+struct tdremus_req {
+  uint64_t sector;
+  int nb_sectors;
+  char buf[4096];
+};
+
+struct req_ring {
+  /* waste one slot to distinguish between empty and full */
+  struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
+  unsigned int head;
+  unsigned int tail;
+};
+
+struct ramdisk {
+  size_t sector_size;
+  struct hashtable* h;
+  /* when a ramdisk is flushed, h is given a new empty hash for writes
+   * while the old ramdisk (prev) is drained asynchronously. To avoid
+   * a race where a read request points to a sector in prev which has
+   * not yet been flushed, check prev on a miss in h */
+  struct hashtable* prev;
+  /* count of outstanding requests to the base driver */
+  size_t inflight;
+};
+
+/* the ramdisk intercepts the original callback for reads and writes.
+ * This holds the original data. */
+/* Might be worth making this a static array in struct ramdisk to avoid
+ * a malloc per request */
+
+struct tdremus_state;
+
+struct ramdisk_cbdata {
+  td_callback_t cb;
+  void* private;
+  char* buf;
+  struct tdremus_state* state;
+};
+
+struct ramdisk_write_cbdata {
+  struct tdremus_state* state;
+  char* buf;
+};
+
+typedef int (*queue_rw_t) (struct disk_driver *dd, uint64_t sector,
+                          int nb_sectors, char *buf, td_callback_t cb,
+                          int id, void *prv);
+
+struct tdremus_state {
+  struct tap_disk* driver;
+  void* driver_data;
+  char* path;
+
+  char* ctlfifo; /* receive flush instruction here */
+  int ctl_fd_idx; /* io_fd slot for control FIFO */
+  char* msgfifo; /* output completion message here */
+  int msgfd;
+
+  /* replication host */
+  struct in_addr addr;
+  unsigned short port;
+  int sfd;     /* server listen port */
+  int rfd;     /* replication channel */
+  int rfd_idx; /* io_fd slot for replication channel */
+
+  /* queue write requests, batch-replicate at submit */
+  struct req_ring write_ring;
+
+  /* ramdisk data*/
+  struct ramdisk ramdisk;
+
+  /* mode methods */
+  enum tdremus_mode mode;
+  queue_rw_t queue_read;
+  queue_rw_t queue_write;
+  int (*queue_flush)(struct disk_driver* dd);
+  int (*submit)(struct disk_driver* dd);
+  int (*queue_close)(struct disk_driver *dd);
+};
+
+static int tdremus_queue_read(struct disk_driver *dd, uint64_t sector,
+                                 int nb_sectors, char *buf, td_callback_t cb,
+                                 int id, void *private);
+static int tdremus_queue_write(struct disk_driver *dd, uint64_t sector,
+                                  int nb_sectors, char *buf, td_callback_t cb,
+                                  int id, void *private);
+static int tdremus_close(struct disk_driver *dd);
+
+/* ring functions */
+static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
+{
+  if (++pos >= MAX_REQUESTS * 2 + 1)
+    return 0;
+
+  return pos;
+}
+
+static inline int ring_isempty(struct req_ring* ring)
+{
+  return ring->head == ring->tail;
+}
+
+static inline int ring_isfull(struct req_ring* ring)
+{
+  return ring_next(ring, ring->tail) == ring->head;
+}
+
+static int buf_queue_write(struct disk_driver *dd, uint64_t sector,
+                          int nb_sectors, char *buf)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  struct tdremus_req* req;
+
+  /* queue requests until flush */
+  if (ring_isfull(&s->write_ring))
+    return -EBUSY;
+
+  req = s->write_ring.requests + s->write_ring.tail;
+  s->write_ring.tail = ring_next(&s->write_ring, s->write_ring.tail);
+
+  req->sector = sector;
+  req->nb_sectors = nb_sectors;
+  memcpy(req->buf, buf, req->nb_sectors * dd->td_state->sector_size);
+
+  return 0;
+}
+
+/* passthrough functions */
+
+static int pass_start(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  s->queue_read = NULL;
+  s->queue_write = NULL;
+  s->submit = NULL;
+  s->queue_flush = NULL;
+  s->queue_close = NULL;
+
+  return 0;
+}
+
+/* ramdisk methods */
+static int ramdisk_flush(struct disk_driver* dd, struct tdremus_state *s);
+
+/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
+static unsigned int uint64_hash(void* k)
+{
+  uint64_t key = *(uint64_t*)k;
+
+  key = (~key) + (key << 18);
+  key = key ^ (key >> 31);
+  key = key * 21;
+  key = key ^ (key >> 11);
+  key = key + (key << 6);
+  key = key ^ (key >> 22);
+  
+  return (unsigned int)key;
+}
+
+static int rd_hash_equal(void* k1, void* k2)
+{
+  uint64_t key1, key2;
+
+  key1 = *(uint64_t*)k1;
+  key2 = *(uint64_t*)k2;
+
+  return key1 == key2;
+}
+
+static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
+                       int nb_sectors, char* buf)
+{
+  int i;
+  char* v;
+  uint64_t key;
+
+  for (i = 0; i < nb_sectors; i++) {
+    key = sector + i;
+    if (!(v = hashtable_search(ramdisk->h, &key))) {
+      /* check whether it is queued in a previous flush request */
+      if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, &key))))
+       return -1;
+    }
+    memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
+  }
+
+  return 0;
+}
+
+static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
+                             size_t len)
+{
+  char* v;
+  uint64_t* key;
+  
+  if ((v = hashtable_search(h, &sector))) {
+    memcpy(v, buf, len);
+    return 0;
+  }
+
+  if (!(v = malloc(len))) {
+    DPRINTF("ramdisk_write_hash: malloc failed\n");
+    return -1;
+  }
+  memcpy(v, buf, len);
+  if (!(key = malloc(sizeof(*key)))) {
+    DPRINTF("ramdisk_write_hash: error allocating key\n");
+    free(v);
+    return -1;
+  }
+  *key = sector;
+  if (!hashtable_insert(h, key, v)) {
+    DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
+    free(key);
+    free(v);
+    return -1;
+  }
+
+  return 0;
+}
+
+static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
+                               int nb_sectors, char* buf)
+{
+  int i, rc;
+
+  for (i = 0; i < nb_sectors; i++) {
+    rc = ramdisk_write_hash(ramdisk->h, sector + i,
+                           buf + i * ramdisk->sector_size,
+                           ramdisk->sector_size);
+    if (rc)
+      return rc;
+  }
+
+  return 0;
+}
+
+/* the underlying driver calls this callback instead of the original.
+   The result is applied to the ramdisk. Then the ramdisk version of the
+   data is returned via the original callback. */
+static int ramdisk_read_cb(struct disk_driver* dd, int res, uint64_t sector,
+                          int nb_sectors, int id, void* private)
+{
+  struct ramdisk_cbdata *cbdata = (struct ramdisk_cbdata*)private;
+  struct tdremus_state *s = cbdata->state;
+  char* sectorbuf;
+  int rc;
+  int i;
+
+  if (!res) {
+    for (i = 0; i < nb_sectors; i++) {
+      sectorbuf = cbdata->buf + i * s->ramdisk.sector_size;
+      /* if data is in ramdisk, overwrite the buffer read with the ramdisk */
+      ramdisk_read(&s->ramdisk, sector + i, 1, sectorbuf);
+      /* else: should we cache the results? is that just a waste of RAM? */
+    }
+  }
+
+  rc = cbdata->cb(dd, res, sector, nb_sectors, id, cbdata->private);
+
+  free(cbdata);
+  return rc;
+}
+
+static int ramdisk_write_cb(struct disk_driver* dd, int res, uint64_t sector,
+                           int nb_sectors, int id, void* private)
+{
+  struct ramdisk_write_cbdata *cbdata = (struct ramdisk_write_cbdata*)private;
+  struct tdremus_state *s = cbdata->state;
+  int rc;
+
+  /*
+  RPRINTF("ramdisk write callback: rc %d, %d sectors @ %llu\n", res, 
nb_sectors,
+         sector);
+  */
+
+  free(cbdata->buf);
+  free(cbdata);
+
+  s->ramdisk.inflight--;
+  if (!s->ramdisk.inflight && !s->ramdisk.prev) {
+    /* when this reaches 0 and prev is empty, the disk is flushed. */
+    /*
+    RPRINTF("ramdisk flush complete\n");
+    */
+  }
+
+  if (s->ramdisk.prev) {
+    /* resubmit as much as possible in the remaining disk */
+    /*
+    RPRINTF("calling ramdisk_flush from write callback\n");
+    */
+    return ramdisk_flush(dd, s);
+  }
+
+  return 0;
+}
+
+static int ramdisk_queue_read(struct disk_driver *dd, uint64_t sector,
+                             int nb_sectors, char *buf, td_callback_t cb,
+                             int id, void *private)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  struct ramdisk_cbdata *cbdata;
+  int rc;
+
+  /* if block is present in ramdisk, return it. If any sectors are missing,
+   * fail and perform an overlay at the callback */
+  if (!(ramdisk_read(&s->ramdisk, sector, nb_sectors, buf)))
+    return cb(dd, 0, sector, nb_sectors, id, private);
+
+  /* otherwise queue a read to the underlying disk, with a new callback
+   * to fill the ramdisk */
+  if (!(cbdata = malloc(sizeof(*cbdata)))) {
+    DPRINTF("ramdisk_queue_read: Error allocating callback\n");
+    return -1;
+  }
+  cbdata->cb = cb;
+  cbdata->private = private;
+  cbdata->buf = buf;
+  /* callback may only have driver_data available */
+  cbdata->state = s;
+  
+  dd->private = s->driver_data;
+  rc = s->driver->td_queue_read(dd, sector, nb_sectors, buf, ramdisk_read_cb,
+                               id, cbdata);
+  dd->private = s;
+
+  return rc;
+}
+
+/* apply the write to the ramdisk directly. Wait until explicit flush
+ * to move to disk. Later we should probably trickle writes to the
+ * backup from here (not every call though). */
+static int ramdisk_queue_write(struct disk_driver *dd, uint64_t sector,
+                              int nb_sectors, char *buf, td_callback_t cb,
+                              int id, void *private)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int rc;
+
+  /* write segment to ramdisk */
+  if ((rc = ramdisk_write(&s->ramdisk, sector, nb_sectors, buf))) {
+    DPRINTF("Ramdisk write failed: %d\n", rc);
+    return rc;
+  }
+
+  return cb(dd, 0, sector, nb_sectors, id, private);
+}
+
+static int uint64_compare(const void* k1, const void* k2)
+{
+  uint64_t u1 = *(uint64_t*)k1;
+  uint64_t u2 = *(uint64_t*)k2;
+
+  /* u1 - u2 is unsigned */
+  return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
+}
+
+/* set psectors to an array of the sector numbers in the hash, returning
+ * the number of entries (or -1 on error) */
+static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
+{
+  struct hashtable_itr* itr;
+  uint64_t* sectors;
+  int count;
+
+  if (!(count = hashtable_count(h)))
+    return 0;
+
+  if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
+    DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
+    return -1;
+  }
+  sectors = *psectors;
+
+  itr = hashtable_iterator(h);
+  count = 0;
+  do {
+    sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
+  } while (hashtable_iterator_advance(itr));
+  free(itr);
+
+  return count;
+}
+
+/* coalesce `count` consecutive sectors starting at `start` from the
+ * draining hashtable (ramdisk->prev) into one contiguous buffer for a
+ * single write to the underlying driver. Returns a valloc'd buffer the
+ * caller owns (freed in the write callback), or NULL on error.
+ * Side effect: frees each sector's value; the stale hashtable entries
+ * are removed by the caller (ramdisk_flush). */
+static char* merge_requests(struct ramdisk* ramdisk, uint64_t start,
+                           size_t count)
+{
+  char* buf;
+  char* sector;
+  int i;
+
+  if (!(buf = valloc(count * ramdisk->sector_size))) {
+    DPRINTF("merge_request: allocation failed\n");
+    return NULL;
+  }
+
+  for (i = 0; i < count; i++) {
+    if (!(sector = hashtable_search(ramdisk->prev, &start))) {
+      DPRINTF("merge_request: lookup failed on %"PRIu64"\n", start);
+      /* fix: don't leak the merge buffer on lookup failure */
+      free(buf);
+      return NULL;
+    }
+
+    memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
+    free(sector);
+
+    start++;
+  }
+
+  return buf;
+}
+
+/* The underlying driver may not handle having the whole ramdisk queued at
+ * once. We queue what we can and let the callbacks attempt to queue more. */
+/* NOTE: may be called from callback, while dd->private still belongs to
+ * the underlying driver */
+static int ramdisk_flush(struct disk_driver* dd, struct tdremus_state* s)
+{
+  struct ramdisk_write_cbdata* cbdata;
+  uint64_t* sectors;
+  char* buf;
+  uint64_t base, batchlen;
+  int i, j, count = 0;
+
+  /*
+  RPRINTF("ramdisk flush\n");
+  */
+
+  if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
+    return count;
+
+  /*
+  RPRINTF("ramdisk: flushing %d sectors\n", count);
+  */
+  
+  /* sort and merge sectors to improve disk performance */
+  qsort(sectors, count, sizeof(*sectors), uint64_compare);
+
+  for (i = 0; i < count;) {
+    if (!(cbdata = malloc(sizeof(*cbdata)))) {
+      RPRINTF("ramdisk_flush: error allocating cbdata\n");
+      free(sectors);
+      return -1;
+    }
+
+    base = sectors[i++];
+    while (i < count && sectors[i] == sectors[i-1] + 1)
+      i++;
+    batchlen = sectors[i-1] - base + 1;
+
+    if (!(buf = merge_requests(&s->ramdisk, base, batchlen))) {
+      RPRINTF("ramdisk_flush: merge_requests failed\n");
+      free(sectors);
+      return -1;
+    }
+
+    /* we probably want to record an ID for in-flight requests in ramdisk */
+    cbdata->state = s;
+    cbdata->buf = buf;
+
+    /*
+    RPRINTF("queuing write at %llu, length: %llu\n", base, batchlen);
+    */
+    dd->private = s->driver_data;
+    j = s->driver->td_queue_write(dd, base, batchlen, buf, ramdisk_write_cb,
+                                 0, cbdata);
+    dd->private = s;
+
+    if (j) {
+      RPRINTF("ramdisk queue returned %d\n", j);
+      free(cbdata);
+      free(buf);
+      break;
+    }
+
+    s->ramdisk.inflight++;
+    
+    for (j = 0; j < batchlen; j++) {
+      hashtable_remove(s->ramdisk.prev, &base);
+      base++;
+    }
+  }
+
+  if (!hashtable_count(s->ramdisk.prev)) {
+    /* everything is in flight */
+    hashtable_destroy(s->ramdisk.prev, 0);
+    s->ramdisk.prev = NULL;
+  }
+  
+  free(sectors);
+
+  /*
+  RPRINTF("submitting requests\n");
+  */
+  dd->private = s->driver_data;
+  j = s->driver->td_submit(dd);
+  dd->private = s;
+  /*
+  RPRINTF("submit returned %d\n", j);
+  */
+
+  return 0;
+}
+
+/* flush ramdisk contents to disk */
+static int ramdisk_start_flush(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  uint64_t* key;
+  char* buf;
+  int rc = 0;
+  int i, j, count, batchlen;
+  uint64_t* sectors;
+
+  if (!hashtable_count(s->ramdisk.h)) {
+    /*
+    RPRINTF("Nothing to flush\n");
+    */
+    return 0;
+  }
+
+  if (s->ramdisk.prev) {
+    /* a flush request issued while a previous flush is still in progress
+     * will merge with the previous request. If you want the previous
+     * request to be consistent, wait for it to complete. */
+    if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
+      return count;
+
+    for (i = 0; i < count; i++) {
+      buf = hashtable_search(s->ramdisk.h, sectors + i);
+      ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
+                        s->ramdisk.sector_size);
+    }
+    free(sectors);
+
+    hashtable_destroy (s->ramdisk.h, 0);
+  } else
+    s->ramdisk.prev = s->ramdisk.h;
+
+  /* We create a new hashtable so that new writes can be performed before
+   * the old hashtable is completely drained. */
+  s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
+                                 rd_hash_equal);
+
+  return ramdisk_flush(dd, s);
+}
+
+static int ramdisk_queue_close(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  hashtable_destroy(s->ramdisk.h, 1);
+
+  return 0;
+}
+
+static int ramdisk_start(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  if (s->ramdisk.h) {
+    RPRINTF("ramdisk already allocated\n");
+    return 0;
+  }
+
+  s->ramdisk.sector_size = dd->td_state->sector_size;
+  s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
+                                 rd_hash_equal);
+
+  DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);
+  
+  return 0;
+}
+
+/* common client/server functions */
+/* mayberead: read exactly `len` bytes from `fd` into `buf`.
+ * Loops on short reads; whenever a read would block it select()s on the
+ * descriptor with a NET_TIMEOUT ms timeout. Returns 0 on success, -1 on
+ * EOF, read/select error, or timeout.
+ * NOTE(review): `tv` is reused across select() calls; on Linux select()
+ * decrements it, so NET_TIMEOUT bounds the TOTAL wait, not each retry —
+ * confirm this is intended. */
+static int mread(int fd, void* buf, size_t len)
+{
+  fd_set rfds;
+  int rc;
+  size_t cur = 0;
+  struct timeval tv = {
+    .tv_sec = 0,
+    .tv_usec = NET_TIMEOUT * 1000
+  };
+
+  if (!len)
+    return 0;
+
+  /* read first. Only select if read is incomplete. */
+  rc = read(fd, buf, len);
+  while (rc < 0 || cur + rc < len) {
+    if (!rc) {
+      /* read() returned 0: peer closed the connection */
+      RPRINTF("end-of-file");
+      return -1;
+    }
+    if (rc < 0 && errno != EAGAIN) {
+      RPRINTF("error during read: %d\n", errno);
+      return -1;
+    }
+    if (rc > 0)
+      cur += rc;
+
+    /* wait (bounded) for the descriptor to become readable again */
+    FD_ZERO(&rfds);
+    FD_SET(fd, &rfds);
+    if (!(rc = select(fd + 1, &rfds, NULL, NULL, &tv))) {
+      RPRINTF("time out during read\n");
+      return -1;
+    } else if (rc < 0) {
+      RPRINTF("error during select: %d\n", errno);
+      return -1;
+    }
+    rc = read(fd, buf + cur, len - cur);
+  }
+  /*
+  RPRINTF("read %d bytes\n", cur + rc);
+  */
+
+  return 0;
+}
+
+/* write exactly `len` bytes from `buf` to `fd`. Loops on short writes;
+ * whenever a write would block it select()s on the descriptor with a
+ * NET_TIMEOUT ms timeout. Returns 0 on success, -1 on EOF, error or
+ * timeout. (Counterpart of mread above.) */
+static int mwrite(int fd, void* buf, size_t len)
+{
+  fd_set wfds;
+  size_t cur = 0;
+  int rc;
+  struct timeval tv = {
+    .tv_sec = 0,
+    .tv_usec = NET_TIMEOUT * 1000
+  };
+
+  if (!len)
+    return 0;
+
+  /* write first. Only select if write is incomplete. */
+  rc = write(fd, buf, len);
+  while (rc < 0 || cur + rc < len) {
+    if (!rc) {
+      RPRINTF("end-of-file");
+      return -1;
+    }
+    if (rc < 0 && errno != EAGAIN) {
+      /* fix: this is the write path; the message used to say "read" */
+      RPRINTF("error during write: %d\n", errno);
+      return -1;
+    }
+    if (rc > 0)
+      cur += rc;
+
+    FD_ZERO(&wfds);
+    FD_SET(fd, &wfds);
+    if (!(rc = select(fd + 1, NULL, &wfds, NULL, &tv))) {
+      RPRINTF("time out during write\n");
+      return -1;
+    } else if (rc < 0) {
+      RPRINTF("error during select: %d\n", errno);
+      return -1;
+    }
+    rc = write(fd, buf + cur, len - cur);
+  }
+  /*
+  RPRINTF("wrote %d bytes\n", cur + rc);
+  */
+
+  /* fix: removed unreachable FD_ZERO/FD_SET/select() that followed the
+   * return statement in the original */
+  return 0;
+}
+
+/* add a new FD to tapdisk selector */
+static int install_tdfd(struct disk_driver* dd, int* pidx, int fd) {
+  int idx;
+
+  for (idx = 0; idx < MAX_IOFD; idx++) {
+    if (!dd->io_fd[idx])
+      break;
+  }
+  if (idx == MAX_IOFD) {
+    RPRINTF("no free FD for replication channel\n");
+    return -1;
+  }
+
+  dd->io_fd[idx] = fd;
+  *pidx = idx;
+
+  return 0;
+}
+
+static int close_rfd(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  RPRINTF("closing replication channel\n");
+
+  close(s->rfd);
+  s->rfd = -2;
+  dd->io_fd[s->rfd_idx] = 0;
+  s->rfd_idx = -1;
+
+  return 0;
+}
+
+/* remus client */
+
+static int client_connect(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  struct sockaddr_in sa;
+  int fd;
+  int rc;
+
+  RPRINTF("client connecting to %s:%d...\n", inet_ntoa(s->addr), s->port);
+
+  if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+    RPRINTF("could not create client socket: %d\n", errno);
+    return -1;
+  }
+
+  memset(&sa, 0, sizeof(sa));
+  sa.sin_family = AF_INET;
+  sa.sin_addr.s_addr = s->addr.s_addr;
+  sa.sin_port = htons(s->port);
+  /* wait for remote end to start up */
+  do {
+    if ((rc = connect(fd, &sa, sizeof(sa))) < 0) {
+      if (errno == ECONNREFUSED) {
+       RPRINTF("connection refused -- retrying in 1 second\n");
+       sleep(1);
+      } else {
+       RPRINTF("connection failed: %d\n", errno);
+       goto err_sock;
+      }
+    }
+  } while (rc < 0);
+
+  RPRINTF("client connected\n");
+
+  if (install_tdfd(dd, &s->rfd_idx, fd) < 0)
+    goto err_sock;
+  RPRINTF("replication channel in slot %d\n", s->rfd_idx);
+
+  if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
+    RPRINTF("error making socket non-blocking\n");
+    goto err_sock;
+  }
+
+  s->rfd = fd;
+
+  return 0;
+
+  err_sock:
+  close(fd);
+  return -1;
+}
+
+/* drain the write ring to the backup over the replication channel.
+ * Connects lazily on first use; on a dead channel (rfd == -2) the
+ * driver drops into passthrough mode. Returns 0 on success or when
+ * replication is abandoned, -1 on connect/write failure. */
+static int client_replicate(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  struct tdremus_req* req;
+  uint64_t header[2];
+
+  /* -2 means connection ended -- give up replication */
+  if (s->rfd == -2) {
+    RPRINTF("engaging passthrough mode\n");
+    pass_start(dd);
+    /* fix: was "s->mode == mode_pass" -- a no-effect comparison, so the
+     * driver never actually recorded the switch to passthrough mode */
+    s->mode = mode_pass;
+    return 0;
+  }
+
+  if (ring_isempty(&s->write_ring))
+    return 0;
+
+  if (s->rfd < 0 && client_connect(dd) < 0) {
+    RPRINTF("replication failed");
+    return -1;
+  }
+  /* TODO: it would probably make sense to send the header for all
+   * requests in this batch in one write. */
+  /* wire header layout: bytes 0-3 "wreq", bytes 4-7 nb_sectors,
+   * bytes 8-15 sector number (matches server_do_wreq's 12-byte read
+   * after the 4-byte command) */
+  memcpy(header, "wreq", 4);
+  while (!ring_isempty(&s->write_ring)) {
+    req = s->write_ring.requests + s->write_ring.head;
+    ((uint32_t*)header)[1] = req->nb_sectors;
+    header[1] = req->sector;
+
+    if (mwrite(s->rfd, header, sizeof(header)) < 0)
+      goto err_write;
+    if (mwrite(s->rfd, req->buf,
+              req->nb_sectors * dd->td_state->sector_size) < 0)
+      goto err_write;
+
+    s->write_ring.head = ring_next(&s->write_ring, s->write_ring.head);
+  }
+  /* submit barrier */
+  /*
+  RPRINTF("sending submit\n");
+  if (mwrite(s->rfd, "sreq", 4) < 0)
+    goto err_write;
+  */
+
+  return 0;
+
+  err_write:
+  close_rfd(dd);
+  return -1;
+}
+
+/* store a copy of the request in order to replicate it at submit */
+static int client_queue_write(struct disk_driver *dd, uint64_t sector,
+                              int nb_sectors, char *buf, td_callback_t cb,
+                              int id, void *private)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int rc;
+
+  /* this is probably unsafe: if the underlying disk makes the callback
+   * before the request has been propagated, the buffer may be damaged.
+   * Perhaps the callback should be overridden to ensure the request
+   * has been sent before continuing. Or maybe we should just copy the
+   * buffer. */
+  /* UPDATE: using copies now */
+  if (ring_isfull(&s->write_ring)) {
+    RPRINTF("ring full, flushing\n");
+    client_replicate(dd);
+  }
+
+  /* client_replicate may have disabled replication on error */
+  if (s->mode == mode_client)
+    if ((rc = buf_queue_write(dd, sector, nb_sectors, buf)) < 0) {
+      RPRINTF("buf_queue_write returned %d\n", rc);
+      return rc;
+    }
+  dd->private = s->driver_data;
+  rc = s->driver->td_queue_write(dd, sector, nb_sectors, buf, cb, id,
+                                private);
+  dd->private = s;
+
+  return rc;
+}
+
+/* submit requests, then replicate them while the underlying disk
+ * is handling the requests */
+static int client_submit(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int rc;
+
+  dd->private = s->driver_data;
+  rc = s->driver->td_submit(dd);
+  dd->private = s;
+
+  client_replicate(dd);
+
+  return rc;
+}
+
+static int client_flush(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  /*
+  RPRINTF("committing output\n");
+  */
+
+  if (mwrite(s->rfd, "creq", 4) < 0) {
+    close_rfd(dd);
+  }
+
+  return 0;
+}
+
+static int client_start(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  RPRINTF("activating client mode\n");
+
+  s->queue_write = client_queue_write;
+  s->submit = client_submit;
+  s->queue_flush = client_flush;
+
+  return 0;
+}
+
+/* wait for "done" message to commit checkpoint */
+static int client_do_replicate(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  char buf[5];
+  int rc;
+
+  buf[4] = '\0';
+  if (mread(s->rfd, buf, 4) < 0) {
+    close_rfd(dd);
+    return -1;
+  }
+
+  if (!strncmp(buf, "done", 4)) {
+    if ((rc = write(s->msgfd, buf, 4)) < 0) {
+      RPRINTF("error writing notification: %d\n", errno);
+
+      close(s->msgfd);
+      if ((s->msgfd = open(s->msgfifo, O_RDWR)) < 0) {
+       RPRINTF("error reopening FIFO: %d\n", errno);
+       return -1;
+      }
+    }
+  } else
+    RPRINTF("received unknown message: %s\n", buf);
+
+  return 0;
+}
+
+/* remus server */
+
+/* returns the socket that receives write requests */
+static int remus_accept(struct tdremus_state* s)
+{
+  int fd;
+  struct sockaddr_in sa;
+  socklen_t sa_len;
+
+  RPRINTF("server waiting for connection\n");
+  sa_len = sizeof(sa);
+  if ((fd = accept(s->sfd, &sa, &sa_len)) < 0) {
+    RPRINTF("error accepting connection: %d\n", errno);
+    return -1;
+  }
+
+  RPRINTF("server accepted connection\n");
+
+  return fd;
+}
+
+/* returns -2 if EADDRNOTAVAIL */
+static int remus_bind(struct tdremus_state* s)
+{
+  struct sockaddr_in sa;
+  int opt;
+  int rc = -1;
+
+  if ((s->sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+    RPRINTF("could not create server socket: %d\n", errno);
+    return rc;
+  }
+  opt = 1;
+  if (setsockopt(s->sfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+    RPRINTF("Error setting REUSEADDR on %d: %d\n", s->sfd, errno);
+
+  memset(&sa, 0, sizeof(sa));
+  sa.sin_family = AF_INET;
+  sa.sin_addr.s_addr = s->addr.s_addr;
+  sa.sin_port = htons(s->port);
+  if (bind(s->sfd, &sa, sizeof(sa)) < 0) {
+    RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", s->sfd,
+           inet_ntoa(sa.sin_addr), s->port, errno, strerror(errno));
+    if (errno != EADDRINUSE)
+      rc = -2;
+    goto err_sfd;
+  }
+  if (listen(s->sfd, 10)) {
+    RPRINTF("could not listen on socket: %d\n", errno);
+    goto err_sfd;
+  }
+
+  return 0;
+
+  err_sfd:
+  close(s->sfd);
+  s->sfd = -1;
+
+  return rc;
+}
+
+/* wait for latest checkpoint to be applied */
+static inline int server_writes_inflight(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  if (!s->ramdisk.inflight && !s->ramdisk.prev)
+    return 0;
+
+  return 1;
+}
+
+/* this should not be called until the domain has failed over.
+ * Its duty is to make sure the latest checkpoint is applied
+ * before resuming. */
+static int server_queue_read(struct disk_driver *dd, uint64_t sector,
+                            int nb_sectors, char *buf, td_callback_t cb,
+                            int id, void *private)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  if (server_writes_inflight(dd)) {
+    RPRINTF("queue_read: waiting for queue to drain");
+    return -EBUSY;
+  }
+
+  RPRINTF("queue_read: activating backup");
+  pass_start(dd);
+  tdremus_queue_read(dd, sector, nb_sectors, buf, cb, id, private);
+
+  return 0;
+}
+
+/* see above */
+/* see above: the backup refuses writes (-EBUSY) until all checkpointed
+ * state has drained, then flips to passthrough mode for failover. */
+static int server_queue_write(struct disk_driver *dd, uint64_t sector,
+                            int nb_sectors, char *buf, td_callback_t cb,
+                            int id, void *private)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  if (server_writes_inflight(dd)) {
+    /* fix: normalized stray mixed-tab indentation on this line */
+    RPRINTF("queue_write: waiting for queue to drain");
+    return -EBUSY;
+  }
+
+  RPRINTF("queue_write: activating backup");
+  pass_start(dd);
+  /* NOTE(review): the return value of tdremus_queue_write is discarded
+   * here (as in server_queue_read) -- confirm errors may be dropped */
+  tdremus_queue_write(dd, sector, nb_sectors, buf, cb, id, private);
+
+  return 0;
+}
+
+/* put the proxy into server (backup) mode: expose the listening
+ * socket to the tapdisk select loop, start the write buffer, and
+ * install the failover-gated read/write hooks. */
+static int server_start(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  if (install_tdfd(dd, &s->rfd_idx, s->sfd) < 0)
+    return -1;
+
+  if (ramdisk_start(dd) < 0)
+    return -1;
+
+  RPRINTF("server listening in slot %d\n", s->rfd_idx);
+
+  /* reads/writes block (or trigger failover) while in server mode */
+  s->queue_read = server_queue_read;
+  s->queue_write = server_queue_write;
+
+  return 0;
+}
+
+/* writes should be appended to a journal */
+static int server_do_wreq(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  char buf[4096];
+  uint64_t sector;
+  int nb_sectors, len, rc;
+
+  /*
+  RPRINTF("received write request\n");
+  */
+
+  if (mread(s->rfd, buf, 12) < 0)
+    goto err_read;
+
+  nb_sectors = *(uint32_t*)buf;
+  memcpy(&sector, buf + 4, sizeof(sector));
+  len = nb_sectors * dd->td_state->sector_size;
+
+  /*
+  RPRINTF("writing %d sectors (%d bytes) starting at %llu\n", nb_sectors, len,
+         sector);
+  */
+
+  if (len > sizeof(buf)) {
+    /* freak out! */
+    RPRINTF("write request too large: %d/%u\n", len, (unsigned)sizeof(buf));
+    return -1;
+  }
+
+  if (mread(s->rfd, buf, len) < 0)
+    goto err_read;
+
+  ramdisk_write(&s->ramdisk, sector, nb_sectors, buf);
+
+  return 0;
+
+  err_read:
+  /* should start failover */
+  close_rfd(dd);
+
+  return -1;
+}
+
+/* handle an "sreq" (submit) message from the primary. No work is done
+ * here; presumably writes are already buffered on receipt ("wreq"), so
+ * submission on the backup is a no-op — confirm against client side. */
+static int server_do_sreq(struct disk_driver* dd)
+{
+  /*
+  RPRINTF("submit request received\n");
+  */
+  
+  return 0;
+}
+
+/* at this point, the server can start applying the most recent
+ * ramdisk. */
+static int server_do_creq(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  /*
+  RPRINTF("committing buffer\n");
+  */
+
+  /* begin flushing the buffered checkpoint to the underlying disk.
+   * NOTE(review): the return value of ramdisk_start_flush() is
+   * ignored — confirm a flush failure is recoverable here. */
+  ramdisk_start_flush(dd);
+
+  /* acknowledge the checkpoint to the primary */
+  if (write(s->rfd, "done", 4) != 4)
+    return -1;
+
+  return 0;
+}
+
+/* called when data is pending in s->rfd: either accept the incoming
+ * replication connection, or read and dispatch one 4-byte request
+ * tag ("wreq"/"sreq"/"creq") from the primary. */
+static int server_do_replicate(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  char req[5];
+  int fd;
+
+  if (dd->io_fd[s->rfd_idx] == s->sfd) {
+    /* connection not yet established. Bring it up. */
+    if ((fd = remus_accept(s)) < 0)
+      return -1;
+
+    s->rfd = fd;
+    dd->io_fd[s->rfd_idx] = s->rfd;
+
+    if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
+      RPRINTF("error making socket non-blocking\n");
+      close_rfd(dd);
+      /* fall back to listening for a new connection */
+      dd->io_fd[s->rfd_idx] = s->sfd;
+      return -1;
+    }
+
+    RPRINTF("replication channel in slot %d\n", s->rfd_idx);
+
+    return 0;
+  }
+
+  if (mread(s->rfd, req, 4) < 0) {
+    close_rfd(dd);
+    /* TODO: initiate failover recovery */
+    return -1;
+  }
+  /* terminate in the last slot: the original wrote req[5], one byte
+   * past the end of the 5-byte array (undefined behavior) */
+  req[4] = '\0';
+
+  if (!strncmp(req, "wreq", 4))
+    return server_do_wreq(dd);
+  else if (!strncmp(req, "sreq", 4))
+    return server_do_sreq(dd);
+  else if (!strncmp(req, "creq", 4))
+    return server_do_creq(dd);
+  else
+    RPRINTF("unknown request received: %s\n", req);
+
+  return 0;
+}
+
+/* control */
+
+/* resolve a hostname or dotted quad to an IPv4 address.
+ * On success *ia holds the first address in network byte order and 0
+ * is returned; -1 on lookup failure. */
+static inline int resolve_address(const char* addr, struct in_addr* ia)
+{
+  struct hostent* he;
+
+  if (!(he = gethostbyname(addr))) {
+    RPRINTF("error resolving %s: %d\n", addr, h_errno);
+    return -1;
+  }
+
+  if (!he->h_addr_list[0]) {
+    RPRINTF("no address found for %s\n", addr);
+    return -1;
+  }
+
+  /* copy the first address with memcpy; dereferencing h_addr_list[0]
+   * through a uint32_t* (as before) risks alignment and strict
+   * aliasing violations. Result stays in network byte order. */
+  memcpy(&ia->s_addr, he->h_addr_list[0], sizeof(ia->s_addr));
+
+  return 0;
+}
+
+/* parse a "<host>:<port>:<driver>:<driver config>" target string:
+ * resolve the backup address into s->addr/s->port, stash the wrapped
+ * driver's config string in s->path, and look the driver up in
+ * dtypes[]. Returns 0 on success or a negative errno value. */
+static int get_driver(struct disk_driver* dd, const char* name)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  char* host;
+  char* ports;
+  char* driver;
+  char* parent;
+  unsigned long ulport;
+  disk_info_t* disk_info = NULL;
+  int nr_drivers = sizeof(dtypes) / sizeof(disk_info_t*);
+  int i;
+
+  /* host is everything up to the first ':' */
+  ports = strchr(name, ':');
+  if (!ports) {
+    RPRINTF("missing host in %s\n", name);
+    return -ENOENT;
+  }
+  if (!(host = strndup(name, ports - name))) {
+    RPRINTF("unable to allocate host\n");
+    return -ENOMEM;
+  }
+  ports++;
+  if (resolve_address(host, &s->addr) < 0) {
+    RPRINTF("unable to resolve host: %s\n", host);
+    free(host);
+    return -ENOENT;
+  }
+  free(host);
+
+  /* port: non-zero decimal, must fit in 16 bits */
+  if (!(ulport = strtoul(ports, &driver, 10))) {
+    RPRINTF("missing port in %s\n", name);
+    return -ENOENT;
+  }
+  if (ulport > 65535) {
+    RPRINTF("port out of range: %lu\n", ulport);
+    return -ENOENT;
+  }
+  s->port = (unsigned short)ulport;
+  if (driver[0] != ':') {
+    RPRINTF("missing driver in %s\n", name);
+    return -ENOENT;
+  }
+  driver++;
+
+  /* everything after the next ':' is the wrapped driver's own config */
+  parent = strchr(driver, ':');
+  if (!parent) {
+    RPRINTF("missing parent for %s\n", name);
+    return -ENOENT;
+  }
+  parent++;
+  /* check the allocation (the original used s->path unchecked) */
+  if (!(s->path = strdup(parent))) {
+    RPRINTF("unable to allocate parent path\n");
+    return -ENOMEM;
+  }
+
+  RPRINTF("host: %s, port: %d\n", inet_ntoa(s->addr), s->port);
+  for (i = 0; i < nr_drivers; i++)
+    if (!strncmp(driver, dtypes[i]->handle, strlen(dtypes[i]->handle)))
+      disk_info = dtypes[i];
+
+  if (disk_info) {
+    RPRINTF("found driver %s for %s\n", disk_info->handle, parent);
+    s->driver = disk_info->drv;
+
+    return 0;
+  }
+
+  RPRINTF("no driver found for %s\n", name);
+
+  /* (unused err_driver label removed; this is plain fallthrough) */
+  free(s->path);
+  s->path = NULL;
+  return -ENOENT;
+}
+
+/* switch the proxy between pass-through, client (primary) and server
+ * (backup) modes, flushing any queued requests first. Only records
+ * the new mode if the mode-start routine succeeds. */
+static int switch_mode(struct disk_driver* dd, enum tdremus_mode mode)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int rc;
+
+  if (mode == s->mode)
+    return 0;
+
+  if (s->queue_flush)
+    if ((rc = s->queue_flush(dd)) < 0)
+      return rc;
+
+  /* the original lacked "else" before the mode_client test, so a
+   * successful pass_start() result was immediately clobbered by the
+   * "unknown mode" branch */
+  if (mode == mode_pass)
+    rc = pass_start(dd);
+  else if (mode == mode_client)
+    rc = client_start(dd);
+  else if (mode == mode_server)
+    rc = server_start(dd);
+  else {
+    RPRINTF("unknown mode requested: %d\n", mode);
+    rc = -1;
+  }
+
+  if (!rc)
+    s->mode = mode;
+
+  return rc;
+}
+
+/* handle a command written to the control FIFO by the remus tools.
+ * The only recognized command is "flush", which invokes the current
+ * mode's queue_flush hook (checkpoint propagation). */
+static int do_ctl(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int ctlfd = dd->io_fd[s->ctl_fd_idx];
+  char msg[80];
+  int rc;
+
+  /* 0-byte read = EOF: every writer closed the FIFO. Reopen it so the
+   * fd stays valid for the select loop. */
+  if (!(rc = read(ctlfd, msg, sizeof(msg) - 1 /* append nul */))) {
+    RPRINTF("0-byte read received, reopening FIFO\n");
+    close(ctlfd);
+    if ((ctlfd = open(s->ctlfifo, O_RDWR)) < 0) {
+      RPRINTF("error reopening FIFO: %d\n", errno);
+      return -1;
+    }
+    dd->io_fd[s->ctl_fd_idx] = ctlfd;
+    return 0;
+  }
+
+  if (rc < 0) {
+    RPRINTF("error reading from FIFO: %d\n", errno);
+    return -1;
+  }
+
+  /* NUL-terminate the command (read() leaves msg unterminated) */
+  msg[rc] = '\0';
+  if (!strncmp(msg, "flush", 5)) {
+    /* no flush hook in pass-through mode: treat as success */
+    if (s->queue_flush)
+      return s->queue_flush(dd);
+    else
+      return 0;
+  } else {
+    RPRINTF("unknown command: %s\n", msg);
+  }
+
+  return 0;
+}
+
+/* must be called after the underlying driver has been initialized */
+static int add_ctl_hook(struct disk_driver* dd, const char* name)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int i, l;
+
+  for (s->ctl_fd_idx = 0; s->ctl_fd_idx < MAX_IOFD; s->ctl_fd_idx++) {
+    if (!dd->io_fd[s->ctl_fd_idx])
+      break;
+  }
+  if (s->ctl_fd_idx == MAX_IOFD) {
+    RPRINTF("no free FD for control channel\n");
+    return -1;
+  }
+
+  /* device name -> FIFO */
+  if (asprintf(&s->ctlfifo, BLKTAP_CTRL_DIR "/remus_%s", name) < 0)
+    return -1;
+  for (i = strlen(BLKTAP_CTRL_DIR) + 1, l = strlen(s->ctlfifo); i < l; i++) {
+    if (strchr(":/", s->ctlfifo[i]))
+      s->ctlfifo[i] = '_';
+  }
+  if (asprintf(&s->msgfifo, "%s.msg", s->ctlfifo) < 0)
+    goto err_ctlfifo;
+
+  if (mkfifo(s->ctlfifo, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
+    RPRINTF("error creating control FIFO %s: %d\n", s->ctlfifo, errno);
+    goto err_msgfifo;
+  }
+
+  if (mkfifo(s->msgfifo, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
+    RPRINTF("error creating message FIFO %s: %d\n", s->msgfifo, errno);
+    goto err_msgfifo;
+  }
+
+  /* RDWR so that fd doesn't block select when no writer is present */
+  if ((dd->io_fd[s->ctl_fd_idx] = open(s->ctlfifo, O_RDWR)) < 0) {
+    RPRINTF("error opening control FIFO %s: %d\n", s->ctlfifo, errno);
+    goto err_msgfifo;
+  }
+
+  if ((s->msgfd = open(s->msgfifo, O_RDWR)) < 0) {
+    RPRINTF("error opening message FIFO %s: %d\n", s->msgfifo, errno);
+    goto err_openctlfifo;
+  }
+
+  RPRINTF("control FIFO %s\n", s->ctlfifo);
+  RPRINTF("message FIFO %s\n", s->msgfifo);
+
+  return 0;
+
+  err_openctlfifo:
+  close(dd->io_fd[s->ctl_fd_idx]);
+  dd->io_fd[s->ctl_fd_idx] = 0;
+  err_msgfifo:
+  free(s->msgfifo);
+  s->msgfifo = NULL;
+  err_ctlfifo:
+  free(s->ctlfifo);
+  s->ctlfifo = NULL;
+  s->ctl_fd_idx = -1;
+  return -1;
+}
+
+/* tear down what add_ctl_hook() set up: the control fd in the io_fd
+ * table, the message FIFO fd, and both FIFO paths. */
+static void del_ctl_hook(struct disk_driver* dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+
+  /* guard the index: ctl_fd_idx is -1 when the hook was never (or
+   * only partially) installed, and io_fd[-1] would be out of bounds */
+  if (s->ctl_fd_idx >= 0 && dd->io_fd[s->ctl_fd_idx]) {
+    close(dd->io_fd[s->ctl_fd_idx]);
+    dd->io_fd[s->ctl_fd_idx] = 0;
+  }
+  /* the original leaked the message FIFO fd opened by add_ctl_hook() */
+  if (s->msgfd > 0) {
+    close(s->msgfd);
+    s->msgfd = -1;
+  }
+  if (s->ctlfifo) {
+    unlink(s->ctlfifo);
+    free(s->ctlfifo);
+    s->ctlfifo = NULL;
+  }
+  /* also remove and release the message FIFO path */
+  if (s->msgfifo) {
+    unlink(s->msgfifo);
+    free(s->msgfifo);
+    s->msgfifo = NULL;
+  }
+}
+
+/* interface */
+
+/* td_open entry point.
+ * Target string: <backup host>:<port>:<driver>:<driver config>.
+ * After opening the wrapped driver and the control FIFOs, try to bind
+ * the replication port: success means no backup runs here, so act as
+ * the server (backup); rc == -2 from remus_bind (EADDRINUSE) means
+ * the port is taken, so act as the client (primary). */
+static int tdremus_open (struct disk_driver *dd, const char *name,
+                         td_flag_t flags)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int rc;
+
+  RPRINTF("opening %s\n", name);
+
+  /* -1 marks fds/indices as "not in use" for the cleanup paths */
+  memset(s, 0, sizeof(*s));
+  s->sfd = -1;
+  s->rfd = -1;
+  s->ctl_fd_idx = -1;
+  s->rfd_idx = -1;
+
+  if ((rc = get_driver(dd, name)))
+    return rc;
+
+  if (!(s->driver_data = malloc(s->driver->private_data_size))) {
+    RPRINTF("could not allocate driver data\n");
+    return -ENOMEM;
+  }
+
+  /* the wrapped driver expects its own state in dd->private */
+  dd->private = s->driver_data;
+  if ((rc = s->driver->td_open(dd, s->path, flags))) {
+    RPRINTF("could not open driver\n");
+    dd->private = s;
+    free(s->driver_data);
+    return rc;
+  }
+  dd->private = s;
+
+  /* NOTE(review): on this failure path the wrapped driver is left
+   * open (td_close is not called) — confirm whether this leaks. */
+  if ((rc = add_ctl_hook(dd, name))) {
+    RPRINTF("error setting up control channel\n");
+    free(s->driver_data);
+    return rc;
+  }
+
+  if ((rc = remus_bind(s))) {
+    if (rc == -2) {
+      rc = switch_mode(dd, mode_client);
+    } else
+      goto err_path;
+  } else
+    rc = switch_mode(dd, mode_server);
+
+  /* tdremus_close() unwinds the driver, sockets, path and FIFOs */
+  if (rc) {
+    tdremus_close(dd);
+    return -EIO;
+  }
+
+  return 0;
+
+  err_path:
+  free(s->path);
+  s->path = NULL;
+  return rc;
+}
+
+/* forward a read to the current mode handler if one is installed,
+ * otherwise pass it straight to the wrapped driver */
+static int tdremus_queue_read(struct disk_driver *dd, uint64_t sector,
+                              int nb_sectors, char *buf, td_callback_t cb,
+                              int id, void *private)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int ret;
+
+  if (s->queue_read)
+    return s->queue_read(dd, sector, nb_sectors, buf, cb, id, private);
+
+  /* temporarily expose the wrapped driver's state in dd->private */
+  dd->private = s->driver_data;
+  ret = s->driver->td_queue_read(dd, sector, nb_sectors, buf, cb, id, private);
+  dd->private = s;
+
+  return ret;
+}
+
+/* forward a write to the current mode handler if one is installed,
+ * otherwise pass it straight to the wrapped driver */
+static int tdremus_queue_write(struct disk_driver *dd, uint64_t sector,
+                               int nb_sectors, char *buf, td_callback_t cb,
+                               int id, void *private)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int ret;
+
+  if (s->queue_write)
+    return s->queue_write(dd, sector, nb_sectors, buf, cb, id, private);
+
+  /* temporarily expose the wrapped driver's state in dd->private */
+  dd->private = s->driver_data;
+  ret = s->driver->td_queue_write(dd, sector, nb_sectors, buf, cb, id,
+                                 private);
+  dd->private = s;
+
+  return ret;
+}
+
+/* forward a submit to the current mode handler if one is installed,
+ * otherwise pass it straight to the wrapped driver */
+static int tdremus_submit(struct disk_driver *dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int ret;
+
+  if (s->submit)
+    return s->submit(dd);
+
+  /* temporarily expose the wrapped driver's state in dd->private */
+  dd->private = s->driver_data;
+  ret = s->driver->td_submit(dd);
+  dd->private = s;
+
+  return ret;
+}
+
+/* td_close entry point: close the wrapped driver first, then release
+ * all proxy-owned resources (driver state, sockets, path, FIFOs). */
+static int tdremus_close(struct disk_driver *dd)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int rc;
+
+  RPRINTF("closing\n");
+
+  /* the wrapped driver expects its own state in dd->private */
+  dd->private = s->driver_data;
+  rc = s->driver->td_close(dd);
+  dd->private = s;
+
+  if (s->driver_data) {
+    free(s->driver_data);
+    s->driver_data = NULL;
+  }
+  /* listening socket (server mode) */
+  if (s->sfd >= 0) {
+    close(s->sfd);
+    s->sfd = -1;
+  }
+  /* replication channel */
+  if (s->rfd >= 0) {
+    close_rfd(dd);
+  }
+  if (s->path) {
+    free(s->path);
+    s->path = NULL;
+  }
+
+  del_ctl_hook(dd);
+
+  /* propagate the wrapped driver's close status */
+  return rc;
+}
+
+/* dispatch fd events: the control FIFO, the replication socket, or
+ * the wrapped driver's own fds */
+static int tdremus_do_callbacks(struct disk_driver *dd, int sid)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int ret;
+
+  if (sid == s->ctl_fd_idx)
+    return do_ctl(dd);
+
+  if (sid == s->rfd_idx)
+    return (s->mode == mode_client) ? client_do_replicate(dd)
+                                    : server_do_replicate(dd);
+
+  /* temporarily expose the wrapped driver's state in dd->private */
+  dd->private = s->driver_data;
+  ret = s->driver->td_do_callbacks(dd, sid);
+  dd->private = s;
+
+  return ret;
+}
+
+/* delegate parent-id lookup to the wrapped driver */
+static int tdremus_get_parent_id(struct disk_driver *dd, struct disk_id *id)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int ret;
+
+  /* temporarily expose the wrapped driver's state in dd->private */
+  dd->private = s->driver_data;
+  ret = s->driver->td_get_parent_id(dd, id);
+  dd->private = s;
+
+  return ret;
+}
+
+/* delegate parent validation to the wrapped driver */
+static int tdremus_validate_parent(struct disk_driver *dd, 
+                            struct disk_driver *parent, td_flag_t flags)
+{
+  struct tdremus_state *s = (struct tdremus_state *)dd->private;
+  int ret;
+
+  /* temporarily expose the wrapped driver's state in dd->private */
+  dd->private = s->driver_data;
+  ret = s->driver->td_validate_parent(dd, parent, flags);
+  dd->private = s;
+
+  return ret;
+}
+
+/* tapdisk driver registration: entry points for the remus proxy */
+struct tap_disk tapdisk_remus = {
+  .disk_type          = "tapdisk_remus",
+  .private_data_size  = sizeof(struct tdremus_state),
+  .td_open            = tdremus_open,
+  .td_queue_read      = tdremus_queue_read,
+  .td_queue_write     = tdremus_queue_write,
+  .td_submit          = tdremus_submit,
+  .td_close           = tdremus_close,
+  .td_do_callbacks    = tdremus_do_callbacks,
+  .td_get_parent_id   = tdremus_get_parent_id,
+  .td_validate_parent = tdremus_validate_parent
+};
diff --git a/tools/blktap/drivers/hashtable.c b/tools/blktap/drivers/hashtable.c
new file mode 100644
--- /dev/null
+++ b/tools/blktap/drivers/hashtable.c
@@ -0,0 +1,274 @@
+/* Copyright (C) 2004 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#include "hashtable.h"
+#include "hashtable_private.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <math.h>
+
+/*
+Credit for primes table: Aaron Krowne
+ http://br.endernet.org/~akrowne/
+ http://planetmath.org/encyclopedia/GoodHashTablePrimes.html
+*/
+static const unsigned int primes[] = {
+53, 97, 193, 389,
+769, 1543, 3079, 6151,
+12289, 24593, 49157, 98317,
+196613, 393241, 786433, 1572869,
+3145739, 6291469, 12582917, 25165843,
+50331653, 100663319, 201326611, 402653189,
+805306457, 1610612741
+};
+const unsigned int prime_table_length = sizeof(primes)/sizeof(primes[0]);
+const float max_load_factor = 0.65;
+
+/*****************************************************************************/
+/* allocate a hashtable with at least minsize buckets (rounded up to
+ * the next prime in primes[]). hashf/eqf are the key hash and key
+ * equality callbacks. Returns NULL on oom or if minsize > 2^30. */
+struct hashtable *
+create_hashtable(unsigned int minsize,
+                 unsigned int (*hashf) (void*),
+                 int (*eqf) (void*,void*))
+{
+    struct hashtable *h;
+    unsigned int pindex, size = primes[0];
+    /* Check requested hashtable isn't too large */
+    if (minsize > (1u << 30)) return NULL;
+    /* Enforce size as prime */
+    for (pindex=0; pindex < prime_table_length; pindex++) {
+        if (primes[pindex] > minsize) { size = primes[pindex]; break; }
+    }
+    h = (struct hashtable *)malloc(sizeof(struct hashtable));
+    if (NULL == h) return NULL; /*oom*/
+    h->table = (struct entry **)malloc(sizeof(struct entry*) * size);
+    if (NULL == h->table) { free(h); return NULL; } /*oom*/
+    memset(h->table, 0, size * sizeof(struct entry *));
+    h->tablelength  = size;
+    h->primeindex   = pindex;
+    h->entrycount   = 0;
+    h->hashfn       = hashf;
+    h->eqfn         = eqf;
+    /* expand when entrycount exceeds 65% of the bucket count */
+    h->loadlimit    = (unsigned int) ceil(size * max_load_factor);
+    return h;
+}
+
+/*****************************************************************************/
+/* compute the internal hash for key k: the user hash function mixed
+ * with extra avalanche steps */
+unsigned int
+hash(struct hashtable *h, void *k)
+{
+    /* Aim to protect against poor hash functions by adding logic here
+     * - logic taken from java 1.4 hashtable source */
+    unsigned int i = h->hashfn(k);
+    i += ~(i << 9);
+    i ^=  ((i >> 14) | (i << 18)); /* >>> */
+    i +=  (i << 4);
+    i ^=  ((i >> 10) | (i << 22)); /* >>> */
+    return i;
+}
+
+/*****************************************************************************/
+/* grow the table to the next prime size and rehash every entry.
+ * Returns non-zero on success, 0 if the table is already at maximum
+ * size or allocation fails (the table is left usable either way). */
+static int
+hashtable_expand(struct hashtable *h)
+{
+    /* Double the size of the table to accomodate more entries */
+    struct entry **newtable;
+    struct entry *e;
+    struct entry **pE;
+    unsigned int newsize, i, index;
+    /* Check we're not hitting max capacity */
+    if (h->primeindex == (prime_table_length - 1)) return 0;
+    newsize = primes[++(h->primeindex)];
+
+    newtable = (struct entry **)malloc(sizeof(struct entry*) * newsize);
+    if (NULL != newtable)
+    {
+        memset(newtable, 0, newsize * sizeof(struct entry *));
+        /* This algorithm is not 'stable'. ie. it reverses the list
+         * when it transfers entries between the tables */
+        for (i = 0; i < h->tablelength; i++) {
+            while (NULL != (e = h->table[i])) {
+                h->table[i] = e->next;
+                index = indexFor(newsize,e->h);
+                e->next = newtable[index];
+                newtable[index] = e;
+            }
+        }
+        free(h->table);
+        h->table = newtable;
+    }
+    /* Plan B: realloc instead */
+    else 
+    {
+        newtable = (struct entry **)
+                   realloc(h->table, newsize * sizeof(struct entry *));
+        if (NULL == newtable) { (h->primeindex)--; return 0; }
+        h->table = newtable;
+        /* zero the newly added slots. The original passed
+         * newtable[h->tablelength] (an uninitialized pointer VALUE) as
+         * the destination and forgot the element size in the length -
+         * heap corruption on this path. */
+        memset(newtable + h->tablelength, 0,
+               (newsize - h->tablelength) * sizeof(struct entry *));
+        for (i = 0; i < h->tablelength; i++) {
+            for (pE = &(newtable[i]), e = *pE; e != NULL; e = *pE) {
+                index = indexFor(newsize,e->h);
+                if (index == i)
+                {
+                    pE = &(e->next);
+                }
+                else
+                {
+                    *pE = e->next;
+                    e->next = newtable[index];
+                    newtable[index] = e;
+                }
+            }
+        }
+    }
+    h->tablelength = newsize;
+    h->loadlimit   = (unsigned int) ceil(newsize * max_load_factor);
+    return -1;
+}
+
+/*****************************************************************************/
+/* number of (key,value) entries currently stored */
+unsigned int
+hashtable_count(struct hashtable *h)
+{
+    return h->entrycount;
+}
+
+/*****************************************************************************/
+/* insert (k,v). The table takes ownership of both pointers: k is
+ * released via freekey() on remove/destroy and v via free() on
+ * destroy/change. Returns non-zero on success, 0 on oom. */
+int
+hashtable_insert(struct hashtable *h, void *k, void *v)
+{
+    /* This method allows duplicate keys - but they shouldn't be used */
+    unsigned int index;
+    struct entry *e;
+    if (++(h->entrycount) > h->loadlimit)
+    {
+        /* Ignore the return value. If expand fails, we should
+         * still try cramming just this value into the existing table
+         * -- we may not have memory for a larger table, but one more
+         * element may be ok. Next time we insert, we'll try expanding again.*/
+        hashtable_expand(h);
+    }
+    e = (struct entry *)malloc(sizeof(struct entry));
+    if (NULL == e) { --(h->entrycount); return 0; } /*oom*/
+    e->h = hash(h,k);
+    index = indexFor(h->tablelength,e->h);
+    e->k = k;
+    e->v = v;
+    /* prepend to the bucket chain */
+    e->next = h->table[index];
+    h->table[index] = e;
+    return -1;
+}
+
+/*****************************************************************************/
+void * /* returns value associated with key */
+hashtable_search(struct hashtable *h, void *k)
+{
+    unsigned int hashvalue = hash(h,k);
+    struct entry *e;
+
+    /* scan the chain of the bucket this key maps to */
+    for (e = h->table[indexFor(h->tablelength, hashvalue)];
+         NULL != e; e = e->next)
+    {
+        /* compare cached hashes first to skip expensive key comparisons */
+        if ((hashvalue == e->h) && (h->eqfn(k, e->k)))
+            return e->v;
+    }
+    return NULL;
+}
+
+/*****************************************************************************/
+/* remove the entry for key k. Returns the stored value (ownership
+ * transfers to the caller; the key is released via freekey()), or
+ * NULL if the key is not present. */
+void * /* returns value associated with key */
+hashtable_remove(struct hashtable *h, void *k)
+{
+    /* TODO: consider compacting the table when the load factor drops enough,
+     *       or provide a 'compact' method. */
+
+    struct entry *e;
+    struct entry **pE;
+    void *v;
+    unsigned int hashvalue, index;
+
+    hashvalue = hash(h,k);
+    /* reuse the cached hash (the original recomputed hash(h,k) here) */
+    index = indexFor(h->tablelength,hashvalue);
+    pE = &(h->table[index]);
+    e = *pE;
+    while (NULL != e)
+    {
+        /* Check hash value to short circuit heavier comparison */
+        if ((hashvalue == e->h) && (h->eqfn(k, e->k)))
+        {
+            *pE = e->next;
+            h->entrycount--;
+            v = e->v;
+            freekey(e->k);
+            free(e);
+            return v;
+        }
+        pE = &(e->next);
+        e = e->next;
+    }
+    return NULL;
+}
+
+/*****************************************************************************/
+/* destroy */
+/* destroy */
+/* free every entry (keys always, values only if free_values), then
+ * the bucket array and the table itself */
+void
+hashtable_destroy(struct hashtable *h, int free_values)
+{
+    unsigned int i;
+    struct entry *e, *next;
+
+    for (i = 0; i < h->tablelength; i++)
+    {
+        for (e = h->table[i]; NULL != e; e = next)
+        {
+            next = e->next;
+            freekey(e->k);
+            if (free_values) free(e->v);
+            free(e);
+        }
+    }
+    free(h->table);
+    free(h);
+}
+
+/*
+ * Copyright (c) 2002, Christopher Clark
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff --git a/tools/blktap/drivers/hashtable_itr.c 
b/tools/blktap/drivers/hashtable_itr.c
new file mode 100644
--- /dev/null
+++ b/tools/blktap/drivers/hashtable_itr.c
@@ -0,0 +1,188 @@
+/* Copyright (C) 2002, 2004 Christopher Clark  
<firstname.lastname@xxxxxxxxxxxx> */
+
+#include "hashtable.h"
+#include "hashtable_private.h"
+#include "hashtable_itr.h"
+#include <stdlib.h> /* defines NULL */
+
+/*****************************************************************************/
+/* hashtable_iterator    - iterator constructor */
+
+/* allocate an iterator positioned on the first occupied bucket of h,
+ * or "past the end" if the table is empty. NULL on oom. */
+struct hashtable_itr *
+hashtable_iterator(struct hashtable *h)
+{
+    unsigned int i, len;
+    struct hashtable_itr *itr;
+
+    itr = (struct hashtable_itr *)malloc(sizeof(struct hashtable_itr));
+    if (NULL == itr) return NULL;
+
+    /* start at end-of-table; fixed up below if entries exist */
+    itr->h = h;
+    itr->e = NULL;
+    itr->parent = NULL;
+    len = h->tablelength;
+    itr->index = len;
+    if (0 == h->entrycount) return itr;
+
+    for (i = 0; i < len; i++)
+    {
+        if (NULL == h->table[i]) continue;
+        itr->e = h->table[i];
+        itr->index = i;
+        break;
+    }
+    return itr;
+}
+
+/*****************************************************************************/
+/* key      - return the key of the (key,value) pair at the current position */
+/* value    - return the value of the (key,value) pair at the current position 
*/
+
+/* out-of-line definition matching the header's extern inline version */
+void *
+hashtable_iterator_key(struct hashtable_itr *i)
+{ return i->e->k; }
+
+/* out-of-line definition matching the header's extern inline version */
+void *
+hashtable_iterator_value(struct hashtable_itr *i)
+{ return i->e->v; }
+
+/*****************************************************************************/
+/* advance - advance the iterator to the next element
+ *           returns zero if advanced to end of table */
+
+/* advance the iterator to the next element; returns zero if it moved
+ * past the end of the table (itr->e becomes NULL). */
+int
+hashtable_iterator_advance(struct hashtable_itr *itr)
+{
+    unsigned int j,tablelength;
+    struct entry **table;
+    struct entry *next;
+    if (NULL == itr->e) return 0; /* stupidity check */
+
+    /* next entry within the same bucket chain, if any */
+    next = itr->e->next;
+    if (NULL != next)
+    {
+        itr->parent = itr->e;
+        itr->e = next;
+        return -1;
+    }
+    /* chain exhausted: scan forward for the next occupied bucket;
+     * the new entry is a chain head, so it has no parent */
+    tablelength = itr->h->tablelength;
+    itr->parent = NULL;
+    if (tablelength <= (j = ++(itr->index)))
+    {
+        itr->e = NULL;
+        return 0;
+    }
+    table = itr->h->table;
+    while (NULL == (next = table[j]))
+    {
+        if (++j >= tablelength)
+        {
+            itr->index = tablelength;
+            itr->e = NULL;
+            return 0;
+        }
+    }
+    itr->index = j;
+    itr->e = next;
+    return -1;
+}
+
+/*****************************************************************************/
+/* remove - remove the entry at the current iterator position
+ *          and advance the iterator, if there is a successive
+ *          element.
+ *          If you want the value, read it before you remove:
+ *          beware memory leaks if you don't.
+ *          Returns zero if end of iteration. */
+
+/* remove the entry at the current position (key freed, value NOT
+ * freed - read it first) and advance. Returns zero at end of
+ * iteration. */
+int
+hashtable_iterator_remove(struct hashtable_itr *itr)
+{
+    struct entry *remember_e, *remember_parent;
+    int ret;
+
+    /* Do the removal: unlink from the chain head or mid-chain */
+    if (NULL == (itr->parent))
+    {
+        /* element is head of a chain */
+        itr->h->table[itr->index] = itr->e->next;
+    } else {
+        /* element is mid-chain */
+        itr->parent->next = itr->e->next;
+    }
+    /* itr->e is now outside the hashtable */
+    remember_e = itr->e;
+    itr->h->entrycount--;
+    freekey(remember_e->k);
+
+    /* Advance the iterator, correcting the parent: if advance left
+     * the parent pointing at the just-removed entry, restore the
+     * removed entry's own predecessor */
+    remember_parent = itr->parent;
+    ret = hashtable_iterator_advance(itr);
+    if (itr->parent == remember_e) { itr->parent = remember_parent; }
+    free(remember_e);
+    return ret;
+}
+
+/*****************************************************************************/
+int /* returns zero if not found */
+hashtable_iterator_search(struct hashtable_itr *itr,
+                          struct hashtable *h, void *k)
+{
+    struct entry *e, *prev;
+    unsigned int hashvalue, index;
+
+    hashvalue = hash(h,k);
+    index = indexFor(h->tablelength,hashvalue);
+
+    /* walk the bucket chain, tracking the predecessor so the iterator
+     * can later unlink the entry it points at */
+    for (prev = NULL, e = h->table[index]; NULL != e; prev = e, e = e->next)
+    {
+        /* compare cached hashes before the (possibly expensive) eqfn */
+        if ((hashvalue == e->h) && (h->eqfn(k, e->k)))
+        {
+            itr->index = index;
+            itr->e = e;
+            itr->parent = prev;
+            itr->h = h;
+            return -1;
+        }
+    }
+    return 0;
+}
+
+
+/*
+ * Copyright (c) 2002, 2004, Christopher Clark
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff --git a/tools/blktap/drivers/hashtable_itr.h 
b/tools/blktap/drivers/hashtable_itr.h
new file mode 100644
--- /dev/null
+++ b/tools/blktap/drivers/hashtable_itr.h
@@ -0,0 +1,112 @@
+/* Copyright (C) 2002, 2004 Christopher Clark 
<firstname.lastname@xxxxxxxxxxxx> */
+
+#ifndef __HASHTABLE_ITR_CWC22__
+#define __HASHTABLE_ITR_CWC22__
+#include "hashtable.h"
+#include "hashtable_private.h" /* needed to enable inlining */
+
+/*****************************************************************************/
+/* This struct is only concrete here to allow the inlining of two of the
+ * accessor functions. */
+struct hashtable_itr
+{
+    struct hashtable *h;      /* table being iterated */
+    struct entry *e;          /* current entry; NULL at end of table */
+    struct entry *parent;     /* predecessor of e in its chain (for unlink) */
+    unsigned int index;       /* bucket index of the current entry */
+};
+
+
+/*****************************************************************************/
+/* hashtable_iterator
+ */
+
+struct hashtable_itr *
+hashtable_iterator(struct hashtable *h);
+
+/*****************************************************************************/
+/* hashtable_iterator_key
+ * - return the value of the (key,value) pair at the current position */
+
+/* NOTE(review): "extern inline" here relies on gnu89 inline
+ * semantics; under C99 inline rules every includer emits an external
+ * definition that clashes with the one in hashtable_itr.c — confirm
+ * the build compiles this in gnu89 mode. */
+extern inline void *
+hashtable_iterator_key(struct hashtable_itr *i)
+{
+    return i->e->k;
+}
+
+/*****************************************************************************/
+/* value - return the value of the (key,value) pair at the current position */
+
+/* NOTE(review): same gnu89 "extern inline" caveat as
+ * hashtable_iterator_key above — confirm the build mode. */
+extern inline void *
+hashtable_iterator_value(struct hashtable_itr *i)
+{
+    return i->e->v;
+}
+
+/*****************************************************************************/
+/* advance - advance the iterator to the next element
+ *           returns zero if advanced to end of table */
+
+int
+hashtable_iterator_advance(struct hashtable_itr *itr);
+
+/*****************************************************************************/
+/* remove - remove current element and advance the iterator to the next element
+ *          NB: if you need the value to free it, read it before
+ *          removing. ie: beware memory leaks!
+ *          returns zero if advanced to end of table */
+
+int
+hashtable_iterator_remove(struct hashtable_itr *itr);
+
+/*****************************************************************************/
+/* search - overwrite the supplied iterator, to point to the entry
+ *          matching the supplied key.
+            h points to the hashtable to be searched.
+ *          returns zero if not found. */
+int
+hashtable_iterator_search(struct hashtable_itr *itr,
+                          struct hashtable *h, void *k);
+
+#define DEFINE_HASHTABLE_ITERATOR_SEARCH(fnname, keytype) \
+int fnname (struct hashtable_itr *i, struct hashtable *h, keytype *k) \
+{ \
+    return (hashtable_iterator_search(i,h,k)); \
+}
+
+
+
+#endif /* __HASHTABLE_ITR_CWC22__*/
+
+/*
+ * Copyright (c) 2002, 2004, Christopher Clark
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff --git a/tools/blktap/drivers/hashtable_utility.c 
b/tools/blktap/drivers/hashtable_utility.c
new file mode 100644
--- /dev/null
+++ b/tools/blktap/drivers/hashtable_utility.c
@@ -0,0 +1,71 @@
+/* Copyright (C) 2002 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#include "hashtable.h"
+#include "hashtable_private.h"
+#include "hashtable_utility.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+/*****************************************************************************/
+/* hashtable_change
+ *
+ * Replace the value already bound to key k with the new value v; no new
+ * binding is created if the key is absent.  The previous value is released
+ * with free(), so stored values must be heap-allocated and ownership of v
+ * transfers to the table (NOTE(review): confirm against insertion sites).
+ * Source due to Holger Schemel.
+ *
+ * Returns -1 if the key was found and its value replaced, 0 if the key
+ * was not present.  Note the inverted-looking convention: -1 is success.
+ *  */
+int
+hashtable_change(struct hashtable *h, void *k, void *v)
+{
+    struct entry *e;
+    unsigned int hashvalue, index;
+    hashvalue = hash(h,k);
+    index = indexFor(h->tablelength,hashvalue);
+    e = h->table[index];
+    while (NULL != e)
+    {
+        /* Check hash value to short circuit heavier comparison */
+        if ((hashvalue == e->h) && (h->eqfn(k, e->k)))
+        {
+            free(e->v);  /* old value is freed here, not returned */
+            e->v = v;
+            return -1;   /* found and changed */
+        }
+        e = e->next;
+    }
+    return 0;            /* key not present; v was NOT consumed */
+}
+
+/*
+ * Copyright (c) 2002, Christopher Clark
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff --git a/tools/blktap/drivers/hashtable_utility.h 
b/tools/blktap/drivers/hashtable_utility.h
new file mode 100644
--- /dev/null
+++ b/tools/blktap/drivers/hashtable_utility.h
@@ -0,0 +1,55 @@
+/* Copyright (C) 2002 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#ifndef __HASHTABLE_CWC22_UTILITY_H__
+#define __HASHTABLE_CWC22_UTILITY_H__
+
+/*****************************************************************************
+ * hashtable_change
+ *
+ * function to change the value associated with a key, where there already
+ * exists a value bound to the key in the hashtable.
+ * Source due to Holger Schemel.
+ *
+ * @name        hashtable_change
+ * @param   h   the hashtable
+ * @param   k   the key whose binding is to be changed
+ * @param   v   the new value; the previous value is freed
+ * @return      -1 if the key was found and the value changed, 0 otherwise
+ *
+ */
+int
+hashtable_change(struct hashtable *h, void *k, void *v);
+
+#endif /* __HASHTABLE_CWC22_UTILITY_H__ */
+
+/*
+ * Copyright (c) 2002, Christopher Clark
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff --git a/tools/blktap/drivers/tapdisk.h b/tools/blktap/drivers/tapdisk.h
--- a/tools/blktap/drivers/tapdisk.h
+++ b/tools/blktap/drivers/tapdisk.h
@@ -160,6 +160,7 @@
 extern struct tap_disk tapdisk_ram;
 extern struct tap_disk tapdisk_qcow;
 extern struct tap_disk tapdisk_qcow2;
+extern struct tap_disk tapdisk_remus;
 
 
 /*Define Individual Disk Parameters here */
@@ -229,6 +230,17 @@
 #endif
 };
 
+/* Descriptor for the Remus replicated-disk proxy driver, selected by a
+ * configuration string of the form
+ *   tap:remus:<backup host>:<backup port>:<underlying tapdisk string>
+ * Registered in the dtypes[] table below under DISK_TYPE_REMUS. */
+static disk_info_t remus_disk = {
+  DISK_TYPE_REMUS,
+  "replicated disk (remus)",
+  "remus",
+  0,
+  0,
+#ifdef TAPDISK
+  &tapdisk_remus,
+#endif
+};
+
 /*Main disk info array */
 static disk_info_t *dtypes[] = {
        &aio_disk,
@@ -237,6 +249,7 @@
        &ram_disk,
        &qcow_disk,
        &qcow2_disk,
+       &remus_disk
 };
 
 typedef struct driver_list_entry {
diff --git a/tools/blktap/lib/blktaplib.h b/tools/blktap/lib/blktaplib.h
--- a/tools/blktap/lib/blktaplib.h
+++ b/tools/blktap/lib/blktaplib.h
@@ -219,6 +219,7 @@
 #define DISK_TYPE_RAM      3
 #define DISK_TYPE_QCOW     4
 #define DISK_TYPE_QCOW2    5
+#define DISK_TYPE_REMUS    6
 
 /* xenstore/xenbus: */
 #define DOMNAME "Domain-0"
diff --git a/tools/python/xen/xend/server/BlktapController.py 
b/tools/python/xen/xend/server/BlktapController.py
--- a/tools/python/xen/xend/server/BlktapController.py
+++ b/tools/python/xen/xend/server/BlktapController.py
@@ -14,6 +14,7 @@
     'ram',
     'qcow',
     'qcow2',
+    'remus',
 
     'ioemu',
     'tapdisk',

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel