[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [RFC Patch v3 16/22] blktap2: move ramdisk related codes to block-replication.c



 COLO will reuse them

Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
---
 tools/blktap2/drivers/block-remus.c       | 485 ++----------------------------
 tools/blktap2/drivers/block-replication.c | 452 ++++++++++++++++++++++++++++
 tools/blktap2/drivers/block-replication.h |  48 +++
 3 files changed, 523 insertions(+), 462 deletions(-)

diff --git a/tools/blktap2/drivers/block-remus.c 
b/tools/blktap2/drivers/block-remus.c
index 8b6f157..2713af1 100644
--- a/tools/blktap2/drivers/block-remus.c
+++ b/tools/blktap2/drivers/block-remus.c
@@ -37,9 +37,6 @@
 #include "tapdisk-server.h"
 #include "tapdisk-driver.h"
 #include "tapdisk-interface.h"
-#include "hashtable.h"
-#include "hashtable_itr.h"
-#include "hashtable_utility.h"
 #include "block-replication.h"
 
 #include <errno.h>
@@ -58,7 +55,6 @@
 
 /* timeout for reads and writes in ms */
 #define HEARTBEAT_MS 1000
-#define RAMDISK_HASHSIZE 128
 
 /* connect retry timeout (seconds) */
 #define REMUS_CONNRETRY_TIMEOUT 1
@@ -97,51 +93,6 @@ td_vbd_t *device_vbd = NULL;
 td_image_t *remus_image = NULL;
 struct tap_disk tapdisk_remus;
 
-struct ramdisk {
-       size_t sector_size;
-       struct hashtable* h;
-       /* when a ramdisk is flushed, h is given a new empty hash for writes
-        * while the old ramdisk (prev) is drained asynchronously.
-        */
-       struct hashtable* prev;
-       /* count of outstanding requests to the base driver */
-       size_t inflight;
-       /* prev holds the requests to be flushed, while inprogress holds
-        * requests being flushed. When requests complete, they are removed
-        * from inprogress.
-        * Whenever a new flush is merged with ongoing flush (i.e, prev),
-        * we have to make sure that none of the new requests overlap with
-        * ones in "inprogress". If it does, keep it back in prev and dont issue
-        * IO until the current one finishes. If we allow this IO to proceed,
-        * we might end up with two "overlapping" requests in the disk's queue 
and
-        * the disk may not offer any guarantee on which one is written first.
-        * IOW, make sure we dont create a write-after-write time ordering 
constraint.
-        * 
-        */
-       struct hashtable* inprogress;
-};
-
-/* the ramdisk intercepts the original callback for reads and writes.
- * This holds the original data. */
-/* Might be worth making this a static array in struct ramdisk to avoid
- * a malloc per request */
-
-struct tdremus_state;
-
-struct ramdisk_cbdata {
-       td_callback_t cb;
-       void* private;
-       char* buf;
-       struct tdremus_state* state;
-};
-
-struct ramdisk_write_cbdata {
-       struct tdremus_state* state;
-       char* buf;
-};
-
-typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
-
 typedef struct poll_fd {
        int        fd;
        event_id_t id;
@@ -167,8 +118,14 @@ struct tdremus_state {
         */
        struct req_ring queued_io;
 
-       /* ramdisk data*/
+       /* ramdisk data */
        struct ramdisk ramdisk;
+       /*
+        * The primary write request is queued in this
+        * hashtable, and will be flushed to ramdisk when
+        * the checkpoint finishes.
+        */
+       struct hashtable *h;
 
        /* mode methods */
        enum tdremus_mode mode;
@@ -239,404 +196,20 @@ static void ring_add_request(struct req_ring *ring, 
const td_request_t *treq)
        ring->prod = ring_next(ring->prod);
 }
 
-/* Prototype declarations */
-static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
-
-/* functions to create and sumbit treq's */
-
-static void
-replicated_write_callback(td_request_t treq, int err)
-{
-       struct tdremus_state *s = (struct tdremus_state *) treq.cb_data;
-       td_vbd_request_t *vreq;
-       int i;
-       uint64_t start;
-       vreq = (td_vbd_request_t *) treq.private;
-
-       /* the write failed for now, lets panic. this is very bad */
-       if (err) {
-               RPRINTF("ramdisk write failed, disk image is not consistent\n");
-               exit(-1);
-       }
-
-       /* The write succeeded. let's pull the vreq off whatever request list
-        * it is on and free() it */
-       list_del(&vreq->next);
-       free(vreq);
-
-       s->ramdisk.inflight--;
-       start = treq.sec;
-       for (i = 0; i < treq.secs; i++) {
-               hashtable_remove(s->ramdisk.inprogress, &start);
-               start++;
-       }
-       free(treq.buf);
-
-       if (!s->ramdisk.inflight && !s->ramdisk.prev) {
-               /* TODO: the ramdisk has been flushed */
-       }
-}
-
-static inline int
-create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, 
char *buf)
-{
-       td_request_t treq;
-       td_vbd_request_t *vreq;
-
-       treq.op      = TD_OP_WRITE;
-       treq.buf     = buf;
-       treq.sec     = sec;
-       treq.secs    = secs;
-       treq.image   = remus_image;
-       treq.cb      = replicated_write_callback;
-       treq.cb_data = state;
-       treq.id      = 0;
-       treq.sidx    = 0;
-
-       vreq         = calloc(1, sizeof(td_vbd_request_t));
-       treq.private = vreq;
-
-       if(!vreq)
-               return -1;
-
-       vreq->submitting = 1;
-       INIT_LIST_HEAD(&vreq->next);
-       tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
-
-       /* TODO:
-        * we should probably leave it up to the caller to forward the request 
*/
-       td_forward_request(treq);
-
-       vreq->submitting--;
-
-       return 0;
-}
-
-
-/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
-static unsigned int uint64_hash(void* k)
-{
-       uint64_t key = *(uint64_t*)k;
-
-       key = (~key) + (key << 18);
-       key = key ^ (key >> 31);
-       key = key * 21;
-       key = key ^ (key >> 11);
-       key = key + (key << 6);
-       key = key ^ (key >> 22);
-
-       return (unsigned int)key;
-}
-
-static int rd_hash_equal(void* k1, void* k2)
-{
-       uint64_t key1, key2;
-
-       key1 = *(uint64_t*)k1;
-       key2 = *(uint64_t*)k2;
-
-       return key1 == key2;
-}
-
-static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
-                       int nb_sectors, char* buf)
-{
-       int i;
-       char* v;
-       uint64_t key;
-
-       for (i = 0; i < nb_sectors; i++) {
-               key = sector + i;
-               /* check whether it is queued in a previous flush request */
-               if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, 
&key)))) {
-                       /* check whether it is an ongoing flush */
-                       if (!(ramdisk->inprogress && (v = 
hashtable_search(ramdisk->inprogress, &key))))
-                               return -1;
-               }
-               memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
-       }
-
-       return 0;
-}
-
-static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
-                             size_t len)
-{
-       char* v;
-       uint64_t* key;
-
-       if ((v = hashtable_search(h, &sector))) {
-               memcpy(v, buf, len);
-               return 0;
-       }
-
-       if (!(v = malloc(len))) {
-               DPRINTF("ramdisk_write_hash: malloc failed\n");
-               return -1;
-       }
-       memcpy(v, buf, len);
-       if (!(key = malloc(sizeof(*key)))) {
-               DPRINTF("ramdisk_write_hash: error allocating key\n");
-               free(v);
-               return -1;
-       }
-       *key = sector;
-       if (!hashtable_insert(h, key, v)) {
-               DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", 
sector);
-               free(key);
-               free(v);
-               return -1;
-       }
-
-       return 0;
-}
-
-static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
-                               int nb_sectors, char* buf)
-{
-       int i, rc;
-
-       for (i = 0; i < nb_sectors; i++) {
-               rc = ramdisk_write_hash(ramdisk->h, sector + i,
-                                       buf + i * ramdisk->sector_size,
-                                       ramdisk->sector_size);
-               if (rc)
-                       return rc;
-       }
-
-       return 0;
-}
-
-static int uint64_compare(const void* k1, const void* k2)
-{
-       uint64_t u1 = *(uint64_t*)k1;
-       uint64_t u2 = *(uint64_t*)k2;
-
-       /* u1 - u2 is unsigned */
-       return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
-}
-
-/* set psectors to an array of the sector numbers in the hash, returning
- * the number of entries (or -1 on error) */
-static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
-{
-       struct hashtable_itr* itr;
-       uint64_t* sectors;
-       int count;
-
-       if (!(count = hashtable_count(h)))
-               return 0;
-
-       if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
-               DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
-               return -1;
-       }
-       sectors = *psectors;
-
-       itr = hashtable_iterator(h);
-       count = 0;
-       do {
-               sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
-       } while (hashtable_iterator_advance(itr));
-       free(itr);
-
-       return count;
-}
-
-/*
-  return -1 for OOM
-  return -2 for merge lookup failure
-  return -3 for WAW race
-  return 0 on success.
-*/
-static int merge_requests(struct ramdisk* ramdisk, uint64_t start,
-                       size_t count, char **mergedbuf)
-{
-       char* buf;
-       char* sector;
-       int i;
-       uint64_t *key;
-       int rc = 0;
-
-       if (!(buf = valloc(count * ramdisk->sector_size))) {
-               DPRINTF("merge_request: allocation failed\n");
-               return -1;
-       }
-
-       for (i = 0; i < count; i++) {
-               if (!(sector = hashtable_search(ramdisk->prev, &start))) {
-                       DPRINTF("merge_request: lookup failed on %"PRIu64"\n", 
start);
-                       free(buf);
-                       rc = -2;
-                       goto fail;
-               }
-
-               /* Check inprogress requests to avoid waw non-determinism */
-               if (hashtable_search(ramdisk->inprogress, &start)) {
-                       DPRINTF("merge_request: WAR RACE on %"PRIu64"\n", 
start);
-                       free(buf);
-                       rc = -3;
-                       goto fail;
-               }
-               /* Insert req into inprogress (brief period of duplication of 
hash entries until
-                * they are removed from prev. Read tracking would not be 
reading wrong entries)
-                */
-               if (!(key = malloc(sizeof(*key)))) {
-                       DPRINTF("%s: error allocating key\n", __FUNCTION__);
-                       free(buf);                      
-                       rc = -1;
-                       goto fail;
-               }
-               *key = start;
-               if (!hashtable_insert(ramdisk->inprogress, key, NULL)) {
-                       DPRINTF("%s failed to insert sector %" PRIu64 " into 
inprogress hash\n", 
-                               __FUNCTION__, start);
-                       free(key);
-                       free(buf);
-                       rc = -1;
-                       goto fail;
-               }
-               memcpy(buf + i * ramdisk->sector_size, sector, 
ramdisk->sector_size);
-               start++;
-       }
-
-       *mergedbuf = buf;
-       return 0;
-fail:
-       for (start--; i >0; i--, start--)
-               hashtable_remove(ramdisk->inprogress, &start);
-       return rc;
-}
-
-/* The underlying driver may not handle having the whole ramdisk queued at
- * once. We queue what we can and let the callbacks attempt to queue more. */
-/* NOTE: may be called from callback, while dd->private still belongs to
- * the underlying driver */
-static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s)
-{
-       uint64_t* sectors;
-       char* buf = NULL;
-       uint64_t base, batchlen;
-       int i, j, count = 0;
-
-       // RPRINTF("ramdisk flush\n");
-
-       if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
-               return count;
-
-       /* Create the inprogress table if empty */
-       if (!s->ramdisk.inprogress)
-               s->ramdisk.inprogress = create_hashtable(RAMDISK_HASHSIZE,
-                                                       uint64_hash,
-                                                       rd_hash_equal);
-       
-       /*
-         RPRINTF("ramdisk: flushing %d sectors\n", count);
-       */
-
-       /* sort and merge sectors to improve disk performance */
-       qsort(sectors, count, sizeof(*sectors), uint64_compare);
-
-       for (i = 0; i < count;) {
-               base = sectors[i++];
-               while (i < count && sectors[i] == sectors[i-1] + 1)
-                       i++;
-               batchlen = sectors[i-1] - base + 1;
-
-               j = merge_requests(&s->ramdisk, base, batchlen, &buf);
-                       
-               if (j) {
-                       RPRINTF("ramdisk_flush: merge_requests failed:%s\n",
-                               j == -1? "OOM": (j==-2? "missing sector" : "WAW 
race"));
-                       if (j == -3) continue;
-                       free(sectors);
-                       return -1;
-               }
-
-               /* NOTE: create_write_request() creates a treq AND forwards it 
down
-                * the driver chain */
-               // RPRINTF("forwarding write request at %" PRIu64 ", length: %" 
PRIu64 "\n", base, batchlen);
-               create_write_request(s, base, batchlen, buf);
-               //RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 " 
forwarded\n", base, batchlen);
-
-               s->ramdisk.inflight++;
-
-               for (j = 0; j < batchlen; j++) {
-                       buf = hashtable_search(s->ramdisk.prev, &base);
-                       free(buf);
-                       hashtable_remove(s->ramdisk.prev, &base);
-                       base++;
-               }
-       }
-
-       if (!hashtable_count(s->ramdisk.prev)) {
-               /* everything is in flight */
-               hashtable_destroy(s->ramdisk.prev, 0);
-               s->ramdisk.prev = NULL;
-       }
-
-       free(sectors);
-
-       // RPRINTF("ramdisk flush done\n");
-       return 0;
-}
-
-/* flush ramdisk contents to disk */
-static int ramdisk_start_flush(td_driver_t *driver)
-{
-       struct tdremus_state *s = (struct tdremus_state *)driver->data;
-       uint64_t* key;
-       char* buf;
-       int rc = 0;
-       int i, j, count, batchlen;
-       uint64_t* sectors;
-
-       if (!hashtable_count(s->ramdisk.h)) {
-               /*
-                 RPRINTF("Nothing to flush\n");
-               */
-               return 0;
-       }
-
-       if (s->ramdisk.prev) {
-               /* a flush request issued while a previous flush is still in 
progress
-                * will merge with the previous request. If you want the 
previous
-                * request to be consistent, wait for it to complete. */
-               if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
-                       return count;
-
-               for (i = 0; i < count; i++) {
-                       buf = hashtable_search(s->ramdisk.h, sectors + i);
-                       ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
-                                          s->ramdisk.sector_size);
-               }
-               free(sectors);
-
-               hashtable_destroy (s->ramdisk.h, 1);
-       } else
-               s->ramdisk.prev = s->ramdisk.h;
-
-       /* We create a new hashtable so that new writes can be performed before
-        * the old hashtable is completely drained. */
-       s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
-                                       rd_hash_equal);
-
-       return ramdisk_flush(driver, s);
-}
-
-
 static int ramdisk_start(td_driver_t *driver)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
 
