[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC Patch v3 16/22] blktap2: move ramdisk related codes to block-replication.c
COLO will reuse them Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx> Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx> --- tools/blktap2/drivers/block-remus.c | 485 ++---------------------------- tools/blktap2/drivers/block-replication.c | 452 ++++++++++++++++++++++++++++ tools/blktap2/drivers/block-replication.h | 48 +++ 3 files changed, 523 insertions(+), 462 deletions(-) diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c index 8b6f157..2713af1 100644 --- a/tools/blktap2/drivers/block-remus.c +++ b/tools/blktap2/drivers/block-remus.c @@ -37,9 +37,6 @@ #include "tapdisk-server.h" #include "tapdisk-driver.h" #include "tapdisk-interface.h" -#include "hashtable.h" -#include "hashtable_itr.h" -#include "hashtable_utility.h" #include "block-replication.h" #include <errno.h> @@ -58,7 +55,6 @@ /* timeout for reads and writes in ms */ #define HEARTBEAT_MS 1000 -#define RAMDISK_HASHSIZE 128 /* connect retry timeout (seconds) */ #define REMUS_CONNRETRY_TIMEOUT 1 @@ -97,51 +93,6 @@ td_vbd_t *device_vbd = NULL; td_image_t *remus_image = NULL; struct tap_disk tapdisk_remus; -struct ramdisk { - size_t sector_size; - struct hashtable* h; - /* when a ramdisk is flushed, h is given a new empty hash for writes - * while the old ramdisk (prev) is drained asynchronously. - */ - struct hashtable* prev; - /* count of outstanding requests to the base driver */ - size_t inflight; - /* prev holds the requests to be flushed, while inprogress holds - * requests being flushed. When requests complete, they are removed - * from inprogress. - * Whenever a new flush is merged with ongoing flush (i.e, prev), - * we have to make sure that none of the new requests overlap with - * ones in "inprogress". If it does, keep it back in prev and dont issue - * IO until the current one finishes. If we allow this IO to proceed, - * we might end up with two "overlapping" requests in the disk's queue and - * the disk may not offer any guarantee on which one is written first. 
- * IOW, make sure we dont create a write-after-write time ordering constraint. - * - */ - struct hashtable* inprogress; -}; - -/* the ramdisk intercepts the original callback for reads and writes. - * This holds the original data. */ -/* Might be worth making this a static array in struct ramdisk to avoid - * a malloc per request */ - -struct tdremus_state; - -struct ramdisk_cbdata { - td_callback_t cb; - void* private; - char* buf; - struct tdremus_state* state; -}; - -struct ramdisk_write_cbdata { - struct tdremus_state* state; - char* buf; -}; - -typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq); - typedef struct poll_fd { int fd; event_id_t id; @@ -167,8 +118,14 @@ struct tdremus_state { */ struct req_ring queued_io; - /* ramdisk data*/ + /* ramdisk data */ struct ramdisk ramdisk; + /* + * The primary write request is queued in this + * hashtable, and will be flushed to ramdisk when + * the checkpoint finishes. + */ + struct hashtable *h; /* mode methods */ enum tdremus_mode mode; @@ -239,404 +196,20 @@ static void ring_add_request(struct req_ring *ring, const td_request_t *treq) ring->prod = ring_next(ring->prod); } -/* Prototype declarations */ -static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s); - -/* functions to create and sumbit treq's */ - -static void -replicated_write_callback(td_request_t treq, int err) -{ - struct tdremus_state *s = (struct tdremus_state *) treq.cb_data; - td_vbd_request_t *vreq; - int i; - uint64_t start; - vreq = (td_vbd_request_t *) treq.private; - - /* the write failed for now, lets panic. this is very bad */ - if (err) { - RPRINTF("ramdisk write failed, disk image is not consistent\n"); - exit(-1); - } - - /* The write succeeded. 
let's pull the vreq off whatever request list - * it is on and free() it */ - list_del(&vreq->next); - free(vreq); - - s->ramdisk.inflight--; - start = treq.sec; - for (i = 0; i < treq.secs; i++) { - hashtable_remove(s->ramdisk.inprogress, &start); - start++; - } - free(treq.buf); - - if (!s->ramdisk.inflight && !s->ramdisk.prev) { - /* TODO: the ramdisk has been flushed */ - } -} - -static inline int -create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, char *buf) -{ - td_request_t treq; - td_vbd_request_t *vreq; - - treq.op = TD_OP_WRITE; - treq.buf = buf; - treq.sec = sec; - treq.secs = secs; - treq.image = remus_image; - treq.cb = replicated_write_callback; - treq.cb_data = state; - treq.id = 0; - treq.sidx = 0; - - vreq = calloc(1, sizeof(td_vbd_request_t)); - treq.private = vreq; - - if(!vreq) - return -1; - - vreq->submitting = 1; - INIT_LIST_HEAD(&vreq->next); - tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests); - - /* TODO: - * we should probably leave it up to the caller to forward the request */ - td_forward_request(treq); - - vreq->submitting--; - - return 0; -} - - -/* http://www.concentric.net/~Ttwang/tech/inthash.htm */ -static unsigned int uint64_hash(void* k) -{ - uint64_t key = *(uint64_t*)k; - - key = (~key) + (key << 18); - key = key ^ (key >> 31); - key = key * 21; - key = key ^ (key >> 11); - key = key + (key << 6); - key = key ^ (key >> 22); - - return (unsigned int)key; -} - -static int rd_hash_equal(void* k1, void* k2) -{ - uint64_t key1, key2; - - key1 = *(uint64_t*)k1; - key2 = *(uint64_t*)k2; - - return key1 == key2; -} - -static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector, - int nb_sectors, char* buf) -{ - int i; - char* v; - uint64_t key; - - for (i = 0; i < nb_sectors; i++) { - key = sector + i; - /* check whether it is queued in a previous flush request */ - if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, &key)))) { - /* check whether it is an ongoing flush */ - if 
(!(ramdisk->inprogress && (v = hashtable_search(ramdisk->inprogress, &key)))) - return -1; - } - memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size); - } - - return 0; -} - -static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf, - size_t len) -{ - char* v; - uint64_t* key; - - if ((v = hashtable_search(h, &sector))) { - memcpy(v, buf, len); - return 0; - } - - if (!(v = malloc(len))) { - DPRINTF("ramdisk_write_hash: malloc failed\n"); - return -1; - } - memcpy(v, buf, len); - if (!(key = malloc(sizeof(*key)))) { - DPRINTF("ramdisk_write_hash: error allocating key\n"); - free(v); - return -1; - } - *key = sector; - if (!hashtable_insert(h, key, v)) { - DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector); - free(key); - free(v); - return -1; - } - - return 0; -} - -static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector, - int nb_sectors, char* buf) -{ - int i, rc; - - for (i = 0; i < nb_sectors; i++) { - rc = ramdisk_write_hash(ramdisk->h, sector + i, - buf + i * ramdisk->sector_size, - ramdisk->sector_size); - if (rc) - return rc; - } - - return 0; -} - -static int uint64_compare(const void* k1, const void* k2) -{ - uint64_t u1 = *(uint64_t*)k1; - uint64_t u2 = *(uint64_t*)k2; - - /* u1 - u2 is unsigned */ - return u1 < u2 ? -1 : u1 > u2 ? 
1 : 0; -} - -/* set psectors to an array of the sector numbers in the hash, returning - * the number of entries (or -1 on error) */ -static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors) -{ - struct hashtable_itr* itr; - uint64_t* sectors; - int count; - - if (!(count = hashtable_count(h))) - return 0; - - if (!(*psectors = malloc(count * sizeof(uint64_t)))) { - DPRINTF("ramdisk_get_sectors: error allocating sector map\n"); - return -1; - } - sectors = *psectors; - - itr = hashtable_iterator(h); - count = 0; - do { - sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr); - } while (hashtable_iterator_advance(itr)); - free(itr); - - return count; -} - -/* - return -1 for OOM - return -2 for merge lookup failure - return -3 for WAW race - return 0 on success. -*/ -static int merge_requests(struct ramdisk* ramdisk, uint64_t start, - size_t count, char **mergedbuf) -{ - char* buf; - char* sector; - int i; - uint64_t *key; - int rc = 0; - - if (!(buf = valloc(count * ramdisk->sector_size))) { - DPRINTF("merge_request: allocation failed\n"); - return -1; - } - - for (i = 0; i < count; i++) { - if (!(sector = hashtable_search(ramdisk->prev, &start))) { - DPRINTF("merge_request: lookup failed on %"PRIu64"\n", start); - free(buf); - rc = -2; - goto fail; - } - - /* Check inprogress requests to avoid waw non-determinism */ - if (hashtable_search(ramdisk->inprogress, &start)) { - DPRINTF("merge_request: WAR RACE on %"PRIu64"\n", start); - free(buf); - rc = -3; - goto fail; - } - /* Insert req into inprogress (brief period of duplication of hash entries until - * they are removed from prev. 
Read tracking would not be reading wrong entries) - */ - if (!(key = malloc(sizeof(*key)))) { - DPRINTF("%s: error allocating key\n", __FUNCTION__); - free(buf); - rc = -1; - goto fail; - } - *key = start; - if (!hashtable_insert(ramdisk->inprogress, key, NULL)) { - DPRINTF("%s failed to insert sector %" PRIu64 " into inprogress hash\n", - __FUNCTION__, start); - free(key); - free(buf); - rc = -1; - goto fail; - } - memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size); - start++; - } - - *mergedbuf = buf; - return 0; -fail: - for (start--; i >0; i--, start--) - hashtable_remove(ramdisk->inprogress, &start); - return rc; -} - -/* The underlying driver may not handle having the whole ramdisk queued at - * once. We queue what we can and let the callbacks attempt to queue more. */ -/* NOTE: may be called from callback, while dd->private still belongs to - * the underlying driver */ -static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s) -{ - uint64_t* sectors; - char* buf = NULL; - uint64_t base, batchlen; - int i, j, count = 0; - - // RPRINTF("ramdisk flush\n"); - - if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0) - return count; - - /* Create the inprogress table if empty */ - if (!s->ramdisk.inprogress) - s->ramdisk.inprogress = create_hashtable(RAMDISK_HASHSIZE, - uint64_hash, - rd_hash_equal); - - /* - RPRINTF("ramdisk: flushing %d sectors\n", count); - */ - - /* sort and merge sectors to improve disk performance */ - qsort(sectors, count, sizeof(*sectors), uint64_compare); - - for (i = 0; i < count;) { - base = sectors[i++]; - while (i < count && sectors[i] == sectors[i-1] + 1) - i++; - batchlen = sectors[i-1] - base + 1; - - j = merge_requests(&s->ramdisk, base, batchlen, &buf); - - if (j) { - RPRINTF("ramdisk_flush: merge_requests failed:%s\n", - j == -1? "OOM": (j==-2? 
"missing sector" : "WAW race")); - if (j == -3) continue; - free(sectors); - return -1; - } - - /* NOTE: create_write_request() creates a treq AND forwards it down - * the driver chain */ - // RPRINTF("forwarding write request at %" PRIu64 ", length: %" PRIu64 "\n", base, batchlen); - create_write_request(s, base, batchlen, buf); - //RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 " forwarded\n", base, batchlen); - - s->ramdisk.inflight++; - - for (j = 0; j < batchlen; j++) { - buf = hashtable_search(s->ramdisk.prev, &base); - free(buf); - hashtable_remove(s->ramdisk.prev, &base); - base++; - } - } - - if (!hashtable_count(s->ramdisk.prev)) { - /* everything is in flight */ - hashtable_destroy(s->ramdisk.prev, 0); - s->ramdisk.prev = NULL; - } - - free(sectors); - - // RPRINTF("ramdisk flush done\n"); - return 0; -} - -/* flush ramdisk contents to disk */ -static int ramdisk_start_flush(td_driver_t *driver) -{ - struct tdremus_state *s = (struct tdremus_state *)driver->data; - uint64_t* key; - char* buf; - int rc = 0; - int i, j, count, batchlen; - uint64_t* sectors; - - if (!hashtable_count(s->ramdisk.h)) { - /* - RPRINTF("Nothing to flush\n"); - */ - return 0; - } - - if (s->ramdisk.prev) { - /* a flush request issued while a previous flush is still in progress - * will merge with the previous request. If you want the previous - * request to be consistent, wait for it to complete. */ - if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0) - return count; - - for (i = 0; i < count; i++) { - buf = hashtable_search(s->ramdisk.h, sectors + i); - ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf, - s->ramdisk.sector_size); - } - free(sectors); - - hashtable_destroy (s->ramdisk.h, 1); - } else - s->ramdisk.prev = s->ramdisk.h; - - /* We create a new hashtable so that new writes can be performed before - * the old hashtable is completely drained. 
*/ - s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash, - rd_hash_equal); - - return ramdisk_flush(driver, s); -} - - static int ramdisk_start(td_driver_t *driver) { struct tdremus_state *s = (struct tdremus_state *)driver->data; - if (s->ramdisk.h) { + if (s->h) { RPRINTF("ramdisk already allocated\n"); return 0; } s->ramdisk.sector_size = driver->info.sector_size; - s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash, - rd_hash_equal); + s->ramdisk.log_prefix = "remus"; + s->ramdisk.image = remus_image; + ramdisk_init(&s->ramdisk); + s->h = ramdisk_new_hashtable(); DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size); @@ -1024,10 +597,7 @@ static inline int server_writes_inflight(td_driver_t *driver) { struct tdremus_state *s = (struct tdremus_state *)driver->data; - if (!s->ramdisk.inflight && !s->ramdisk.prev) - return 0; - - return 1; + return ramdisk_writes_inflight(&s->ramdisk); } /* Due to block device prefetching this code may be called on the server side @@ -1067,13 +637,9 @@ void backup_queue_write(td_driver_t *driver, td_request_t treq) static int server_flush(td_driver_t *driver) { struct tdremus_state *s = (struct tdremus_state *)driver->data; - /* - * Nothing to flush in beginning. 
- */ - if (!s->ramdisk.prev) - return 0; + /* Try to flush any remaining requests */ - return ramdisk_flush(driver, s); + return ramdisk_flush(&s->ramdisk); } static int backup_start(td_driver_t *driver) @@ -1120,7 +686,9 @@ static void server_do_wreq(td_driver_t *driver) if (mread(s->stream_fd.fd, buf, len) < 0) goto err; - if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) { + if (ramdisk_write_to_hashtable(s->h, *sector, *sectors, + driver->info.sector_size, buf, + "remus") < 0) { rc = ERROR_INTERNAL; goto err; } @@ -1150,7 +718,7 @@ static void server_do_creq(td_driver_t *driver) // RPRINTF("committing buffer\n"); - ramdisk_start_flush(driver); + ramdisk_start_flush(&s->ramdisk, &s->h); /* XXX this message should not be sent until flush completes! */ if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4) @@ -1199,12 +767,7 @@ void unprotected_queue_read(td_driver_t *driver, td_request_t treq) /* wait for previous ramdisk to flush before servicing reads */ if (server_writes_inflight(driver)) { - /* for now lets just return EBUSY. - * if there are any left-over requests in prev, - * kick em again. - */ - if(!s->ramdisk.inflight) /* nothing in inprogress */ - ramdisk_flush(driver, s); + ramdisk_flush(&s->ramdisk); td_complete_request(treq, -EBUSY); } @@ -1222,8 +785,7 @@ void unprotected_queue_write(td_driver_t *driver, td_request_t treq) /* wait for previous ramdisk to flush */ if (server_writes_inflight(driver)) { RPRINTF("queue_write: waiting for queue to drain"); - if(!s->ramdisk.inflight) /* nothing in inprogress. 
Kick prev */ - ramdisk_flush(driver, s); + ramdisk_flush(&s->ramdisk); td_complete_request(treq, -EBUSY); } else { @@ -1532,9 +1094,8 @@ static int tdremus_close(td_driver_t *driver) struct tdremus_state *s = (struct tdremus_state *)driver->data; RPRINTF("closing\n"); - if (s->ramdisk.inprogress) - hashtable_destroy(s->ramdisk.inprogress, 0); - + ramdisk_destroy(&s->ramdisk); + ramdisk_destroy_hashtable(s->h); td_replication_connect_kill(&s->t); ctl_unregister(s); ctl_close(s); diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c index e4b2679..30eba8f 100644 --- a/tools/blktap2/drivers/block-replication.c +++ b/tools/blktap2/drivers/block-replication.c @@ -15,6 +15,10 @@ #include "tapdisk-server.h" #include "block-replication.h" +#include "tapdisk-interface.h" +#include "hashtable.h" +#include "hashtable_itr.h" +#include "hashtable_utility.h" #include <string.h> #include <errno.h> @@ -30,6 +34,8 @@ #define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a) #define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a) +#define RAMDISK_HASHSIZE 128 + /* connection status */ enum { connection_none, @@ -466,3 +472,449 @@ static void td_replication_connect_event(event_id_t id, char mode, fail: td_replication_client_failed(t, rc); } + + +/* I/O replication */ +static void replicated_write_callback(td_request_t treq, int err) +{ + ramdisk_t *ramdisk = treq.cb_data; + td_vbd_request_t *vreq = treq.private; + int i; + uint64_t start; + const char *log_prefix = ramdisk->log_prefix; + + /* the write failed for now, lets panic. this is very bad */ + if (err) { + EPRINTF("ramdisk write failed, disk image is not consistent\n"); + exit(-1); + } + + /* + * The write succeeded. 
let's pull the vreq off whatever request list + * it is on and free() it + */ + list_del(&vreq->next); + free(vreq); + + ramdisk->inflight--; + start = treq.sec; + for (i = 0; i < treq.secs; i++) { + hashtable_remove(ramdisk->inprogress, &start); + start++; + } + free(treq.buf); + + if (!ramdisk->inflight && ramdisk->prev) + ramdisk_flush(ramdisk); +} + +static int +create_write_request(ramdisk_t *ramdisk, td_sector_t sec, int secs, char *buf) +{ + td_request_t treq; + td_vbd_request_t *vreq; + td_vbd_t *vbd = ramdisk->image->private; + + treq.op = TD_OP_WRITE; + treq.buf = buf; + treq.sec = sec; + treq.secs = secs; + treq.image = ramdisk->image; + treq.cb = replicated_write_callback; + treq.cb_data = ramdisk; + treq.id = 0; + treq.sidx = 0; + + vreq = calloc(1, sizeof(td_vbd_request_t)); + treq.private = vreq; + + if(!vreq) + return -1; + + vreq->submitting = 1; + INIT_LIST_HEAD(&vreq->next); + tapdisk_vbd_move_request(treq.private, &vbd->pending_requests); + + td_forward_request(treq); + + vreq->submitting--; + + return 0; +} + +/* http://www.concentric.net/~Ttwang/tech/inthash.htm */ +static unsigned int uint64_hash(void *k) +{ + uint64_t key = *(uint64_t*)k; + + key = (~key) + (key << 18); + key = key ^ (key >> 31); + key = key * 21; + key = key ^ (key >> 11); + key = key + (key << 6); + key = key ^ (key >> 22); + + return (unsigned int)key; +} + +static int rd_hash_equal(void *k1, void *k2) +{ + uint64_t key1, key2; + + key1 = *(uint64_t*)k1; + key2 = *(uint64_t*)k2; + + return key1 == key2; +} + +static int uint64_compare(const void *k1, const void *k2) +{ + uint64_t u1 = *(uint64_t*)k1; + uint64_t u2 = *(uint64_t*)k2; + + /* u1 - u2 is unsigned */ + return u1 < u2 ? -1 : u1 > u2 ? 
1 : 0; +} + +/* + * set psectors to an array of the sector numbers in the hash, returning + * the number of entries (or -1 on error) + */ +static int ramdisk_get_sectors(struct hashtable *h, uint64_t **psectors, + const char *log_prefix) +{ + struct hashtable_itr* itr; + uint64_t* sectors; + int count; + + if (!(count = hashtable_count(h))) + return 0; + + if (!(*psectors = malloc(count * sizeof(uint64_t)))) { + DPRINTF("ramdisk_get_sectors: error allocating sector map\n"); + return -1; + } + sectors = *psectors; + + itr = hashtable_iterator(h); + count = 0; + do { + sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr); + } while (hashtable_iterator_advance(itr)); + free(itr); + + return count; +} + +static int ramdisk_write_hash(struct hashtable *h, uint64_t sector, char *buf, + size_t len, const char *log_prefix) +{ + char *v; + uint64_t *key; + + if ((v = hashtable_search(h, &sector))) { + memcpy(v, buf, len); + return 0; + } + + if (!(v = malloc(len))) { + DPRINTF("ramdisk_write_hash: malloc failed\n"); + return -1; + } + memcpy(v, buf, len); + if (!(key = malloc(sizeof(*key)))) { + DPRINTF("ramdisk_write_hash: error allocating key\n"); + free(v); + return -1; + } + *key = sector; + if (!hashtable_insert(h, key, v)) { + DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector); + free(key); + free(v); + return -1; + } + + return 0; +} + +/* + * return -1 for OOM + * return -2 for merge lookup failure(should not happen) + * return -3 for WAW race + * return 0 on success. 
+ */ +static int merge_requests(struct ramdisk *ramdisk, uint64_t start, + size_t count, char **mergedbuf) +{ + char* buf; + char* sector; + int i; + uint64_t *key; + int rc = 0; + const char *log_prefix = ramdisk->log_prefix; + + if (!(buf = valloc(count * ramdisk->sector_size))) { + DPRINTF("merge_request: allocation failed\n"); + return -1; + } + + for (i = 0; i < count; i++) { + if (!(sector = hashtable_search(ramdisk->prev, &start))) { + EPRINTF("merge_request: lookup failed on %"PRIu64"\n", + start); + free(buf); + rc = -2; + goto fail; + } + + /* Check inprogress requests to avoid waw non-determinism */ + if (hashtable_search(ramdisk->inprogress, &start)) { + DPRINTF("merge_request: WAR RACE on %"PRIu64"\n", + start); + free(buf); + rc = -3; + goto fail; + } + + /* + * Insert req into inprogress (brief period of duplication of + * hash entries until they are removed from prev. Read tracking + * would not be reading wrong entries) + */ + if (!(key = malloc(sizeof(*key)))) { + EPRINTF("%s: error allocating key\n", __FUNCTION__); + free(buf); + rc = -1; + goto fail; + } + *key = start; + if (!hashtable_insert(ramdisk->inprogress, key, NULL)) { + EPRINTF("%s failed to insert sector %" PRIu64 " into inprogress hash\n", + __FUNCTION__, start); + free(key); + free(buf); + rc = -1; + goto fail; + } + + memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size); + start++; + } + + *mergedbuf = buf; + return 0; +fail: + for (start--; i > 0; i--, start--) + hashtable_remove(ramdisk->inprogress, &start); + return rc; +} + +int ramdisk_flush(ramdisk_t *ramdisk) +{ + uint64_t *sectors; + char *buf = NULL; + uint64_t base, batchlen; + int i, j, count = 0; + const char *log_prefix = ramdisk->log_prefix; + + /* everything is in flight */ + if (!ramdisk->prev) + return 0; + + count = ramdisk_get_sectors(ramdisk->prev, &sectors, log_prefix); + if (count <= 0) + /* should not happen */ + return count; + + /* Create the inprogress table if empty */ + if 
(!ramdisk->inprogress) + ramdisk->inprogress = ramdisk_new_hashtable(); + + /* sort and merge sectors to improve disk performance */ + qsort(sectors, count, sizeof(*sectors), uint64_compare); + + for (i = 0; i < count;) { + base = sectors[i++]; + while (i < count && sectors[i] == sectors[i-1] + 1) + i++; + batchlen = sectors[i-1] - base + 1; + + j = merge_requests(ramdisk, base, batchlen, &buf); + if (j) { + EPRINTF("ramdisk_flush: merge_requests failed:%s\n", + j == -1 ? "OOM" : + (j == -2 ? "missing sector" : + "WAW race")); + if (j == -3) + continue; + free(sectors); + return -1; + } + + /* + * NOTE: create_write_request() creates a treq AND forwards + * it down the driver chain + * + * TODO: handle create_write_request()'s error. + */ + create_write_request(ramdisk, base, batchlen, buf); + + ramdisk->inflight++; + + for (j = 0; j < batchlen; j++) { + buf = hashtable_search(ramdisk->prev, &base); + free(buf); + hashtable_remove(ramdisk->prev, &base); + base++; + } + } + + if (!hashtable_count(ramdisk->prev)) { + /* everything is in flight */ + hashtable_destroy(ramdisk->prev, 0); + ramdisk->prev = NULL; + } + + free(sectors); + return 0; +} + +int ramdisk_start_flush(ramdisk_t *ramdisk, struct hashtable **new) +{ + uint64_t *key; + char *buf; + int rc = 0; + int i, j, count, batchlen; + uint64_t *sectors; + const char *log_prefix = ramdisk->log_prefix; + + if (!hashtable_count(*new)) + return 0; + + if (ramdisk->prev) { + /* + * a flush request issued while a previous flush is still in + * progress will merge with the previous request. If you want + * the previous request to be consistent, wait for it to + * complete. 
+ */ + count = ramdisk_get_sectors(*new, &sectors, log_prefix); + if (count < 0 ) + return count; + + for (i = 0; i < count; i++) { + buf = hashtable_search(*new, sectors + i); + ramdisk_write_hash(ramdisk->prev, sectors[i], buf, + ramdisk->sector_size, log_prefix); + } + free(sectors); + + hashtable_destroy(*new, 1); + } else + ramdisk->prev = *new; + + /* + * We create a new hashtable so that new writes can be performed before + * the old hashtable is completely drained. + */ + *new = ramdisk_new_hashtable(); + + return ramdisk_flush(ramdisk); +} + +void ramdisk_init(ramdisk_t *ramdisk) +{ + ramdisk->inflight = 0; + ramdisk->prev = NULL; + ramdisk->inprogress = NULL; +} + +void ramdisk_destroy(ramdisk_t *ramdisk) +{ + const char *log_prefix = ramdisk->log_prefix; + + /* + * ramdisk_destroy() is called only when we will close the tapdisk image. + * In this case, there are no pending requests in vbd. + * + * If ramdisk->inflight is not 0, it means that the requests created by + * us are still in vbd->pending_requests. 
+ */ + if (ramdisk->inflight) { + /* should not happen */ + EPRINTF("cannot destroy ramdisk\n"); + return; + } + + if (ramdisk->inprogress) { + hashtable_destroy(ramdisk->inprogress, 0); + ramdisk->inprogress = NULL; + } + + if (ramdisk->prev) { + hashtable_destroy(ramdisk->prev, 1); + ramdisk->prev = NULL; + } +} + +int ramdisk_writes_inflight(ramdisk_t *ramdisk) +{ + if (!ramdisk->inflight && !ramdisk->prev) + return 0; + + return 1; +} + +int ramdisk_read(struct ramdisk *ramdisk, uint64_t sector, + int nb_sectors, char *buf) +{ + int i; + char *v; + uint64_t key; + + for (i = 0; i < nb_sectors; i++) { + key = sector + i; + /* check whether it is queued in a previous flush request */ + if (!(ramdisk->prev && + (v = hashtable_search(ramdisk->prev, &key)))) { + /* check whether it is an ongoing flush */ + if (!(ramdisk->inprogress && + (v = hashtable_search(ramdisk->inprogress, &key)))) + return -1; + } + memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size); + } + + return 0; +} + +struct hashtable *ramdisk_new_hashtable(void) +{ + return create_hashtable(RAMDISK_HASHSIZE, uint64_hash, rd_hash_equal); +} + +int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector, + int nb_sectors, size_t sector_size, char* buf, + const char *log_prefix) +{ + int i, rc; + + for (i = 0; i < nb_sectors; i++) { + rc = ramdisk_write_hash(h, sector + i, + buf + i * sector_size, + sector_size, log_prefix); + if (rc) + return rc; + } + + return 0; +} + +void ramdisk_destroy_hashtable(struct hashtable *h) +{ + if (!h) + return; + + hashtable_destroy(h, 1); +} diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h index 0bd6e71..fdc216e 100644 --- a/tools/blktap2/drivers/block-replication.h +++ b/tools/blktap2/drivers/block-replication.h @@ -110,4 +110,52 @@ int td_replication_server_restart(td_replication_connect_t *t); */ int td_replication_client_start(td_replication_connect_t *t); +/* I/O replication */ +typedef struct 
ramdisk ramdisk_t; +struct ramdisk { + size_t sector_size; + const char *log_prefix; + td_image_t *image; + + /* private */ + /* count of outstanding requests to the base driver */ + size_t inflight; + /* prev holds the requests to be flushed, while inprogress holds + * requests being flushed. When requests complete, they are removed + * from inprogress. + * Whenever a new flush is merged with ongoing flush (i.e, prev), + * we have to make sure that none of the new requests overlap with + * ones in "inprogress". If it does, keep it back in prev and dont issue + * IO until the current one finishes. If we allow this IO to proceed, + * we might end up with two "overlapping" requests in the disk's queue and + * the disk may not offer any guarantee on which one is written first. + * IOW, make sure we dont create a write-after-write time ordering constraint. + */ + struct hashtable *prev; + struct hashtable *inprogress; +}; + +void ramdisk_init(ramdisk_t *ramdisk); +void ramdisk_destroy(ramdisk_t *ramdisk); + +/* flush pending contents to disk */ +int ramdisk_flush(ramdisk_t *ramdisk); +/* flush new contents to disk */ +int ramdisk_start_flush(ramdisk_t *ramdisk, struct hashtable **new); +int ramdisk_writes_inflight(ramdisk_t *ramdisk); + +/* + * try to read from ramdisk. Return -1 if some sectors are not in + * ramdisk. Otherwise, return 0. + */ +int ramdisk_read(struct ramdisk *ramdisk, uint64_t sector, + int nb_sectors, char *buf); + +/* create a new hashtable that can be used by ramdisk */ +struct hashtable *ramdisk_new_hashtable(void); +int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector, + int nb_sectors, size_t sector_size, char* buf, + const char *log_prefix); +void ramdisk_destroy_hashtable(struct hashtable *h); + #endif -- 1.9.3 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |