[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Xen-devel] [RFC Patch v3 13/22] blktap2: connect to backup asynchronously



On Sep 5, 2014 5:30 AM, "Wen Congyang" <wency@xxxxxxxxxxxxxx> wrote:
>
> tapdisk2 is a single thread process. If we use remus,
> we will block in primary_blocking_connect(). The
> user will not have any chance to talk with tapdisk2.
> So we should connect to backup asynchronously.
> Before the connection is established, we queue
> all I/O request, and handle it when the connection
> is established.
>

Hi
This is useful functionality, but it is also very dangerous. I am afraid your comments say very little about how I/O requests are handled at the primary until the connection is established.
Before this patch, the guest would block halfway through boot until the backup connection was established. What happens after this patch?
Will the guest's write requests go through? If so, are you merging writes to the same block in your internal ring? What about reads? Do you query your in-memory queue to serve these reads?
You are basically introducing copy-on-write semantics at the primary until a backup connection is established, with the faulting writes and subsequent reads being served out of memory. What happens to network I/O during this wait time? Are all outputs blocked?
The DRBD version allows the primary to function just like a normal VM until Remus is started. When Remus is started, it ensures that both disks are in sync before the initial migration starts. Are you doing something similar here?

The patch is unfortunately too mangled to follow. You may have to break it up, as the diff lines seem to straddle function boundaries in confusing ways, making it hard to understand.

> Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
> Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
> ---
> Âtools/blktap2/drivers/block-remus.c | 760 +++++++++++++++++++++++-------------
> Â1 file changed, 479 insertions(+), 281 deletions(-)
>
> diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
> index d358b44..c21f851 100644
> --- a/tools/blktap2/drivers/block-remus.c
> +++ b/tools/blktap2/drivers/block-remus.c
> @@ -63,10 +63,28 @@
> Â#define RAMDISK_HASHSIZE 128
>
> Â/* connect retry timeout (seconds) */
> -#define REMUS_CONNRETRY_TIMEOUT 10
> +#define REMUS_CONNRETRY_TIMEOUT 1
>
> Â#define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
>
> +#define UNREGISTER_EVENT(id)Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â\
> +Â Â Â Âdo {Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â \
> +Â Â Â Â Â Â Â Âif (id >= 0) {Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â \
> +Â Â Â Â Â Â Â Â Â Â Â Âtapdisk_server_unregister_event(id);Â Â \
> +Â Â Â Â Â Â Â Â Â Â Â Âid = -1;Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â \
> +Â Â Â Â Â Â Â Â}Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â\
> +Â Â Â Â} while (0)
> +
> +#define CLOSE_FD(fd)Â Â Â Â Â Â Â Â Â Â\
> +Â Â Â Âdo {Â Â Â Â Â Â Â Â Â Â Â Â Â Â \
> +Â Â Â Â Â Â Â Âif (fd >= 0) {Â Â Â Â Â \
> +Â Â Â Â Â Â Â Â Â Â Â Âclose(fd);Â Â Â \
> +Â Â Â Â Â Â Â Â Â Â Â Âfd = -1;Â Â Â Â \
> +Â Â Â Â Â Â Â Â}Â Â Â Â Â Â Â Â Â Â Â Â\
> +Â Â Â Â} while (0)
> +
> +#define MAX_REMUS_REQUESTÂ Â Â ÂTAPDISK_DATA_REQUESTS
> +
> Âenum tdremus_mode {
> Â Â Â Â mode_invalid = 0,
> Â Â Â Â mode_unprotected,
> @@ -74,17 +92,21 @@ enum tdremus_mode {
> Â Â Â Â mode_backup
> Â};
>
> +enum {
> +Â Â Â ÂERROR_INTERNAL = -1,
> +Â Â Â ÂERROR_IO = -2,
> +Â Â Â ÂERROR_CONNECTION = -3,
> +};
> +
> Âstruct tdremus_req {
> -Â Â Â Âuint64_t sector;
> -Â Â Â Âint nb_sectors;
> -Â Â Â Âchar buf[4096];
> +Â Â Â Âtd_request_t treq;
> Â};
>
> Âstruct req_ring {
> Â Â Â Â /* waste one slot to distinguish between empty and full */
> -Â Â Â Âstruct tdremus_req requests[MAX_REQUESTS * 2 + 1];
> -Â Â Â Âunsigned int head;
> -Â Â Â Âunsigned int tail;
> +Â Â Â Âstruct tdremus_req pending_requests[MAX_REMUS_REQUEST + 1];
> +Â Â Â Âunsigned int prod;
> +Â Â Â Âunsigned int cons;
> Â};
>
> Â/* TODO: This isn't very pretty, but to properly generate our own treqs (needed
> @@ -144,10 +166,21 @@ struct ramdisk_write_cbdata {
>
> Âtypedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
>
> -/* poll_fd type for blktap2 fd system. taken from block_log.c */
> +/*
> + * If cid, rid and wid are -1, fd must be -1. It means that
> + * we are in unpritected mode or we don't start to connect
> + * to backup.
> + * If fd is an valid fd:
> + *Â cid is valid, rid and wid must be invalid. It means that
> + *Â Â Â the connection is in progress.
> + *Â cid is invalid. rid or wid must be valid. It means that
> + *Â Â Â the connection is established.
> + */
> Âtypedef struct poll_fd {
>     int    fd;
> -Â Â Â Âevent_id_t id;
> +Â Â Â Âevent_id_t cid;
> +Â Â Â Âevent_id_t rid;
> +Â Â Â Âevent_id_t wid;
> Â} poll_fd_t;
>
> Âstruct tdremus_state {
> @@ -166,8 +199,11 @@ struct tdremus_state {
> Â Â Â Â poll_fd_t server_fd;Â Â /* server listen port */
> Â Â Â Â poll_fd_t stream_fd;Â Â Â/* replication channel */
>
> -Â Â Â Â/* queue write requests, batch-replicate at submit */
> -Â Â Â Âstruct req_ring write_ring;
> +Â Â Â Â/*
> +Â Â Â Â * queue I/O requests, batch-replicate when
> +Â Â Â Â * the connection is established.
> +Â Â Â Â */
> +Â Â Â Âstruct req_ring queued_io;
>
> Â Â Â Â /* ramdisk data*/
> Â Â Â Â struct ramdisk ramdisk;
> @@ -207,11 +243,13 @@ static int tdremus_close(td_driver_t *driver);
>
> Âstatic int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
> Âstatic int ctl_respond(struct tdremus_state *s, const char *response);
> +static int ctl_register(struct tdremus_state *s);
> +static void ctl_unregister(struct tdremus_state *s);
>
> Â/* ring functions */
> -static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
> +static inline unsigned int ring_next(unsigned int pos)
> Â{
> -Â Â Â Âif (++pos >= MAX_REQUESTS * 2 + 1)
> +Â Â Â Âif (++pos >= MAX_REMUS_REQUEST + 1)
> Â Â Â Â Â Â Â Â return 0;
>
> Â Â Â Â return pos;
> @@ -219,13 +257,26 @@ static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
>
> Âstatic inline int ring_isempty(struct req_ring* ring)
> Â{
> -Â Â Â Âreturn ring->head == ring->tail;
> +Â Â Â Âreturn ring->cons == ring->prod;
> Â}
>
> Âstatic inline int ring_isfull(struct req_ring* ring)
> Â{
> -Â Â Â Âreturn ring_next(ring, ring->tail) == ring->head;
> +Â Â Â Âreturn ring_next(ring->prod) == ring->cons;
> +}
> +
> +static void ring_add_request(struct req_ring *ring, const td_request_t *treq)
> +{
> +Â Â Â Â/* If ring is full, it means that tapdisk2 has some bug */
> +Â Â Â Âif (ring_isfull(ring)) {
> +Â Â Â Â Â Â Â ÂRPRINTF("OOPS, ring is full\n");
> +Â Â Â Â Â Â Â Âexit(1);
> +Â Â Â Â}
> +
> +Â Â Â Âring->pending_requests[ring->prod].treq = *treq;
> +Â Â Â Âring->prod = ring_next(ring->prod);
> Â}
> +
> Â/* Prototype declarations */
> Âstatic int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
>
> @@ -728,30 +779,39 @@ static int mwrite(int fd, void* buf, size_t len)
>
> Âstatic void inline close_stream_fd(struct tdremus_state *s)
> Â{
> -Â Â Â Âif (s->stream_fd.fd < 0)
> -Â Â Â Â Â Â Â Âreturn;
>
> -Â Â Â Â/* XXX: -2 is magic. replace with macro perhaps? */
> -Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.id);
> -Â Â Â Âclose(s->stream_fd.fd);
> -Â Â Â Âs->stream_fd.fd = -2;
> +Â Â Â ÂUNREGISTER_EVENT(s->stream_fd.cid);
> +Â Â Â ÂUNREGISTER_EVENT(s->stream_fd.rid);
> +Â Â Â ÂUNREGISTER_EVENT(s->stream_fd.wid);
> +
> +Â Â Â Â/* close the connection */
> +Â Â Â ÂCLOSE_FD(s->stream_fd.fd);
> Â}
>
> Âstatic void close_server_fd(struct tdremus_state *s)
> Â{
> -Â Â Â Âif (s->server_fd.fd < 0)
> -Â Â Â Â Â Â Â Âreturn;
> -
> -Â Â Â Âtapdisk_server_unregister_event(s->server_fd.id);
> -Â Â Â Âs->server_fd.id = -1;
> -Â Â Â Âclose(s->stream_fd.fd);
> -Â Â Â Âs->stream_fd.fd = -1;
> +Â Â Â ÂUNREGISTER_EVENT(s->server_fd.cid);
> +Â Â Â ÂCLOSE_FD(s->server_fd.fd);
> Â}
>
> Â/* primary functions */
> Âstatic void remus_client_event(event_id_t, char mode, void *private);
> Âstatic void remus_connect_event(event_id_t id, char mode, void *private);
> Âstatic void remus_retry_connect_event(event_id_t id, char mode, void *private);
> +static int primary_forward_request(struct tdremus_state *s,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â const td_request_t *treq);
> +
> +/*
> + * It is called when we cannot connect to backup, or find I/O error when
> + * reading/writing.
> + */
> +static void primary_failed(struct tdremus_state *s, int rc)
> +{
> +Â Â Â Âclose_stream_fd(s);
> +Â Â Â Âif (rc == ERROR_INTERNAL)
> +Â Â Â Â Â Â Â ÂRPRINTF("switch to unprotected mode due to internal error");
> +Â Â Â Âswitch_mode(s->tdremus_driver, mode_unprotected);
> +}
>
> Âstatic int primary_do_connect(struct tdremus_state *state)
> Â{
> @@ -760,281 +820,247 @@ static int primary_do_connect(struct tdremus_state *state)
> Â Â Â Â int rc;
> Â Â Â Â int flags;
>
> -Â Â Â ÂRPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
> +Â Â Â ÂRPRINTF("client connecting to %s:%d...\n",
> +Â Â Â Â Â Â Â Âinet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
>
> Â Â Â Â if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
> Â Â Â Â Â Â Â Â RPRINTF("could not create client socket: %d\n", errno);
> -Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> Â Â Â Â }
> +Â Â Â Âstate->stream_fd.fd = fd;
>
> Â Â Â Â /* make socket nonblocking */
> Â Â Â Â if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
> Â Â Â Â Â Â Â Â flags = 0;
> -Â Â Â Âif (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
> -Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Âif (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
> +Â Â Â Â Â Â Â ÂRPRINTF("error setting fd %d to non block mode\n", fd);
> +Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> +Â Â Â Â}
>
> -Â Â Â Â/* once we have created the socket and populated the address, we can now start
> -Â Â Â Â * our non-blocking connect. rather than duplicating code we trigger a timeout
> -Â Â Â Â * on the socket fd, which calls out nonblocking connect code
> +Â Â Â Â/*
> +Â Â Â Â * once we have created the socket and populated the address,
> +Â Â Â Â * we can now start our non-blocking connect. rather than
> +Â Â Â Â * duplicating code we trigger a timeout on the socket fd,
> +Â Â Â Â * which calls out nonblocking connect code
> Â Â Â Â Â*/
> -Â Â Â Âif((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, remus_retry_connect_event, state)) < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
> -Â Â Â Â Â Â Â Â/* TODO: we leak a fd here */
> -Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Âif((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â remus_retry_connect_event,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â state)) < 0) {
> +Â Â Â Â Â Â Â ÂRPRINTF("error registering timeout client connection event handler: %s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> +Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> Â Â Â Â }
> -Â Â Â Âstate->stream_fd.fd = fd;
> -Â Â Â Âstate->stream_fd.id = id;
> +
> +Â Â Â Âstate->stream_fd.cid = id;
> Â Â Â Â return 0;
> Â}
>
> -static int primary_blocking_connect(struct tdremus_state *state)
> +static int remus_handle_queued_io(struct tdremus_state *s)
> Â{
> -Â Â Â Âint fd;
> -Â Â Â Âint id;
> +Â Â Â Âstruct req_ring *queued_io = &s->queued_io;
> +Â Â Â Âunsigned int cons;
> +Â Â Â Âtd_request_t *treq;
> Â Â Â Â int rc;
> -Â Â Â Âint flags;
> -
> -Â Â Â ÂRPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
>
> -Â Â Â Âif ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("could not create client socket: %d\n", errno);
> -Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â}
> +Â Â Â Âwhile (!ring_isempty(queued_io)) {
> +Â Â Â Â Â Â Â Âcons = queued_io->cons;
> +Â Â Â Â Â Â Â Âtreq = &queued_io->pending_requests[cons].treq;
>
> -Â Â Â Âdo {
> -Â Â Â Â Â Â Â Âif ((rc = connect(fd, (struct sockaddr *)&state->sa,
> -Â Â Â Â Â Â Â Â Â Âsizeof(state->sa))) < 0)
> -Â Â Â Â Â Â Â Â{
> -Â Â Â Â Â Â Â Â Â Â Â Âif (errno == ECONNREFUSED) {
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("connection refused -- retrying in 1 second\n");
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âsleep(1);
> -Â Â Â Â Â Â Â Â Â Â Â Â} else {
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("connection failed: %d\n", errno);
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âclose(fd);
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â Â Â Â Â Â Â Â Â}
> +Â Â Â Â Â Â Â Âif (treq->op == TD_OP_WRITE) {
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = primary_forward_request(s, treq);
> +Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âreturn rc;
> Â Â Â Â Â Â Â Â }
> -Â Â Â Â} while (rc < 0);
> -
> -Â Â Â ÂRPRINTF("client connected\n");
> -
> -Â Â Â Â/* make socket nonblocking */
> -Â Â Â Âif ((flags = fcntl(fd, F_GETFL, 0)) == -1)
> -Â Â Â Â Â Â Â Âflags = 0;
> -Â Â Â Âif (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
> -Â Â Â Â{
> -Â Â Â Â Â Â Â ÂRPRINTF("error making socket nonblocking\n");
> -Â Â Â Â Â Â Â Âclose(fd);
> -Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â}
>
> -Â Â Â Âif((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, remus_client_event, state)) < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error registering client event handler: %s\n", strerror(id));
> -Â Â Â Â Â Â Â Âclose(fd);
> -Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â Â Â Â Âtd_forward_request(*treq);
> +Â Â Â Â Â Â Â Âqueued_io->cons = ring_next(cons);
> Â Â Â Â }
>
> -Â Â Â Âstate->stream_fd.fd = fd;
> -Â Â Â Âstate->stream_fd.id = id;
> Â Â Â Â return 0;
> Â}
>
> -/* on read, just pass request through */
> -static void primary_queue_read(td_driver_t *driver, td_request_t treq)
> -{
> -Â Â Â Â/* just pass read through */
> -Â Â Â Âtd_forward_request(treq);
> -}
> -
> -/* TODO:
> - * The primary uses mwrite() to write the contents of a write request to the
> - * backup. This effectively blocks until all data has been copied into a system
> - * buffer or a timeout has occured. We may wish to instead use tapdisk's
> - * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
> - * and write data in an asynchronous fashion.
> - */
> -static void primary_queue_write(td_driver_t *driver, td_request_t treq)
> +static int remus_connection_done(struct tdremus_state *s)
> Â{
> -Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> -
> -Â Â Â Âchar header[sizeof(uint32_t) + sizeof(uint64_t)];
> -Â Â Â Âuint32_t *sectors = (uint32_t *)header;
> -Â Â Â Âuint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
> +Â Â Â Âevent_id_t id;
>
> -Â Â Â Â// RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
> +Â Â Â Â/* the connect succeeded */
> +Â Â Â Â/* unregister this function and register a new event handler */
> +Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.cid);
> +Â Â Â Âs->stream_fd.cid = -1;
>
> -Â Â Â Â/* -1 means we haven't connected yet, -2 means the connection was lost */
> -Â Â Â Âif(s->stream_fd.fd == -1) {
> -Â Â Â Â Â Â Â ÂRPRINTF("connecting to backup...\n");
> -Â Â Â Â Â Â Â Âprimary_blocking_connect(s);
> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â 0, remus_client_event, s);
> +Â Â Â Âif(id < 0) {
> +Â Â Â Â Â Â Â ÂRPRINTF("error registering client event handler: %s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> +Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> Â Â Â Â }
> +Â Â Â Âs->stream_fd.rid = id;
>
> -Â Â Â Â*sectors = treq.secs;
> -Â Â Â Â*sector = treq.sec;
> -
> -Â Â Â Âif (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
> -Â Â Â Â Â Â Â Âgoto fail;
> -Â Â Â Âif (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
> -Â Â Â Â Â Â Â Âgoto fail;
> +Â Â Â Â/* handle the queued requests */
> +Â Â Â Âreturn remus_handle_queued_io(s);
> +}
>
> -Â Â Â Âif (mwrite(s->stream_fd.fd, treq.buf, treq.secs * driver->info.sector_size) < 0)
> -Â Â Â Â Â Â Â Âgoto fail;
> +static int remus_retry_connect(struct tdremus_state *s)
> +{
> +Â Â Â Âevent_id_t id;
>
> -Â Â Â Âtd_forward_request(treq);
> +Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.cid);
> +Â Â Â Âs->stream_fd.cid = -1;
>
> -Â Â Â Âreturn;
> +Â Â Â ÂRPRINTF("connect to backup 1 second later");
> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â s->stream_fd.fd,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â REMUS_CONNRETRY_TIMEOUT,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â remus_retry_connect_event, s);
> +Â Â Â Âif (id < 0) {
> +Â Â Â Â Â Â Â ÂRPRINTF("error registering timeout client connection event handler: %s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> +Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> +Â Â Â Â}
>
> - fail:
> -Â Â Â Â/* switch to unprotected mode and tell tapdisk to retry */
> -Â Â Â ÂRPRINTF("write request replication failed, switching to unprotected mode");
> -Â Â Â Âswitch_mode(s->tdremus_driver, mode_unprotected);
> -Â Â Â Âtd_complete_request(treq, -EBUSY);
> +Â Â Â Âs->stream_fd.cid = id;
> +Â Â Â Âreturn 0;
> Â}
>
> -
> -/* It is called when the user writes "flush" to control file */
> -static int client_flush(td_driver_t *driver)
> +static int remus_wait_connect_done(struct tdremus_state *s)
> Â{
> -Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> -
> -Â Â Â Â// RPRINTF("committing output\n");
> +Â Â Â Âevent_id_t id;
>
> -Â Â Â Âif (s->stream_fd.fd == -1)
> -Â Â Â Â Â Â Â Â/* connection not yet established, nothing to flush */
> -Â Â Â Â Â Â Â Âreturn 0;
> +Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.cid);
> +Â Â Â Âs->stream_fd.cid = -1;
>
> -Â Â Â Âif (mwrite(s->stream_fd.fd, TDREMUS_COMMIT,
> -Â Â Â Â Â Âstrlen(TDREMUS_COMMIT)) < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error flushing output");
> -Â Â Â Â Â Â Â Âclose_stream_fd(s);
> -Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â s->stream_fd.fd, 0,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â remus_connect_event, s);
> +Â Â Â Âif (id < 0) {
> +Â Â Â Â Â Â Â ÂRPRINTF("error registering client connection event handler: %s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> +Â Â Â Â Â Â Â Âreturn ERROR_INTERNAL;
> Â Â Â Â }
> +Â Â Â Âs->stream_fd.cid = id;
>
> Â Â Â Â return 0;
> Â}
>
> -static int server_flush(td_driver_t *driver)
> +/* return 1 if we need to reconnect to backup */
> +static int check_connect_errno(int err)
> Â{
> -Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> -Â Â Â Â/*
> -Â Â Â Â * Nothing to flush in beginning.
> +Â Â Â Â/*
> +Â Â Â Â * The fd is non-block, so we will not get ETIMEDOUT
> +Â Â Â Â * after calling connect(). We only can get this errno
> +Â Â Â Â * by getsockopt().
> Â Â Â Â Â*/
> -Â Â Â Âif (!s->ramdisk.prev)
> -Â Â Â Â Â Â Â Âreturn 0;
> -Â Â Â Â/* Try to flush any remaining requests */
> -Â Â Â Âreturn ramdisk_flush(driver, s);
> -}
> -
> -static int primary_start(td_driver_t *driver)
> -{
> -Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> -
> -Â Â Â ÂRPRINTF("activating client mode\n");
> -
> -Â Â Â Âtapdisk_remus.td_queue_read = primary_queue_read;
> -Â Â Â Âtapdisk_remus.td_queue_write = primary_queue_write;
> -
> -Â Â Â Âs->stream_fd.fd = -1;
> -Â Â Â Âs->stream_fd.id = -1;
> +Â Â Â Âif (err == ECONNREFUSED || err == ENETUNREACH ||
> +Â Â Â Â Â Âerr == EAGAIN || err == ECONNABORTED ||
> +Â Â Â Â Â Âerr == ETIMEDOUT)
> +Â Â Â Â Â Âreturn 1;
>
> Â Â Â Â return 0;
> Â}
>
> -/* timeout callback */
> Âstatic void remus_retry_connect_event(event_id_t id, char mode, void *private)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)private;
> +Â Â Â Âint rc, ret;
>
> Â Â Â Â /* do a non-blocking connect */
> -Â Â Â Âif (connect(s->stream_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa))
> -Â Â Â Â Â Â&& errno != EINPROGRESS)
> -Â Â Â Â{
> -Â Â Â Â Â Â Â Âif(errno == ECONNREFUSED || errno == ENETUNREACH || errno == EAGAIN || errno == ECONNABORTED)
> -Â Â Â Â Â Â Â Â{
> -Â Â Â Â Â Â Â Â Â Â Â Â/* try again in a second */
> -Â Â Â Â Â Â Â Â Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.id);
> -Â Â Â Â Â Â Â Â Â Â Â Âif((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âreturn;
> -Â Â Â Â Â Â Â Â Â Â Â Â}
> -Â Â Â Â Â Â Â Â Â Â Â Âs->stream_fd.id = id;
> -Â Â Â Â Â Â Â Â}
> -Â Â Â Â Â Â Â Âelse
> -Â Â Â Â Â Â Â Â{
> -Â Â Â Â Â Â Â Â Â Â Â Â/* not recoverable */
> -Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("error connection to server %s\n", strerror(errno));
> +Â Â Â Âret = connect(s->stream_fd.fd,
> +Â Â Â Â Â Â Â Â Â Â Â(struct sockaddr *)&s->sa,
> +Â Â Â Â Â Â Â Â Â Â Âsizeof(s->sa));
> +Â Â Â Âif (ret) {
> +Â Â Â Â Â Â Â Âif (errno == EINPROGRESS) {
> +Â Â Â Â Â Â Â Â Â Â Â Â/*
> +Â Â Â Â Â Â Â Â Â Â Â Â * the connect returned EINPROGRESS (nonblocking
> +Â Â Â Â Â Â Â Â Â Â Â Â * connect) we must wait for the fd to be writeable
> +Â Â Â Â Â Â Â Â Â Â Â Â * to determine if the connect worked
> +Â Â Â Â Â Â Â Â Â Â Â Â */
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = remus_wait_connect_done(s);
> +Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> Â Â Â Â Â Â Â Â Â Â Â Â return;
> Â Â Â Â Â Â Â Â }
> -Â Â Â Â}
> -Â Â Â Âelse
> -Â Â Â Â{
> -Â Â Â Â Â Â Â Â/* the connect returned EINPROGRESS (nonblocking connect) we must wait for the fd to be writeable to determine if the connect worked */
>
> -Â Â Â Â Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.id);
> -Â Â Â Â Â Â Â Âif((id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, s->stream_fd.fd, 0, remus_connect_event, s)) < 0) {
> -Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("error registering client connection event handler: %s\n", strerror(id));
> +Â Â Â Â Â Â Â Âif (check_connect_errno(errno)) {
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = remus_retry_connect(s);
> +Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> Â Â Â Â Â Â Â Â Â Â Â Â return;
> Â Â Â Â Â Â Â Â }
> -Â Â Â Â Â Â Â Âs->stream_fd.id = id;
> +
> +Â Â Â Â Â Â Â Â/* not recoverable */
> +Â Â Â Â Â Â Â ÂRPRINTF("error connection to server %s\n", strerror(errno));
> +Â Â Â Â Â Â Â Ârc = ERROR_CONNECTION;
> +Â Â Â Â Â Â Â Âgoto fail;
> Â Â Â Â }
> +
> +Â Â Â Â/* The connection is established unexpectedly */
> +Â Â Â Ârc = remus_connection_done(s);
> +Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Âgoto fail;
> +
> +Â Â Â Âreturn;
> +
> +fail:
> +Â Â Â Âprimary_failed(s, rc);
> +Â Â Â Âreturn;
> Â}
>
> Â/* callback when nonblocking connect() is finished */
> -/* called only by primary in unprotected state */
> Âstatic void remus_connect_event(event_id_t id, char mode, void *private)
> Â{
> Â Â Â Â int socket_errno;
> Â Â Â Â socklen_t socket_errno_size;
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)private;
> +Â Â Â Âint rc;
>
> -Â Â Â Â/* check to se if the connect succeeded */
> +Â Â Â Â/* check to see if the connect succeeded */
> Â Â Â Â socket_errno_size = sizeof(socket_errno);
> -Â Â Â Âif (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, &socket_errno, &socket_errno_size)) {
> +Â Â Â Âif (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR,
> +Â Â Â Â Â Â Â Â Â Â Â &socket_errno, &socket_errno_size)) {
> Â Â Â Â Â Â Â Â RPRINTF("error getting socket errno\n");
> Â Â Â Â Â Â Â Â return;
> Â Â Â Â }
>
> Â Â Â Â RPRINTF("socket connect returned %d\n", socket_errno);
>
> -Â Â Â Âif(socket_errno)
> -Â Â Â Â{
> +Â Â Â Âif (socket_errno) {
> Â Â Â Â Â Â Â Â /* the connect did not succeed */
> +Â Â Â Â Â Â Â Âif (check_connect_errno(socket_errno)) {
> +Â Â Â Â Â Â Â Â Â Â Â Â/*
> +Â Â Â Â Â Â Â Â Â Â Â Â * we can probably assume that the backup is down.
> +Â Â Â Â Â Â Â Â Â Â Â Â * just try again later
> +Â Â Â Â Â Â Â Â Â Â Â Â */
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = remus_retry_connect(s);
> +Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
>
> -Â Â Â Â Â Â Â Âif(socket_errno == ECONNREFUSED || socket_errno == ENETUNREACH || socket_errno == ETIMEDOUT
> -Â Â Â Â Â Â Â Â Â || socket_errno == ECONNABORTED || socket_errno == EAGAIN)
> -Â Â Â Â Â Â Â Â{
> -Â Â Â Â Â Â Â Â Â Â Â Â/* we can probably assume that the backup is down. just try again later */
> -Â Â Â Â Â Â Â Â Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.id);
> -Â Â Â Â Â Â Â Â Â Â Â Âif((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âreturn;
> -Â Â Â Â Â Â Â Â Â Â Â Â}
> -Â Â Â Â Â Â Â Â Â Â Â Âs->stream_fd.id = id;
> -Â Â Â Â Â Â Â Â}
> -Â Â Â Â Â Â Â Âelse
> -Â Â Â Â Â Â Â Â{
> -Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("socket connect returned %d, giving up\n", socket_errno);
> -Â Â Â Â Â Â Â Â}
> -Â Â Â Â}
> -Â Â Â Âelse
> -Â Â Â Â{
> -Â Â Â Â Â Â Â Â/* the connect succeeded */
> -
> -Â Â Â Â Â Â Â Â/* unregister this function and register a new event handler */
> -Â Â Â Â Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.id);
> -Â Â Â Â Â Â Â Âif((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, 0, remus_client_event, s)) < 0) {
> -Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("error registering client event handler: %s\n", strerror(id));
> Â Â Â Â Â Â Â Â Â Â Â Â return;
> +Â Â Â Â Â Â Â Â} else {
> +Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("socket connect returned %d, giving up\n",
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âsocket_errno);
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = ERROR_CONNECTION;
> +Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> Â Â Â Â Â Â Â Â }
> -Â Â Â Â Â Â Â Âs->stream_fd.id = id;
>
> -Â Â Â Â Â Â Â Â/* switch from unprotected to protected client */
> -Â Â Â Â Â Â Â Âswitch_mode(s->tdremus_driver, mode_primary);
> +Â Â Â Â Â Â Â Âreturn;
> Â Â Â Â }
> +
> +Â Â Â Ârc = remus_connection_done(s);
> +Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Âgoto fail;
> +
> +Â Â Â Âreturn;
> +
> +fail:
> +Â Â Â Âprimary_failed(s, rc);
> Â}
>
>
> -/* we install this event handler on the primary once we have connected to the backup */
> +/*
> + * we install this event handler on the primary once we have
> + * connected to the backup.
> + */
> Â/* wait for "done" message to commit checkpoint */
> Âstatic void remus_client_event(event_id_t id, char mode, void *private)
> Â{
> @@ -1043,9 +1069,12 @@ static void remus_client_event(event_id_t id, char mode, void *private)
> Â Â Â Â int rc;
>
> Â Â Â Â if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
> -Â Â Â Â Â Â Â Â/* replication stream closed or otherwise broken (timeout, reset, &c) */
> +Â Â Â Â Â Â Â Â/*
> +Â Â Â Â Â Â Â Â * replication stream closed or otherwise broken
> +Â Â Â Â Â Â Â Â * (timeout, reset, &c)
> +Â Â Â Â Â Â Â Â */
> Â Â Â Â Â Â Â Â RPRINTF("error reading from backup\n");
> -Â Â Â Â Â Â Â Âclose_stream_fd(s);
> +Â Â Â Â Â Â Â Âprimary_failed(s, ERROR_IO);
> Â Â Â Â Â Â Â Â return;
> Â Â Â Â }
>
> @@ -1056,22 +1085,169 @@ static void remus_client_event(event_id_t id, char mode, void *private)
> Â Â Â Â Â Â Â Â ctl_respond(s, TDREMUS_DONE);
> Â Â Â Â else {
> Â Â Â Â Â Â Â Â RPRINTF("received unknown message: %s\n", req);
> -Â Â Â Â Â Â Â Âclose_stream_fd(s);
> +Â Â Â Â Â Â Â Âprimary_failed(s, ERROR_IO);
> +Â Â Â Â}
> +
> +Â Â Â Âreturn;
> +}
> +
> +static void primary_queue_read(td_driver_t *driver, td_request_t treq)
> +{
> +Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> +Â Â Â Âstruct req_ring *ring = &s->queued_io;
> +
> +Â Â Â Âif (ring_isempty(ring)) {
> +Â Â Â Â Â Â Â Â/* just pass read through */
> +Â Â Â Â Â Â Â Âtd_forward_request(treq);
> +Â Â Â Â Â Â Â Âreturn;
> +Â Â Â Â}
> +
> +Â Â Â Âring_add_request(ring, &treq);
> +}
> +
> +static int primary_forward_request(struct tdremus_state *s,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â const td_request_t *treq)
> +{
> +Â Â Â Âchar header[sizeof(uint32_t) + sizeof(uint64_t)];
> +Â Â Â Âuint32_t *sectors = (uint32_t *)header;
> +Â Â Â Âuint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
> +Â Â Â Âtd_driver_t *driver = s->tdremus_driver;
> +
> +Â Â Â Â*sectors = treq->secs;
> +Â Â Â Â*sector = treq->sec;
> +
> +Â Â Â Âif (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
> +Â Â Â Â Â Â Â Âreturn ERROR_IO;
> +
> +Â Â Â Âif (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
> +Â Â Â Â Â Â Â Âreturn ERROR_IO;
> +
> +Â Â Â Âif (mwrite(s->stream_fd.fd, treq->buf,
> +Â Â Â Â Â Âtreq->secs * driver->info.sector_size) < 0)
> +Â Â Â Â Â Â Â Âreturn ERROR_IO;
> +
> +Â Â Â Âreturn 0;
> +}
> +
> +/* TODO:
> + * The primary uses mwrite() to write the contents of a write request to the
> + * backup. This effectively blocks until all data has been copied into a system
> + * buffer or a timeout has occured. We may wish to instead use tapdisk's
> + * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
> + * and write data in an asynchronous fashion.
> + */
> +static void primary_queue_write(td_driver_t *driver, td_request_t treq)
> +{
> +Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> +Â Â Â Âint rc;
> +
> +Â Â Â Â// RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
> +
> +Â Â Â Âif(s->stream_fd.fd < 0) {
> +Â Â Â Â Â Â Â ÂRPRINTF("connecting to backup...\n");
> +Â Â Â Â Â Â Â Ârc = primary_do_connect(s);
> +Â Â Â Â Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> +Â Â Â Â}
> +
> +Â Â Â Â/* The connection is not established, just queue the request */
> +Â Â Â Âif (s->stream_fd.cid >= 0) {
> +Â Â Â Â Â Â Â Âring_add_request(&s->queued_io, &treq);
> +Â Â Â Â Â Â Â Âreturn;
> Â Â Â Â }
>
> +Â Â Â Â/* The connection is established */
> +Â Â Â Ârc = primary_forward_request(s, &treq);
> +Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Âgoto fail;
> +
> +Â Â Â Âtd_forward_request(treq);
> +
> Â Â Â Â return;
> +
> +fail:
> +Â Â Â Â/* switch to unprotected mode and forward the request */
> +Â Â Â ÂRPRINTF("write request replication failed, switching to unprotected mode");
> +Â Â Â Âprimary_failed(s, rc);
> +Â Â Â Âtd_forward_request(treq);
> +}
> +
> +/* It is called when the user write "flush" to control file. */
> +static int client_flush(td_driver_t *driver)
> +{
> +Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> +
> +Â Â Â Â// RPRINTF("committing output\n");
> +
> +Â Â Â Âif (s->stream_fd.fd == -1)
> +Â Â Â Â Â Â Â Â/* connection not yet established, nothing to flush */
> +Â Â Â Â Â Â Â Âreturn 0;
> +
> +Â Â Â Âif (mwrite(s->stream_fd.fd, TDREMUS_COMMIT,
> +Â Â Â Â Â Âstrlen(TDREMUS_COMMIT)) < 0) {
> +Â Â Â Â Â Â Â ÂRPRINTF("error flushing output");
> +Â Â Â Â Â Â Â Âprimary_failed(s, ERROR_IO);
> +Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â}
> +
> +Â Â Â Âreturn 0;
> +}
> +
> +/* It is called when switching the mode from primary to unprotected */
> +static int primary_flush(td_driver_t *driver)
> +{
> +Â Â Â Âstruct tdremus_state *s = driver->data;
> +Â Â Â Âstruct req_ring *ring = &s->queued_io;
> +Â Â Â Âunsigned int cons;
> +
> +Â Â Â Âif (ring_isempty(ring))
> +Â Â Â Â Â Â Â Âreturn 0;
> +
> +Â Â Â Âwhile (!ring_isempty(ring)) {
> +Â Â Â Â Â Â Â Âcons = ring->cons;
> +Â Â Â Â Â Â Â Âring->cons = ring_next(cons);
> +
> +Â Â Â Â Â Â Â Âtd_forward_request(ring->pending_requests[cons].treq);
> +Â Â Â Â}
> +
> +Â Â Â Âreturn 0;
> +}
> +
> +static int primary_start(td_driver_t *driver)
> +{
> +       struct tdremus_state *s = (struct tdremus_state *)driver->data;
> +
> +       RPRINTF("activating client mode\n");
> +
> +       tapdisk_remus.td_queue_read = primary_queue_read;
> +       tapdisk_remus.td_queue_write = primary_queue_write;
> +       s->queue_flush = primary_flush;
> +
> +       s->stream_fd.fd = -1;
> +       s->stream_fd.cid = -1;
> +       s->stream_fd.rid = -1;
> +       s->stream_fd.wid = -1;
> +
> +       return 0;
>  }
>
>  /* backup functions */
>  static void remus_server_event(event_id_t id, char mode, void *private);
>
> +/* It is called when we find some I/O error */
> +static void backup_failed(struct tdremus_state *s, int rc)
> +{
> +       close_stream_fd(s);
> +       close_server_fd(s);
> +       /* We will switch to unprotected mode in backup_queue_write() */
> +}
> +
> Â/* returns the socket that receives write requests */
> Âstatic void remus_server_accept(event_id_t id, char mode, void* private)
> Â{
> Â Â Â Â struct tdremus_state* s = (struct tdremus_state *) private;
>
> Â Â Â Â int stream_fd;
> -Â Â Â Âevent_id_t cid;
>
> Â Â Â Â /* XXX: add address-based black/white list */
> Â Â Â Â if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
> @@ -1079,68 +1255,80 @@ static void remus_server_accept(event_id_t id, char mode, void* private)
> Â Â Â Â Â Â Â Â return;
> Â Â Â Â }
>
> -Â Â Â Â/* TODO: check to see if we are already replicating. if so just close the
> -Â Â Â Â * connection (or do something smarter) */
> +Â Â Â Â/*
> +Â Â Â Â * TODO: check to see if we are already replicating.
> +Â Â Â Â * if so just close the connection (or do something
> +Â Â Â Â * smarter)
> +Â Â Â Â */
> Â Â Â Â RPRINTF("server accepted connection\n");
>
> Â Â Â Â /* add tapdisk event for replication stream */
> -Â Â Â Âcid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âremus_server_event, s);
> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â remus_server_event, s);
>
> -Â Â Â Âif(cid < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error registering connection event handler: %s\n", strerror(errno));
> +Â Â Â Âif (id < 0) {
> +Â Â Â Â Â Â Â ÂRPRINTF("error registering connection event handler: %s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(errno));
> Â Â Â Â Â Â Â Â close(stream_fd);
> Â Â Â Â Â Â Â Â return;
> Â Â Â Â }
>
> Â Â Â Â /* store replication file descriptor */
> Â Â Â Â s->stream_fd.fd = stream_fd;
> -Â Â Â Âs->stream_fd.id = cid;
> +Â Â Â Âs->stream_fd.rid = id;
> Â}
>
> Â/* returns -2 if EADDRNOTAVAIL */
> Âstatic int remus_bind(struct tdremus_state* s)
> Â{
> -//Â struct sockaddr_in sa;
> Â Â Â Â int opt;
> Â Â Â Â int rc = -1;
> +Â Â Â Âevent_id_t id;
>
> Â Â Â Â if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
> Â Â Â Â Â Â Â Â RPRINTF("could not create server socket: %d\n", errno);
> Â Â Â Â Â Â Â Â return rc;
> Â Â Â Â }
> -Â Â Â Âopt = 1;
> -Â Â Â Âif (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
> -Â Â Â Â Â Â Â ÂRPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd, errno);
>
> -Â Â Â Âif (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("could not bind server socket %d to %s:%d: %d %s\n", s->server_fd.fd,
> -Â Â Â Â Â Â Â Â Â Â Â Âinet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), errno, strerror(errno));
> -Â Â Â Â Â Â Â Âif (errno != EADDRINUSE)
> +Â Â Â Âopt = 1;
> +Â Â Â Âif (setsockopt(s->server_fd.fd, SOL_SOCKET,
> +Â Â Â Â Â Â Â Â Â Â Â SO_REUSEADDR, &opt, sizeof(opt)) < 0)
> +Â Â Â Â Â Â Â ÂRPRINTF("Error setting REUSEADDR on %d: %d\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âs->server_fd.fd, errno);
> +
> +Â Â Â Âif (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
> +Â Â Â Â Â Â Â Â sizeof(s->sa)) < 0) {
> +Â Â Â Â Â Â Â ÂRPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âs->server_fd.fd, inet_ntoa(s->sa.sin_addr),
> +Â Â Â Â Â Â Â Â Â Â Â Ântohs(s->sa.sin_port), errno, strerror(errno));
> +Â Â Â Â Â Â Â Âif (errno == EADDRNOTAVAIL)
> Â Â Â Â Â Â Â Â Â Â Â Â rc = -2;
> Â Â Â Â Â Â Â Â goto err_sfd;
> Â Â Â Â }
> +
> Â Â Â Â if (listen(s->server_fd.fd, 10)) {
> Â Â Â Â Â Â Â Â RPRINTF("could not listen on socket: %d\n", errno);
> Â Â Â Â Â Â Â Â goto err_sfd;
> Â Â Â Â }
>
> -Â Â Â Â/* The socket s now bound to the address and listening so we may now register
> -Â Â* the fd with tapdisk */
> -
> -Â Â Â Âif((s->server_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âs->server_fd.fd, 0,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âremus_server_accept, s)) < 0) {
> +Â Â Â Â/*
> +Â Â Â Â * The socket s now bound to the address and listening so we
> +Â Â Â Â * may now register the fd with tapdisk
> +Â Â Â Â */
> +Â Â Â Âid =Â tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âs->server_fd.fd, 0,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âremus_server_accept, s);
> +Â Â Â Âif (id < 0) {
> Â Â Â Â Â Â Â Â RPRINTF("error registering server connection event handler: %s",
> -Â Â Â Â Â Â Â Â Â Â Â Âstrerror(s->server_fd.id));
> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> Â Â Â Â Â Â Â Â goto err_sfd;
> Â Â Â Â }
> +Â Â Â Âs->server_fd.cid = id;
>
> Â Â Â Â return 0;
>
> - err_sfd:
> -Â Â Â Âclose(s->server_fd.fd);
> -Â Â Â Âs->server_fd.fd = -1;
> +err_sfd:
> +Â Â Â ÂCLOSE_FD(s->server_fd.fd);
>
> Â Â Â Â return rc;
> Â}
> @@ -1190,10 +1378,21 @@ void backup_queue_write(td_driver_t *driver, td_request_t treq)
> Â Â Â Â td_complete_request(treq, -EBUSY);
> Â}
>
> +static int server_flush(td_driver_t *driver)
> +{
> +       struct tdremus_state *s = (struct tdremus_state *)driver->data;
> +       /*
> +        * Nothing to flush in beginning.
> +        */
> +       if (!s->ramdisk.prev)
> +               return 0;
> +       /* Try to flush any remaining requests */
> +       return ramdisk_flush(driver, s);
> +}
> +
> Âstatic int backup_start(td_driver_t *driver)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)driver->data;
> -Â Â Â Âint fd;
>
> Â Â Â Â if (ramdisk_start(driver) < 0)
> Â Â Â Â Â Â Â Â return -1;
> @@ -1201,16 +1400,15 @@ static int backup_start(td_driver_t *driver)
> Â Â Â Â tapdisk_remus.td_queue_read = backup_queue_read;
> Â Â Â Â tapdisk_remus.td_queue_write = backup_queue_write;
> Â Â Â Â s->queue_flush = server_flush;
> -Â Â Â Â/* TODO set flush function */
> Â Â Â Â return 0;
> Â}
>
> -static int server_do_wreq(td_driver_t *driver)
> +static void server_do_wreq(td_driver_t *driver)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)driver->data;
> Â Â Â Â static tdremus_wire_t twreq;
> Â Â Â Â char buf[4096];
> -Â Â Â Âint len, rc;
> +Â Â Â Âint len, rc = ERROR_IO;
>
> Â Â Â Â char header[sizeof(uint32_t) + sizeof(uint64_t)];
> Â Â Â Â uint32_t *sectors = (uint32_t *) header;
> @@ -1227,39 +1425,40 @@ static int server_do_wreq(td_driver_t *driver)
> Â Â Â Â // *sector);
>
> Â Â Â Â if (len > sizeof(buf)) {
> -Â Â Â Â Â Â Â Â/* freak out! */
> -Â Â Â Â Â Â Â ÂRPRINTF("write request too large: %d/%u\n", len, (unsigned)sizeof(buf));
> -Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â Â Â Â Â/* freak out! How to handle the remaining data from primary */
> +Â Â Â Â Â Â Â ÂRPRINTF("write request too large: %d/%u\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âlen, (unsigned)sizeof(buf));
> +Â Â Â Â Â Â Â Âgoto err;
> Â Â Â Â }
>
> Â Â Â Â if (mread(s->stream_fd.fd, buf, len) < 0)
> Â Â Â Â Â Â Â Â goto err;
>
> -Â Â Â Âif (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
> +Â Â Â Âif (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
> +Â Â Â Â Â Â Â Ârc = ERROR_INTERNAL;
> Â Â Â Â Â Â Â Â goto err;
> +Â Â Â Â}
>
> -Â Â Â Âreturn 0;
> +Â Â Â Âreturn;
>
> Â err:
> Â Â Â Â /* should start failover */
> Â Â Â Â RPRINTF("backup write request error\n");
> -Â Â Â Âclose_stream_fd(s);
> -
> -Â Â Â Âreturn -1;
> +Â Â Â Âbackup_failed(s, rc);
> Â}
>
> -static int server_do_sreq(td_driver_t *driver)
> +static void server_do_sreq(td_driver_t *driver)
> Â{
> Â Â Â Â /*
> Â Â Â Â Â RPRINTF("submit request received\n");
> Â Â*/
>
> -Â Â Â Âreturn 0;
> +Â Â Â Âreturn;
> Â}
>
> Â/* at this point, the server can start applying the most recent
> Â * ramdisk. */
> -static int server_do_creq(td_driver_t *driver)
> +static void server_do_creq(td_driver_t *driver)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)driver->data;
>
> @@ -1269,9 +1468,7 @@ static int server_do_creq(td_driver_t *driver)
>
> Â Â Â Â /* XXX this message should not be sent until flush completes! */
> Â Â Â Â if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
> -Â Â Â Â Â Â Â Âreturn -1;
> -
> -Â Â Â Âreturn 0;
> +Â Â Â Â Â Â Â Âbackup_failed(s, ERROR_IO);
> Â}
>
>
> @@ -1356,10 +1553,6 @@ static int unprotected_start(td_driver_t *driver)
>
> Â Â Â Â RPRINTF("failure detected, activating passthrough\n");
>
> -Â Â Â Â/* close the server socket */
> -Â Â Â Âclose_stream_fd(s);
> -
> -Â Â Â Âclose_server_fd(s);
>
> Â Â Â Â /* install the unprotected read/write handlers */
> Â Â Â Â tapdisk_remus.td_queue_read = unprotected_queue_read;
> @@ -1486,6 +1679,19 @@ static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
> Â Â Â Â return rc;
> Â}
>
> +static void ctl_reopen(struct tdremus_state *s)
> +{
> +       ctl_unregister(s);
> +       CLOSE_FD(s->ctl_fd.fd);
> +       RPRINTF("FIFO closed\n");
> +
> +       if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
> +               RPRINTF("error reopening FIFO: %d\n", errno);
> +               return;
> +       }
> +       ctl_register(s);
> +}
> +
> Âstatic void ctl_request(event_id_t id, char mode, void *private)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)private;
> @@ -1497,12 +1703,6 @@ static void ctl_request(event_id_t id, char mode, void *private)
>
> Â Â Â Â if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) {
> Â Â Â Â Â Â Â Â RPRINTF("0-byte read received, reopening FIFO\n");
> -Â Â Â Â Â Â Â Â/*TODO: we may have to unregister/re-register with tapdisk_server */
> -Â Â Â Â Â Â Â Âclose(s->ctl_fd.fd);
> -Â Â Â Â Â Â Â ÂRPRINTF("FIFO closed\n");
> -Â Â Â Â Â Â Â Âif ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
> -Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("error reopening FIFO: %d\n", errno);
> -Â Â Â Â Â Â Â Â}
> Â Â Â Â Â Â Â Â return;
> Â Â Â Â }
>
> @@ -1641,10 +1841,11 @@ static int ctl_register(struct tdremus_state *s)
> Â Â Â Â RPRINTF("registering ctl fifo\n");
>
> Â Â Â Â /* register ctl fd */
> -Â Â Â Âs->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
> +Â Â Â Âs->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
>
> -Â Â Â Âif (s->ctl_fd.id < 0) {
> -Â Â Â Â Â Â Â ÂRPRINTF("error registering ctrl FIFO %s: %d\n", s->ctl_path, s->ctl_fd.id);
> +Â Â Â Âif (s->ctl_fd.cid < 0) {
> +Â Â Â Â Â Â Â ÂRPRINTF("error registering ctrl FIFO %s: %d\n",
> +Â Â Â Â Â Â Â Â Â Â Â Âs->ctl_path, s->ctl_fd.cid);
> Â Â Â Â Â Â Â Â return -1;
> Â Â Â Â }
>
> @@ -1655,10 +1856,7 @@ static void ctl_unregister(struct tdremus_state *s)
> Â{
> Â Â Â Â RPRINTF("unregistering ctl fifo\n");
>
> -Â Â Â Âif (s->ctl_fd.id >= 0) {
> -Â Â Â Â Â Â Â Âtapdisk_server_unregister_event(s->ctl_fd.id);
> -Â Â Â Â Â Â Â Âs->ctl_fd.id = -1;
> -Â Â Â Â}
> +Â Â Â ÂUNREGISTER_EVENT(s->ctl_fd.cid);
> Â}
>
> Â/* interface */
> --
> 1.9.3
>

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel

 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.