-       if (s->ramdisk.h) {
+       if (s->h) {
                RPRINTF("ramdisk already allocated\n");
                return 0;
        }
 
        s->ramdisk.sector_size = driver->info.sector_size;
-       s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
-                                       rd_hash_equal);
+       s->ramdisk.log_prefix = "remus";
+       s->ramdisk.image = remus_image;
+       ramdisk_init(&s->ramdisk);
+       s->h = ramdisk_new_hashtable();
 
        DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);
 
@@ -1024,10 +597,7 @@ static inline int server_writes_inflight(td_driver_t 
*driver)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
 
-       if (!s->ramdisk.inflight && !s->ramdisk.prev)
-               return 0;
-
-       return 1;
+       return ramdisk_writes_inflight(&s->ramdisk);
 }
 
 /* Due to block device prefetching this code may be called on the server side
@@ -1067,13 +637,9 @@ void backup_queue_write(td_driver_t *driver, td_request_t 
treq)
 static int server_flush(td_driver_t *driver)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
-       /*
-        * Nothing to flush in beginning.
-        */
-       if (!s->ramdisk.prev)
-               return 0;
+
        /* Try to flush any remaining requests */
-       return ramdisk_flush(driver, s);
+       return ramdisk_flush(&s->ramdisk);
 }
 
 static int backup_start(td_driver_t *driver)
@@ -1120,7 +686,9 @@ static void server_do_wreq(td_driver_t *driver)
        if (mread(s->stream_fd.fd, buf, len) < 0)
                goto err;
 
-       if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
+       if (ramdisk_write_to_hashtable(s->h, *sector, *sectors,
+                                      driver->info.sector_size, buf,
+                                      "remus") < 0) {
                rc = ERROR_INTERNAL;
                goto err;
        }
@@ -1150,7 +718,7 @@ static void server_do_creq(td_driver_t *driver)
 
        // RPRINTF("committing buffer\n");
 
-       ramdisk_start_flush(driver);
+       ramdisk_start_flush(&s->ramdisk, &s->h);
 
        /* XXX this message should not be sent until flush completes! */
        if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
@@ -1199,12 +767,7 @@ void unprotected_queue_read(td_driver_t *driver, 
td_request_t treq)
 
        /* wait for previous ramdisk to flush  before servicing reads */
        if (server_writes_inflight(driver)) {
-               /* for now lets just return EBUSY.
-                * if there are any left-over requests in prev,
-                * kick em again.
-                */
-               if(!s->ramdisk.inflight) /* nothing in inprogress */
-                       ramdisk_flush(driver, s);
+               ramdisk_flush(&s->ramdisk);
 
                td_complete_request(treq, -EBUSY);
        }
