|
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC Patch v3 16/22] blktap2: move ramdisk related codes to block-replication.c
COLO will reuse them
Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
---
tools/blktap2/drivers/block-remus.c | 485 ++----------------------------
tools/blktap2/drivers/block-replication.c | 452 ++++++++++++++++++++++++++++
tools/blktap2/drivers/block-replication.h | 48 +++
3 files changed, 523 insertions(+), 462 deletions(-)
diff --git a/tools/blktap2/drivers/block-remus.c
b/tools/blktap2/drivers/block-remus.c
index 8b6f157..2713af1 100644
--- a/tools/blktap2/drivers/block-remus.c
+++ b/tools/blktap2/drivers/block-remus.c
@@ -37,9 +37,6 @@
#include "tapdisk-server.h"
#include "tapdisk-driver.h"
#include "tapdisk-interface.h"
-#include "hashtable.h"
-#include "hashtable_itr.h"
-#include "hashtable_utility.h"
#include "block-replication.h"
#include <errno.h>
@@ -58,7 +55,6 @@
/* timeout for reads and writes in ms */
#define HEARTBEAT_MS 1000
-#define RAMDISK_HASHSIZE 128
/* connect retry timeout (seconds) */
#define REMUS_CONNRETRY_TIMEOUT 1
@@ -97,51 +93,6 @@ td_vbd_t *device_vbd = NULL;
td_image_t *remus_image = NULL;
struct tap_disk tapdisk_remus;
-struct ramdisk {
- size_t sector_size;
- struct hashtable* h;
- /* when a ramdisk is flushed, h is given a new empty hash for writes
- * while the old ramdisk (prev) is drained asynchronously.
- */
- struct hashtable* prev;
- /* count of outstanding requests to the base driver */
- size_t inflight;
- /* prev holds the requests to be flushed, while inprogress holds
- * requests being flushed. When requests complete, they are removed
- * from inprogress.
- * Whenever a new flush is merged with ongoing flush (i.e, prev),
- * we have to make sure that none of the new requests overlap with
- * ones in "inprogress". If it does, keep it back in prev and dont issue
- * IO until the current one finishes. If we allow this IO to proceed,
- * we might end up with two "overlapping" requests in the disk's queue
and
- * the disk may not offer any guarantee on which one is written first.
- * IOW, make sure we dont create a write-after-write time ordering
constraint.
- *
- */
- struct hashtable* inprogress;
-};
-
-/* the ramdisk intercepts the original callback for reads and writes.
- * This holds the original data. */
-/* Might be worth making this a static array in struct ramdisk to avoid
- * a malloc per request */
-
-struct tdremus_state;
-
-struct ramdisk_cbdata {
- td_callback_t cb;
- void* private;
- char* buf;
- struct tdremus_state* state;
-};
-
-struct ramdisk_write_cbdata {
- struct tdremus_state* state;
- char* buf;
-};
-
-typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
-
typedef struct poll_fd {
int fd;
event_id_t id;
@@ -167,8 +118,14 @@ struct tdremus_state {
*/
struct req_ring queued_io;
- /* ramdisk data*/
+ /* ramdisk data */
struct ramdisk ramdisk;
+ /*
+ * The primary write request is queued in this
+ * hashtable, and will be flushed to ramdisk when
+ * the checkpoint finishes.
+ */
+ struct hashtable *h;
/* mode methods */
enum tdremus_mode mode;
@@ -239,404 +196,20 @@ static void ring_add_request(struct req_ring *ring,
const td_request_t *treq)
ring->prod = ring_next(ring->prod);
}
-/* Prototype declarations */
-static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
-
-/* functions to create and sumbit treq's */
-
-static void
-replicated_write_callback(td_request_t treq, int err)
-{
- struct tdremus_state *s = (struct tdremus_state *) treq.cb_data;
- td_vbd_request_t *vreq;
- int i;
- uint64_t start;
- vreq = (td_vbd_request_t *) treq.private;
-
- /* the write failed for now, lets panic. this is very bad */
- if (err) {
- RPRINTF("ramdisk write failed, disk image is not consistent\n");
- exit(-1);
- }
-
- /* The write succeeded. let's pull the vreq off whatever request list
- * it is on and free() it */
- list_del(&vreq->next);
- free(vreq);
-
- s->ramdisk.inflight--;
- start = treq.sec;
- for (i = 0; i < treq.secs; i++) {
- hashtable_remove(s->ramdisk.inprogress, &start);
- start++;
- }
- free(treq.buf);
-
- if (!s->ramdisk.inflight && !s->ramdisk.prev) {
- /* TODO: the ramdisk has been flushed */
- }
-}
-
-static inline int
-create_write_request(struct tdremus_state *state, td_sector_t sec, int secs,
char *buf)
-{
- td_request_t treq;
- td_vbd_request_t *vreq;
-
- treq.op = TD_OP_WRITE;
- treq.buf = buf;
- treq.sec = sec;
- treq.secs = secs;
- treq.image = remus_image;
- treq.cb = replicated_write_callback;
- treq.cb_data = state;
- treq.id = 0;
- treq.sidx = 0;
-
- vreq = calloc(1, sizeof(td_vbd_request_t));
- treq.private = vreq;
-
- if(!vreq)
- return -1;
-
- vreq->submitting = 1;
- INIT_LIST_HEAD(&vreq->next);
- tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
-
- /* TODO:
- * we should probably leave it up to the caller to forward the request
*/
- td_forward_request(treq);
-
- vreq->submitting--;
-
- return 0;
-}
-
-
-/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
-static unsigned int uint64_hash(void* k)
-{
- uint64_t key = *(uint64_t*)k;
-
- key = (~key) + (key << 18);
- key = key ^ (key >> 31);
- key = key * 21;
- key = key ^ (key >> 11);
- key = key + (key << 6);
- key = key ^ (key >> 22);
-
- return (unsigned int)key;
-}
-
-static int rd_hash_equal(void* k1, void* k2)
-{
- uint64_t key1, key2;
-
- key1 = *(uint64_t*)k1;
- key2 = *(uint64_t*)k2;
-
- return key1 == key2;
-}
-
-static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
- int nb_sectors, char* buf)
-{
- int i;
- char* v;
- uint64_t key;
-
- for (i = 0; i < nb_sectors; i++) {
- key = sector + i;
- /* check whether it is queued in a previous flush request */
- if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev,
&key)))) {
- /* check whether it is an ongoing flush */
- if (!(ramdisk->inprogress && (v =
hashtable_search(ramdisk->inprogress, &key))))
- return -1;
- }
- memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
- }
-
- return 0;
-}
-
-static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
- size_t len)
-{
- char* v;
- uint64_t* key;
-
- if ((v = hashtable_search(h, &sector))) {
- memcpy(v, buf, len);
- return 0;
- }
-
- if (!(v = malloc(len))) {
- DPRINTF("ramdisk_write_hash: malloc failed\n");
- return -1;
- }
- memcpy(v, buf, len);
- if (!(key = malloc(sizeof(*key)))) {
- DPRINTF("ramdisk_write_hash: error allocating key\n");
- free(v);
- return -1;
- }
- *key = sector;
- if (!hashtable_insert(h, key, v)) {
- DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n",
sector);
- free(key);
- free(v);
- return -1;
- }
-
- return 0;
-}
-
-static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
- int nb_sectors, char* buf)
-{
- int i, rc;
-
- for (i = 0; i < nb_sectors; i++) {
- rc = ramdisk_write_hash(ramdisk->h, sector + i,
- buf + i * ramdisk->sector_size,
- ramdisk->sector_size);
- if (rc)
- return rc;
- }
-
- return 0;
-}
-
-static int uint64_compare(const void* k1, const void* k2)
-{
- uint64_t u1 = *(uint64_t*)k1;
- uint64_t u2 = *(uint64_t*)k2;
-
- /* u1 - u2 is unsigned */
- return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
-}
-
-/* set psectors to an array of the sector numbers in the hash, returning
- * the number of entries (or -1 on error) */
-static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
-{
- struct hashtable_itr* itr;
- uint64_t* sectors;
- int count;
-
- if (!(count = hashtable_count(h)))
- return 0;
-
- if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
- DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
- return -1;
- }
- sectors = *psectors;
-
- itr = hashtable_iterator(h);
- count = 0;
- do {
- sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
- } while (hashtable_iterator_advance(itr));
- free(itr);
-
- return count;
-}
-
-/*
- return -1 for OOM
- return -2 for merge lookup failure
- return -3 for WAW race
- return 0 on success.
-*/
-static int merge_requests(struct ramdisk* ramdisk, uint64_t start,
- size_t count, char **mergedbuf)
-{
- char* buf;
- char* sector;
- int i;
- uint64_t *key;
- int rc = 0;
-
- if (!(buf = valloc(count * ramdisk->sector_size))) {
- DPRINTF("merge_request: allocation failed\n");
- return -1;
- }
-
- for (i = 0; i < count; i++) {
- if (!(sector = hashtable_search(ramdisk->prev, &start))) {
- DPRINTF("merge_request: lookup failed on %"PRIu64"\n",
start);
- free(buf);
- rc = -2;
- goto fail;
- }
-
- /* Check inprogress requests to avoid waw non-determinism */
- if (hashtable_search(ramdisk->inprogress, &start)) {
- DPRINTF("merge_request: WAR RACE on %"PRIu64"\n",
start);
- free(buf);
- rc = -3;
- goto fail;
- }
- /* Insert req into inprogress (brief period of duplication of
hash entries until
- * they are removed from prev. Read tracking would not be
reading wrong entries)
- */
- if (!(key = malloc(sizeof(*key)))) {
- DPRINTF("%s: error allocating key\n", __FUNCTION__);
- free(buf);
- rc = -1;
- goto fail;
- }
- *key = start;
- if (!hashtable_insert(ramdisk->inprogress, key, NULL)) {
- DPRINTF("%s failed to insert sector %" PRIu64 " into
inprogress hash\n",
- __FUNCTION__, start);
- free(key);
- free(buf);
- rc = -1;
- goto fail;
- }
- memcpy(buf + i * ramdisk->sector_size, sector,
ramdisk->sector_size);
- start++;
- }
-
- *mergedbuf = buf;
- return 0;
-fail:
- for (start--; i >0; i--, start--)
- hashtable_remove(ramdisk->inprogress, &start);
- return rc;
-}
-
-/* The underlying driver may not handle having the whole ramdisk queued at
- * once. We queue what we can and let the callbacks attempt to queue more. */
-/* NOTE: may be called from callback, while dd->private still belongs to
- * the underlying driver */
-static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s)
-{
- uint64_t* sectors;
- char* buf = NULL;
- uint64_t base, batchlen;
- int i, j, count = 0;
-
- // RPRINTF("ramdisk flush\n");
-
- if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
- return count;
-
- /* Create the inprogress table if empty */
- if (!s->ramdisk.inprogress)
- s->ramdisk.inprogress = create_hashtable(RAMDISK_HASHSIZE,
- uint64_hash,
- rd_hash_equal);
-
- /*
- RPRINTF("ramdisk: flushing %d sectors\n", count);
- */
-
- /* sort and merge sectors to improve disk performance */
- qsort(sectors, count, sizeof(*sectors), uint64_compare);
-
- for (i = 0; i < count;) {
- base = sectors[i++];
- while (i < count && sectors[i] == sectors[i-1] + 1)
- i++;
- batchlen = sectors[i-1] - base + 1;
-
- j = merge_requests(&s->ramdisk, base, batchlen, &buf);
-
- if (j) {
- RPRINTF("ramdisk_flush: merge_requests failed:%s\n",
- j == -1? "OOM": (j==-2? "missing sector" : "WAW
race"));
- if (j == -3) continue;
- free(sectors);
- return -1;
- }
-
- /* NOTE: create_write_request() creates a treq AND forwards it
down
- * the driver chain */
- // RPRINTF("forwarding write request at %" PRIu64 ", length: %"
PRIu64 "\n", base, batchlen);
- create_write_request(s, base, batchlen, buf);
- //RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 "
forwarded\n", base, batchlen);
-
- s->ramdisk.inflight++;
-
- for (j = 0; j < batchlen; j++) {
- buf = hashtable_search(s->ramdisk.prev, &base);
- free(buf);
- hashtable_remove(s->ramdisk.prev, &base);
- base++;
- }
- }
-
- if (!hashtable_count(s->ramdisk.prev)) {
- /* everything is in flight */
- hashtable_destroy(s->ramdisk.prev, 0);
- s->ramdisk.prev = NULL;
- }
-
- free(sectors);
-
- // RPRINTF("ramdisk flush done\n");
- return 0;
-}
-
-/* flush ramdisk contents to disk */
-static int ramdisk_start_flush(td_driver_t *driver)
-{
- struct tdremus_state *s = (struct tdremus_state *)driver->data;
- uint64_t* key;
- char* buf;
- int rc = 0;
- int i, j, count, batchlen;
- uint64_t* sectors;
-
- if (!hashtable_count(s->ramdisk.h)) {
- /*
- RPRINTF("Nothing to flush\n");
- */
- return 0;
- }
-
- if (s->ramdisk.prev) {
- /* a flush request issued while a previous flush is still in
progress
- * will merge with the previous request. If you want the
previous
- * request to be consistent, wait for it to complete. */
- if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
- return count;
-
- for (i = 0; i < count; i++) {
- buf = hashtable_search(s->ramdisk.h, sectors + i);
- ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
- s->ramdisk.sector_size);
- }
- free(sectors);
-
- hashtable_destroy (s->ramdisk.h, 1);
- } else
- s->ramdisk.prev = s->ramdisk.h;
-
- /* We create a new hashtable so that new writes can be performed before
- * the old hashtable is completely drained. */
- s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
- rd_hash_equal);
-
- return ramdisk_flush(driver, s);
-}
-
-
static int ramdisk_start(td_driver_t *driver)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
- if (s->ramdisk.h) {
+ if (s->h) {
RPRINTF("ramdisk already allocated\n");
return 0;
}
s->ramdisk.sector_size = driver->info.sector_size;
- s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
- rd_hash_equal);
+ s->ramdisk.log_prefix = "remus";
+ s->ramdisk.image = remus_image;
+ ramdisk_init(&s->ramdisk);
+ s->h = ramdisk_new_hashtable();
DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);
@@ -1024,10 +597,7 @@ static inline int server_writes_inflight(td_driver_t
*driver)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
- if (!s->ramdisk.inflight && !s->ramdisk.prev)
- return 0;
-
- return 1;
+ return ramdisk_writes_inflight(&s->ramdisk);
}
/* Due to block device prefetching this code may be called on the server side
@@ -1067,13 +637,9 @@ void backup_queue_write(td_driver_t *driver, td_request_t
treq)
static int server_flush(td_driver_t *driver)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
- /*
- * Nothing to flush in beginning.
- */
- if (!s->ramdisk.prev)
- return 0;
+
/* Try to flush any remaining requests */
- return ramdisk_flush(driver, s);
+ return ramdisk_flush(&s->ramdisk);
}
static int backup_start(td_driver_t *driver)
@@ -1120,7 +686,9 @@ static void server_do_wreq(td_driver_t *driver)
if (mread(s->stream_fd.fd, buf, len) < 0)
goto err;
- if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
+ if (ramdisk_write_to_hashtable(s->h, *sector, *sectors,
+ driver->info.sector_size, buf,
+ "remus") < 0) {
rc = ERROR_INTERNAL;
goto err;
}
@@ -1150,7 +718,7 @@ static void server_do_creq(td_driver_t *driver)
// RPRINTF("committing buffer\n");
- ramdisk_start_flush(driver);
+ ramdisk_start_flush(&s->ramdisk, &s->h);
/* XXX this message should not be sent until flush completes! */
if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
@@ -1199,12 +767,7 @@ void unprotected_queue_read(td_driver_t *driver,
td_request_t treq)
/* wait for previous ramdisk to flush before servicing reads */
if (server_writes_inflight(driver)) {
- /* for now lets just return EBUSY.
- * if there are any left-over requests in prev,
- * kick em again.
- */
- if(!s->ramdisk.inflight) /* nothing in inprogress */
- ramdisk_flush(driver, s);
+ ramdisk_flush(&s->ramdisk);
td_complete_request(treq, -EBUSY);
}
@@ -1222,8 +785,7 @@ void unprotected_queue_write(td_driver_t *driver,
td_request_t treq)
/* wait for previous ramdisk to flush */
if (server_writes_inflight(driver)) {
RPRINTF("queue_write: waiting for queue to drain");
- if(!s->ramdisk.inflight) /* nothing in inprogress. Kick prev */
- ramdisk_flush(driver, s);
+ ramdisk_flush(&s->ramdisk);
td_complete_request(treq, -EBUSY);
}
else {
@@ -1532,9 +1094,8 @@ static int tdremus_close(td_driver_t *driver)
struct tdremus_state *s = (struct tdremus_state *)driver->data;
RPRINTF("closing\n");
- if (s->ramdisk.inprogress)
- hashtable_destroy(s->ramdisk.inprogress, 0);
-
+ ramdisk_destroy(&s->ramdisk);
+ ramdisk_destroy_hashtable(s->h);
td_replication_connect_kill(&s->t);
ctl_unregister(s);
ctl_close(s);
diff --git a/tools/blktap2/drivers/block-replication.c
b/tools/blktap2/drivers/block-replication.c
index e4b2679..30eba8f 100644
--- a/tools/blktap2/drivers/block-replication.c
+++ b/tools/blktap2/drivers/block-replication.c
@@ -15,6 +15,10 @@
#include "tapdisk-server.h"
#include "block-replication.h"
+#include "tapdisk-interface.h"
+#include "hashtable.h"
+#include "hashtable_itr.h"
+#include "hashtable_utility.h"
#include <string.h>
#include <errno.h>
@@ -30,6 +34,8 @@
#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a)
#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a)
+#define RAMDISK_HASHSIZE 128
+
/* connection status */
enum {
connection_none,
@@ -466,3 +472,449 @@ static void td_replication_connect_event(event_id_t id,
char mode,
fail:
td_replication_client_failed(t, rc);
}
+
+
+/* I/O replication */
+static void replicated_write_callback(td_request_t treq, int err)
+{
+ ramdisk_t *ramdisk = treq.cb_data;
+ td_vbd_request_t *vreq = treq.private;
+ int i;
+ uint64_t start;
+ const char *log_prefix = ramdisk->log_prefix;
+
+ /* the write failed for now, lets panic. this is very bad */
+ if (err) {
+ EPRINTF("ramdisk write failed, disk image is not consistent\n");
+ exit(-1);
+ }
+
+ /*
+ * The write succeeded. let's pull the vreq off whatever request list
+ * it is on and free() it
+ */
+ list_del(&vreq->next);
+ free(vreq);
+
+ ramdisk->inflight--;
+ start = treq.sec;
+ for (i = 0; i < treq.secs; i++) {
+ hashtable_remove(ramdisk->inprogress, &start);
+ start++;
+ }
+ free(treq.buf);
+
+ if (!ramdisk->inflight && ramdisk->prev)
+ ramdisk_flush(ramdisk);
+}
+
+static int
+create_write_request(ramdisk_t *ramdisk, td_sector_t sec, int secs, char *buf)
+{
+ td_request_t treq;
+ td_vbd_request_t *vreq;
+ td_vbd_t *vbd = ramdisk->image->private;
+
+ treq.op = TD_OP_WRITE;
+ treq.buf = buf;
+ treq.sec = sec;
+ treq.secs = secs;
+ treq.image = ramdisk->image;
+ treq.cb = replicated_write_callback;
+ treq.cb_data = ramdisk;
+ treq.id = 0;
+ treq.sidx = 0;
+
+ vreq = calloc(1, sizeof(td_vbd_request_t));
+ treq.private = vreq;
+
+ if(!vreq)
+ return -1;
+
+ vreq->submitting = 1;
+ INIT_LIST_HEAD(&vreq->next);
+ tapdisk_vbd_move_request(treq.private, &vbd->pending_requests);
+
+ td_forward_request(treq);
+
+ vreq->submitting--;
+
+ return 0;
+}
+
+/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
+static unsigned int uint64_hash(void *k)
+{
+ uint64_t key = *(uint64_t*)k;
+
+ key = (~key) + (key << 18);
+ key = key ^ (key >> 31);
+ key = key * 21;
+ key = key ^ (key >> 11);
+ key = key + (key << 6);
+ key = key ^ (key >> 22);
+
+ return (unsigned int)key;
+}
+
+static int rd_hash_equal(void *k1, void *k2)
+{
+ uint64_t key1, key2;
+
+ key1 = *(uint64_t*)k1;
+ key2 = *(uint64_t*)k2;
+
+ return key1 == key2;
+}
+
+static int uint64_compare(const void *k1, const void *k2)
+{
+ uint64_t u1 = *(uint64_t*)k1;
+ uint64_t u2 = *(uint64_t*)k2;
+
+ /* u1 - u2 is unsigned */
+ return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
+}
+
+/*
+ * set psectors to an array of the sector numbers in the hash, returning
+ * the number of entries (or -1 on error)
+ */
+static int ramdisk_get_sectors(struct hashtable *h, uint64_t **psectors,
+ const char *log_prefix)
+{
+ struct hashtable_itr* itr;
+ uint64_t* sectors;
+ int count;
+
+ if (!(count = hashtable_count(h)))
+ return 0;
+
+ if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
+ DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
+ return -1;
+ }
+ sectors = *psectors;
+
+ itr = hashtable_iterator(h);
+ count = 0;
+ do {
+ sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
+ } while (hashtable_iterator_advance(itr));
+ free(itr);
+
+ return count;
+}
+
+static int ramdisk_write_hash(struct hashtable *h, uint64_t sector, char *buf,
+ size_t len, const char *log_prefix)
+{
+ char *v;
+ uint64_t *key;
+
+ if ((v = hashtable_search(h, &sector))) {
+ memcpy(v, buf, len);
+ return 0;
+ }
+
+ if (!(v = malloc(len))) {
+ DPRINTF("ramdisk_write_hash: malloc failed\n");
+ return -1;
+ }
+ memcpy(v, buf, len);
+ if (!(key = malloc(sizeof(*key)))) {
+ DPRINTF("ramdisk_write_hash: error allocating key\n");
+ free(v);
+ return -1;
+ }
+ *key = sector;
+ if (!hashtable_insert(h, key, v)) {
+ DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n",
sector);
+ free(key);
+ free(v);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
+ * return -1 for OOM
+ * return -2 for merge lookup failure(should not happen)
+ * return -3 for WAW race
+ * return 0 on success.
+ */
+static int merge_requests(struct ramdisk *ramdisk, uint64_t start,
+ size_t count, char **mergedbuf)
+{
+ char* buf;
+ char* sector;
+ int i;
+ uint64_t *key;
+ int rc = 0;
+ const char *log_prefix = ramdisk->log_prefix;
+
+ if (!(buf = valloc(count * ramdisk->sector_size))) {
+ DPRINTF("merge_request: allocation failed\n");
+ return -1;
+ }
+
+ for (i = 0; i < count; i++) {
+ if (!(sector = hashtable_search(ramdisk->prev, &start))) {
+ EPRINTF("merge_request: lookup failed on %"PRIu64"\n",
+ start);
+ free(buf);
+ rc = -2;
+ goto fail;
+ }
+
+ /* Check inprogress requests to avoid waw non-determinism */
+ if (hashtable_search(ramdisk->inprogress, &start)) {
+ DPRINTF("merge_request: WAR RACE on %"PRIu64"\n",
+ start);
+ free(buf);
+ rc = -3;
+ goto fail;
+ }
+
+ /*
+ * Insert req into inprogress (brief period of duplication of
+ * hash entries until they are removed from prev. Read tracking
+ * would not be reading wrong entries)
+ */
+ if (!(key = malloc(sizeof(*key)))) {
+ EPRINTF("%s: error allocating key\n", __FUNCTION__);
+ free(buf);
+ rc = -1;
+ goto fail;
+ }
+ *key = start;
+ if (!hashtable_insert(ramdisk->inprogress, key, NULL)) {
+ EPRINTF("%s failed to insert sector %" PRIu64 " into
inprogress hash\n",
+ __FUNCTION__, start);
+ free(key);
+ free(buf);
+ rc = -1;
+ goto fail;
+ }
+
+ memcpy(buf + i * ramdisk->sector_size, sector,
ramdisk->sector_size);
+ start++;
+ }
+
+ *mergedbuf = buf;
+ return 0;
+fail:
+ for (start--; i > 0; i--, start--)
+ hashtable_remove(ramdisk->inprogress, &start);
+ return rc;
+}
+
+int ramdisk_flush(ramdisk_t *ramdisk)
+{
+ uint64_t *sectors;
+ char *buf = NULL;
+ uint64_t base, batchlen;
+ int i, j, count = 0;
+ const char *log_prefix = ramdisk->log_prefix;
+
+ /* everything is in flight */
+ if (!ramdisk->prev)
+ return 0;
+
+ count = ramdisk_get_sectors(ramdisk->prev, &sectors, log_prefix);
+ if (count <= 0)
+ /* should not happen */
+ return count;
+
+ /* Create the inprogress table if empty */
+ if (!ramdisk->inprogress)
+ ramdisk->inprogress = ramdisk_new_hashtable();
+
+ /* sort and merge sectors to improve disk performance */
+ qsort(sectors, count, sizeof(*sectors), uint64_compare);
+
+ for (i = 0; i < count;) {
+ base = sectors[i++];
+ while (i < count && sectors[i] == sectors[i-1] + 1)
+ i++;
+ batchlen = sectors[i-1] - base + 1;
+
+ j = merge_requests(ramdisk, base, batchlen, &buf);
+ if (j) {
+ EPRINTF("ramdisk_flush: merge_requests failed:%s\n",
+ j == -1 ? "OOM" :
+ (j == -2 ? "missing sector" :
+ "WAW race"));
+ if (j == -3)
+ continue;
+ free(sectors);
+ return -1;
+ }
+
+ /*
+ * NOTE: create_write_request() creates a treq AND forwards
+ * it down the driver chain
+ *
+ * TODO: handle create_write_request()'s error.
+ */
+ create_write_request(ramdisk, base, batchlen, buf);
+
+ ramdisk->inflight++;
+
+ for (j = 0; j < batchlen; j++) {
+ buf = hashtable_search(ramdisk->prev, &base);
+ free(buf);
+ hashtable_remove(ramdisk->prev, &base);
+ base++;
+ }
+ }
+
+ if (!hashtable_count(ramdisk->prev)) {
+ /* everything is in flight */
+ hashtable_destroy(ramdisk->prev, 0);
+ ramdisk->prev = NULL;
+ }
+
+ free(sectors);
+ return 0;
+}
+
+int ramdisk_start_flush(ramdisk_t *ramdisk, struct hashtable **new)
+{
+ uint64_t *key;
+ char *buf;
+ int rc = 0;
+ int i, j, count, batchlen;
+ uint64_t *sectors;
+ const char *log_prefix = ramdisk->log_prefix;
+
+ if (!hashtable_count(*new))
+ return 0;
+
+ if (ramdisk->prev) {
+ /*
+ * a flush request issued while a previous flush is still in
+ * progress will merge with the previous request. If you want
+ * the previous request to be consistent, wait for it to
+ * complete.
+ */
+ count = ramdisk_get_sectors(*new, &sectors, log_prefix);
+ if (count < 0 )
+ return count;
+
+ for (i = 0; i < count; i++) {
+ buf = hashtable_search(*new, sectors + i);
+ ramdisk_write_hash(ramdisk->prev, sectors[i], buf,
+ ramdisk->sector_size, log_prefix);
+ }
+ free(sectors);
+
+ hashtable_destroy(*new, 1);
+ } else
+ ramdisk->prev = *new;
+
+ /*
+ * We create a new hashtable so that new writes can be performed before
+ * the old hashtable is completely drained.
+ */
+ *new = ramdisk_new_hashtable();
+
+ return ramdisk_flush(ramdisk);
+}
+
+void ramdisk_init(ramdisk_t *ramdisk)
+{
+ ramdisk->inflight = 0;
+ ramdisk->prev = NULL;
+ ramdisk->inprogress = NULL;
+}
+
+void ramdisk_destroy(ramdisk_t *ramdisk)
+{
+ const char *log_prefix = ramdisk->log_prefix;
+
+ /*
+ * ramdisk_destroy() is called only when we will close the tapdisk
image.
+ * In this case, there are no pending requests in vbd.
+ *
+ * If ramdisk->inflight is not 0, it means that the requests created by
+ * us are still in vbd->pending_requests.
+ */
+ if (ramdisk->inflight) {
+ /* should not happen */
+ EPRINTF("cannot destroy ramdisk\n");
+ return;
+ }
+
+ if (ramdisk->inprogress) {
+ hashtable_destroy(ramdisk->inprogress, 0);
+ ramdisk->inprogress = NULL;
+ }
+
+ if (ramdisk->prev) {
+ hashtable_destroy(ramdisk->prev, 1);
+ ramdisk->prev = NULL;
+ }
+}
+
+int ramdisk_writes_inflight(ramdisk_t *ramdisk)
+{
+ if (!ramdisk->inflight && !ramdisk->prev)
+ return 0;
+
+ return 1;
+}
+
+int ramdisk_read(struct ramdisk *ramdisk, uint64_t sector,
+ int nb_sectors, char *buf)
+{
+ int i;
+ char *v;
+ uint64_t key;
+
+ for (i = 0; i < nb_sectors; i++) {
+ key = sector + i;
+ /* check whether it is queued in a previous flush request */
+ if (!(ramdisk->prev &&
+ (v = hashtable_search(ramdisk->prev, &key)))) {
+ /* check whether it is an ongoing flush */
+ if (!(ramdisk->inprogress &&
+ (v = hashtable_search(ramdisk->inprogress, &key))))
+ return -1;
+ }
+ memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
+ }
+
+ return 0;
+}
+
+struct hashtable *ramdisk_new_hashtable(void)
+{
+ return create_hashtable(RAMDISK_HASHSIZE, uint64_hash, rd_hash_equal);
+}
+
+int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector,
+ int nb_sectors, size_t sector_size, char* buf,
+ const char *log_prefix)
+{
+ int i, rc;
+
+ for (i = 0; i < nb_sectors; i++) {
+ rc = ramdisk_write_hash(h, sector + i,
+ buf + i * sector_size,
+ sector_size, log_prefix);
+ if (rc)
+ return rc;
+ }
+
+ return 0;
+}
+
+void ramdisk_destroy_hashtable(struct hashtable *h)
+{
+ if (!h)
+ return;
+
+ hashtable_destroy(h, 1);
+}
diff --git a/tools/blktap2/drivers/block-replication.h
b/tools/blktap2/drivers/block-replication.h
index 0bd6e71..fdc216e 100644
--- a/tools/blktap2/drivers/block-replication.h
+++ b/tools/blktap2/drivers/block-replication.h
@@ -110,4 +110,52 @@ int td_replication_server_restart(td_replication_connect_t
*t);
*/
int td_replication_client_start(td_replication_connect_t *t);
+/* I/O replication */
+typedef struct ramdisk ramdisk_t;
+struct ramdisk {
+ size_t sector_size;
+ const char *log_prefix;
+ td_image_t *image;
+
+ /* private */
+ /* count of outstanding requests to the base driver */
+ size_t inflight;
+ /* prev holds the requests to be flushed, while inprogress holds
+ * requests being flushed. When requests complete, they are removed
+ * from inprogress.
+ * Whenever a new flush is merged with ongoing flush (i.e, prev),
+ * we have to make sure that none of the new requests overlap with
+ * ones in "inprogress". If it does, keep it back in prev and dont issue
+ * IO until the current one finishes. If we allow this IO to proceed,
+ * we might end up with two "overlapping" requests in the disk's queue
and
+ * the disk may not offer any guarantee on which one is written first.
+ * IOW, make sure we dont create a write-after-write time ordering
constraint.
+ */
+ struct hashtable *prev;
+ struct hashtable *inprogress;
+};
+
+void ramdisk_init(ramdisk_t *ramdisk);
+void ramdisk_destroy(ramdisk_t *ramdisk);
+
+/* flush pending contents to disk */
+int ramdisk_flush(ramdisk_t *ramdisk);
+/* flush new contents to disk */
+int ramdisk_start_flush(ramdisk_t *ramdisk, struct hashtable **new);
+int ramdisk_writes_inflight(ramdisk_t *ramdisk);
+
+/*
+ * try to read from ramdisk. Return -1 if some sectors are not in
+ * ramdisk. Otherwise, return 0.
+ */
+int ramdisk_read(struct ramdisk *ramdisk, uint64_t sector,
+ int nb_sectors, char *buf);
+
+/* create a new hashtable that can be used by ramdisk */
+struct hashtable *ramdisk_new_hashtable(void);
+int ramdisk_write_to_hashtable(struct hashtable *h, uint64_t sector,
+ int nb_sectors, size_t sector_size, char* buf,
+ const char *log_prefix);
+void ramdisk_destroy_hashtable(struct hashtable *h);
+
#endif
--
1.9.3
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel
|
![]() |
Lists.xenproject.org is hosted with RackSpace, monitoring our |