@@ -1222,8 +785,7 @@ void unprotected_queue_write(td_driver_t *driver, 
td_request_t treq)
        /* wait for previous ramdisk to flush */
        if (server_writes_inflight(driver)) {
                RPRINTF("queue_write: waiting for queue to drain");
-               if(!s->ramdisk.inflight) /* nothing in inprogress. Kick prev */
-                       ramdisk_flush(driver, s);
+               ramdisk_flush(&s->ramdisk);
                td_complete_request(treq, -EBUSY);
        }
        else {
@@ -1532,9 +1094,8 @@ static int tdremus_close(td_driver_t *driver)
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
 
        RPRINTF("closing\n");
-       if (s->ramdisk.inprogress)
-               hashtable_destroy(s->ramdisk.inprogress, 0);
-
+       ramdisk_destroy(&s->ramdisk);
+       ramdisk_destroy_hashtable(s->h);
        td_replication_connect_kill(&s->t);
        ctl_unregister(s);
        ctl_close(s);
diff --git a/tools/blktap2/drivers/block-replication.c 
b/tools/blktap2/drivers/block-replication.c
index e4b2679..30eba8f 100644
--- a/tools/blktap2/drivers/block-replication.c
+++ b/tools/blktap2/drivers/block-replication.c
@@ -15,6 +15,10 @@
 
 #include "tapdisk-server.h"
 #include "block-replication.h"
+#include "tapdisk-interface.h"
+#include "hashtable.h"
+#include "hashtable_itr.h"
+#include "hashtable_utility.h"
 
 #include <string.h>
 #include <errno.h>
@@ -30,6 +34,8 @@
 #define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a)
 #define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a)
 
+#define RAMDISK_HASHSIZE 128
+
 /* connection status */
 enum {
        connection_none,
@@ -466,3 +472,449 @@ static void td_replication_connect_event(event_id_t id, 
char mode,
 fail:
        td_replication_client_failed(t, rc);
 }
+
+
+/* I/O replication */
+static void replicated_write_callback(td_request_t treq, int err)
+{
+       ramdisk_t *ramdisk = treq.cb_data;
+       td_vbd_request_t *vreq = treq.private;
+       int i;
+       uint64_t start;
+       const char *log_prefix = ramdisk->log_prefix;
+
+       /* the write failed for now, lets panic. this is very bad */
+       if (err) {
+               EPRINTF("ramdisk write failed, disk image is not consistent\n");
+               exit(-1);
+       }
+
+       /*
+        * The write succeeded. let's pull the vreq off whatever request list
+        * it is on and free() it
+        */
+       list_del(&vreq->next);
+       free(vreq);
+
+       ramdisk->inflight--;
+       start = treq.sec;
+       for (i = 0; i < treq.secs; i++) {
+               hashtable_remove(ramdisk->inprogress, &start);
+               start++;
+       }
+       free(treq.buf);
+
+       if (!ramdisk->inflight && ramdisk->prev)
+               ramdisk_flush(ramdisk);
+}
+
+static int
+create_write_request(ramdisk_t *ramdisk, td_sector_t sec, int secs, char *buf)
+{
+       td_request_t treq;
+       td_vbd_request_t *vreq;
+       td_vbd_t *vbd = ramdisk->image->private;
+
+       treq.op      = TD_OP_WRITE;
+       treq.buf     = buf;
+       treq.sec     = sec;
+       treq.secs    = secs;
+       treq.image   = ramdisk->image;
+       treq.cb      = replicated_write_callback;
+       treq.cb_data = ramdisk;
+       treq.id      = 0;
+       treq.sidx    = 0;
+
+       vreq         = calloc(1, sizeof(td_vbd_request_t));
+       treq.private = vreq;
+
+       if(!vreq)
+               return -1;
+
+       vreq->submitting = 1;
+       INIT_LIST_HEAD(&vreq->next);
+       tapdisk_vbd_move_request(treq.private, &vbd->pending_requests);
+
+       td_forward_request(treq);
+
+       vreq->submitting--;
+
+       return 0;
+}
+
+/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
+static unsigned int uint64_hash(void *k)
+{
+       uint64_t key = *(uint64_t*)k;
+
+       key = (~key) + (key << 18);
+       key = key ^ (key >> 31);
+       key = key * 21;
+       key = key ^ (key >> 11);
+       key = key + (key << 6);
+       key = key ^ (key >> 22);
+
+       return (unsigned int)key;
+}
+
+static int rd_hash_equal(void *k1, void *k2)
+{
+       uint64_t key1, key2;
+
+       key1 = *(uint64_t*)k1;
+       key2 = *(uint64_t*)k2;
+
+       return key1 == key2;
+}
+
+static int uint64_compare(const void *k1, const void *k2)
+{
+       uint64_t u1 = *(uint64_t*)k1;
+       uint64_t u2 = *(uint64_t*)k2;
+
+       /* u1 - u2 is unsigned */
+       return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
+}
+
+/*
+ * set psectors to an array of the sector numbers in the hash, returning
+ * the number of entries (or -1 on error)
+ */
+static int ramdisk_get_sectors(struct hashtable *h, uint64_t **psectors,
+                              const char *log_prefix)
+{
+       struct hashtable_itr* itr;
+       uint64_t* sectors;
+       int count;
+
+       if (!(count = hashtable_count(h)))
+               return 0;
+
+       if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
+               DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
+               return -1;
+       }
+       sectors = *psectors;
+
+       itr = hashtable_iterator(h);
+       count = 0;
+       do {
+               sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
+       } while (hashtable_iterator_advance(itr));
+       free(itr);
+
+       return count;
+}
+
+static int ramdisk_write_hash(struct hashtable *h, uint64_t sector, char *buf,
+                             size_t len, const char *log_prefix)
+{
+       char *v;
+       uint64_t *key;
+
+       if ((v = hashtable_search(h, &sector))) {
+               memcpy(v, buf, len);
+               return 0;
+       }
+
+       if (!(v = malloc(len))) {
+               DPRINTF("ramdisk_write_hash: malloc failed\n");
+               return -1;
+       }
+       memcpy(v, buf, len);
+       if (!(key = malloc(sizeof(*key)))) {
+               DPRINTF("ramdisk_write_hash: error allocating key\n");
+               free(v);
+               return -1;
+       }
+       *key = sector;
+       if (!hashtable_insert(h, key, v)) {
+               DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", 
sector);
+               free(key);
+               free(v);
+               return -1;
+       }
+
+       return 0;
+}
+
+/*
+ * return -1 for OOM
+ * return -2 for merge lookup failure(should not happen)
+ * return -3 for WAW race
+ * return 0 on success.
+ */
+static int merge_requests(struct ramdisk *ramdisk, uint64_t start,
+                         size_t count, char **mergedbuf)
+{
+       char* buf;
+       char* sector;
+       int i;
+       uint64_t *key;
+       int rc = 0;
+       const char *log_prefix = ramdisk->log_prefix;
+
+       if (!(buf = valloc(count * ramdisk->sector_size))) {
+               DPRINTF("merge_request: allocation failed\n");
+               return -1;
+       }
+
+       for (i = 0; i < count; i++) {
+               if (!(sector = hashtable_search(ramdisk->prev, &start))) {
+                       EPRINTF("merge_request: lookup failed on %"PRIu64"\n",
+                               start);
+                       free(buf);
+                       rc = -2;
+                       goto fail;
+               }
+
+               /* Check inprogress requests to avoid waw non-determinism */
+               if (hashtable_search(ramdisk->inprogress, &start)) {
+                       DPRINTF("merge_request: WAR RACE on %"PRIu64"\n",
+                               start);
+                       free(buf);
+                       rc = -3;
+                       goto fail;
+               }
+
+               /*
+                * Insert req into inprogress (brief period of duplication of
+                * hash entries until they are removed from prev. Read tracking
+                * would not be reading wrong entries)
+                */
+               if (!(key = malloc(sizeof(*key)))) {
+                       EPRINTF("%s: error allocating key\n", __FUNCTION__);
+                       free(buf);
+                       rc = -1;
+                       goto fail;
+               }
+               *key = start;
+               if (!hashtable_insert(ramdisk->inprogress, key, NULL)) {
+                       EPRINTF("%s failed to insert sector %" PRIu64 " into 
inprogress hash\n",
+                               __FUNCTION__, start);
+                       free(key);
+                       free(buf);
+                       rc = -1;
+                       goto fail;
+               }
+
+               memcpy(buf + i * ramdisk->sector_size, sector, 
ramdisk->sector_size);
+               start++;
+       }
+
+       *mergedbuf = buf;
+       return 0;
+fail:
+       for (start--; i > 0; i--, start--)
+               hashtable_remove(ramdisk->inprogress, &start);
+       return rc;
+}
+
+int ramdisk_flush(ramdisk_t *ramdisk)
+{
+       uint64_t *sectors;
+       char *buf = NULL;
+       uint64_t base, batchlen;
+       int i, j, count = 0;
+       const char *log_prefix = ramdisk->log_prefix;
+
+       /* everything is in flight */
+       if (!ramdisk->prev)
+               return 0;
+
+       count = ramdisk_get_sectors(ramdisk->prev, &sectors, log_prefix);
+       if (count <= 0)
+               /* should not happen */
+               return count;
+
+       /* Create the inprogress table if empty */
+       if (!ramdisk->inprogress)
+               ramdisk->inprogress = ramdisk_new_hashtable();
+
+       /* sort and merge sectors to improve disk performance */
+       qsort(sectors, count, sizeof(*sectors), uint64_compare);
+
+       for (i = 0; i < count;) {
+               base = sectors[i++];
+               while (i < count && sectors[i] == sectors[i-1] + 1)
+                       i++;
+               batchlen = sectors[i-1] - base + 1;
+
+               j = merge_requests(ramdisk, base, batchlen, &buf);
+               if (j) {
+                       EPRINTF("ramdisk_flush: merge_requests failed:%s\n",
+                               j == -1 ? "OOM" :
+                                       (j == -2 ? "missing sector" :
+                                                "WAW race"));
+                       if (j == -3)
+                               continue;
+                       free(sectors);
+                       return -1;
+               }
+
+               /*
+                * NOTE: create_write_request() creates a treq AND forwards
+                * it down the driver chain
+                *
+                * TODO: handle create_write_request()'s error.
+                */
+               create_write_request(ramdisk, base, batchlen, buf);
+
+               ramdisk->inflight++;
+
+               for (j = 0; j < batchlen; j++) {
+                       buf = hashtable_search(ramdisk->prev, &base);
+                       free(buf);
+                       hashtable_remove(ramdisk->prev, &base);
+                       base++;
+               }
+       }
+
+       if (!hashtable_count(ramdisk->prev)) {
+               /* everything is in flight */
+               hashtable_destroy(ramdisk->prev, 0);
+               ramdisk->prev = NULL;
+       }
+
+       free(sectors);
+       return 0;
+}
+
+int ramdisk_start_flush(ramdisk_t *ramdisk, struct hashtable **new)
+{
+       uint64_t *key;
+       char *buf;
+       int rc = 0;
+       int i, j, count, batchlen;
+       uint64_t *sectors;
+       const char *log_prefix = ramdisk->log_prefix;
+
+       if (!hashtable_count(*new))
+               return 0;
+
+       if (ramdisk->prev) {
+               /*
+                * a flush request issued while a previous flush is still in
+                * progress will merge with the previous request. If you want
+                * the previous request to be consistent, wait for it to
+                * complete.
+                */
+               count = ramdisk_get_sectors(*new, &sectors, log_prefix);
+               if (count < 0 )
+                       return count;
+
+               for (i = 0; i < count; i++) {
+                       buf = hashtable_search(*new, sectors + i);
+                       ramdisk_write_hash(ramdisk->prev, sectors[i], buf,
+                                          ramdisk->sector_size, log_prefix);
+               }
+               free(sectors);
+
+               hashtable_destroy(*new, 1);
+       } else
+               ramdisk->prev = *new;
+
+       /*
+        * We create a new hashtable so that new writes can be performed before
+        * the old hashtable is completely drained.
+        */
+       *new = ramdisk_new_hashtable();
+
+       return ramdisk_flush(ramdisk);
+}
+
+void ramdisk_init(ramdisk_t *ramdisk)
+{
+       ramdisk->inflight = 0;
+       ramdisk->prev = NULL;
+       ramdisk->inprogress = NULL;
+}
+
+void ramdisk_destroy(ramdisk_t *ramdisk)
+{
+       const char *log_prefix = ramdisk->log_prefix;
+
+       /*
+        * ramdisk_destroy() is called only when we will close the tapdisk 
image.
+        * In this case, there are no pending requests in vbd.
+        *
+        * If ramdisk->inflight is not 0, it means that the requests created by
+        * us are still in vbd->pending_requests.
+        */
+       if (ramdisk->inflight) {
+               /* should not happen */
+               EPRINTF("cannot destroy ramdisk\n");
+               return;
+       }
+
+       if (ramdisk->inprogress) {
+               hashtable_destroy(ramdisk->inprogress, 0);
+               ramdisk->inprogress = NULL;
+       }
+
+       if (ramdisk->prev) {
+               hashtable_destroy(ramdisk->prev, 1);
+               ramdisk->prev = NULL;
+       }
+}
+
+int ramdisk_writes_inflight(ramdisk_t *ramdisk)
+{
+       if (!ramdisk->inflight && !ramdisk->prev)
+               return 0;
+
+       return 1;
+}
+
+int ramdisk_read(struct ramdisk *ramdisk, uint64_t sector,
+                int nb_sectors, char *buf)
+{
+       int i;
+       char *v;
+       uint64_t key;
+
+       for (i = 0; i < nb_sectors; i++) {
+               key = sector + i;
+               /* check whether it is queued in a previous flush request */
+               if (!(ramdisk->prev &&
+                   (v = hashtable_search(ramdisk->prev, &key)))) {
+                       /* check whether it is an ongoing flush */
+                       if (!(ramdisk->inprogress &&
+                           (v = hashtable_search(ramdisk->inprogress, &key))))
+                               return -1;
+               }
+               memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
+       }
+
+       return 0;
+}
+
+struct hashtable *ramdisk_new_hashtable(void)
+{
+       return create_hashtable(RAMDISK_HASHSIZE, uint64_hash, rd_hash_equal);
+}
+
+int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector,
+                              int nb_sectors, size_t sector_size, char* buf,
+                              const char *log_prefix)
+{
+       int i, rc;
+
+       for (i = 0; i < nb_sectors; i++) {
+               rc = ramdisk_write_hash(h, sector + i,
+                                       buf + i * sector_size,
+                                       sector_size, log_prefix);
+               if (rc)
+                       return rc;
+       }
+
+       return 0;
+}
+
+void ramdisk_destroy_hashtable(struct hashtable *h)
+{
+       if (!h)
+               return;
+
+       hashtable_destroy(h, 1);
+}
diff --git a/tools/blktap2/drivers/block-replication.h 
b/tools/blktap2/drivers/block-replication.h
index 0bd6e71..fdc216e 100644
--- a/tools/blktap2/drivers/block-replication.h
+++ b/tools/blktap2/drivers/block-replication.h
@@ -110,4 +110,52 @@ int td_replication_server_restart(td_replication_connect_t 
*t);
  */
 int td_replication_client_start(td_replication_connect_t *t);
 
+/* I/O replication */
+typedef struct ramdisk ramdisk_t;
+struct ramdisk {
+       size_t sector_size;
+       const char *log_prefix;
+       td_image_t *image;
+
+       /* private */
+       /* count of outstanding requests to the base driver */
+       size_t inflight;
+       /* prev holds the requests to be flushed, while inprogress holds
+        * requests being flushed. When requests complete, they are removed
+        * from inprogress.
+        * Whenever a new flush is merged with ongoing flush (i.e, prev),
+        * we have to make sure that none of the new requests overlap with
+        * ones in "inprogress". If it does, keep it back in prev and dont issue
+        * IO until the current one finishes. If we allow this IO to proceed,
+        * we might end up with two "overlapping" requests in the disk's queue 
and
+        * the disk may not offer any guarantee on which one is written first.
+        * IOW, make sure we dont create a write-after-write time ordering 
constraint.
+        */
+       struct hashtable *prev;
+       struct hashtable *inprogress;
+};
+
+void ramdisk_init(ramdisk_t *ramdisk);
+void ramdisk_destroy(ramdisk_t *ramdisk);
+
+/* flush pending contents to disk */
+int ramdisk_flush(ramdisk_t *ramdisk);
+/* flush new contents to disk */
+int ramdisk_start_flush(ramdisk_t *ramdisk, struct hashtable **new);
+int ramdisk_writes_inflight(ramdisk_t *ramdisk);
+
+/*
+ * try to read from ramdisk. Return -1 if some sectors are not in
+ * ramdisk. Otherwise, return 0.
+ */
+int ramdisk_read(struct ramdisk *ramdisk, uint64_t sector,
+                int nb_sectors, char *buf);
+
+/* create a new hashtable that can be used by ramdisk */
+struct hashtable *ramdisk_new_hashtable(void);
+int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector,
+                              int nb_sectors, size_t sector_size, char* buf,
+                              const char *log_prefix);
+void ramdisk_destroy_hashtable(struct hashtable *h);
+
 #endif
-- 
1.9.3


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.