[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [RFC Patch v3 15/22] blktap2: move async connect related codes to block-replication.c



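   Move the asynchronous connection code (client connect and server
   accept) out of block-remus.c into the new block-replication.c so
   that COLO can reuse it.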

Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
---
 tools/blktap2/drivers/Makefile            |   2 +-
 tools/blktap2/drivers/block-remus.c       | 494 +++---------------------------
 tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++
 tools/blktap2/drivers/block-replication.h | 113 +++++++
 4 files changed, 630 insertions(+), 447 deletions(-)
 create mode 100644 tools/blktap2/drivers/block-replication.c
 create mode 100644 tools/blktap2/drivers/block-replication.h

diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
index 37c3485..3d8ed8a 100644
--- a/tools/blktap2/drivers/Makefile
+++ b/tools/blktap2/drivers/Makefile
@@ -23,7 +23,7 @@ endif
 
 VHDLIBS    := -L$(LIBVHDDIR) -lvhd
 
-REMUS-OBJS  := block-remus.o
+REMUS-OBJS  := block-remus.o block-replication.o
 REMUS-OBJS  += hashtable.o
 REMUS-OBJS  += hashtable_itr.o
 REMUS-OBJS  += hashtable_utility.o
diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
index 5d27d41..8b6f157 100644
--- a/tools/blktap2/drivers/block-remus.c
+++ b/tools/blktap2/drivers/block-remus.c
@@ -40,6 +40,7 @@
 #include "hashtable.h"
 #include "hashtable_itr.h"
 #include "hashtable_utility.h"
+#include "block-replication.h"
 
 #include <errno.h>
 #include <inttypes.h>
@@ -49,10 +50,7 @@
 #include <string.h>
 #include <sys/time.h>
 #include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
 #include <netinet/in.h>
-#include <arpa/inet.h>
 #include <sys/param.h>
 #include <sys/sysctl.h>
 #include <unistd.h>
@@ -67,22 +65,6 @@
 
 #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
 
-#define UNREGISTER_EVENT(id)                                   \
-       do {                                                    \
-               if (id >= 0) {                                  \
-                       tapdisk_server_unregister_event(id);    \
-                       id = -1;                                \
-               }                                               \
-       } while (0)
-
-#define CLOSE_FD(fd)                   \
-       do {                            \
-               if (fd >= 0) {          \
-                       close(fd);      \
-                       fd = -1;        \
-               }                       \
-       } while (0)
-
 #define MAX_REMUS_REQUEST       TAPDISK_DATA_REQUESTS
 
 enum tdremus_mode {
@@ -92,13 +74,6 @@ enum tdremus_mode {
        mode_backup
 };
 
-enum {
-       ERROR_INTERNAL = -1,
-       ERROR_IO = -2,
-       ERROR_CONNECTION = -3,
-       ERROR_CLOSE = -4,
-};
-
 struct tdremus_req {
        td_request_t treq;
 };
@@ -167,21 +142,9 @@ struct ramdisk_write_cbdata {
 
 typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
 
-/*
- * If cid, rid and wid are -1, fd must be -1. It means that
- * we are in unpritected mode or we don't start to connect
- * to backup.
- * If fd is an valid fd:
- *  cid is valid, rid and wid must be invalid. It means that
- *      the connection is in progress.
- *  cid is invalid. rid or wid must be valid. It means that
- *      the connection is established.
- */
 typedef struct poll_fd {
        int        fd;
-       event_id_t cid;
-       event_id_t rid;
-       event_id_t wid;
+       event_id_t id;
 } poll_fd_t;
 
 struct tdremus_state {
@@ -195,9 +158,7 @@ struct tdremus_state {
        char*     msg_path; /* output completion message here */
        poll_fd_t msg_fd;
 
-  /* replication host */
-       struct sockaddr_in sa;
-       poll_fd_t server_fd;    /* server listen port */
+       td_replication_connect_t t;
        poll_fd_t stream_fd;     /* replication channel */
 
        /*
@@ -777,28 +738,8 @@ static int mwrite(int fd, void* buf, size_t len)
        select(fd + 1, NULL, &wfds, NULL, &tv);
 }
 
-
-static void inline close_stream_fd(struct tdremus_state *s)
-{
-
-       UNREGISTER_EVENT(s->stream_fd.cid);
-       UNREGISTER_EVENT(s->stream_fd.rid);
-       UNREGISTER_EVENT(s->stream_fd.wid);
-
-       /* close the connection */
-       CLOSE_FD(s->stream_fd.fd);
-}
-
-static void close_server_fd(struct tdremus_state *s)
-{
-       UNREGISTER_EVENT(s->server_fd.cid);
-       CLOSE_FD(s->server_fd.fd);
-}
-
 /* primary functions */
-static void remus_client_event(event_id_t, char mode, void *private);
-static void remus_connect_event(event_id_t id, char mode, void *private);
-static void remus_retry_connect_event(event_id_t id, char mode, void *private);
+static void remus_client_event(event_id_t id, char mode, void *private);
 static int primary_forward_request(struct tdremus_state *s,
                                   const td_request_t *treq);
 
@@ -808,56 +749,15 @@ static int primary_forward_request(struct tdremus_state *s,
  */
 static void primary_failed(struct tdremus_state *s, int rc)
 {
-       close_stream_fd(s);
+       td_replication_connect_kill(&s->t);
        if (rc == ERROR_INTERNAL)
                RPRINTF("switch to unprotected mode due to internal error");
        if (rc == ERROR_CLOSE)
                RPRINTF("switch to unprotected mode before closing");
+       UNREGISTER_EVENT(s->stream_fd.id);
        switch_mode(s->tdremus_driver, mode_unprotected);
 }
 
-static int primary_do_connect(struct tdremus_state *state)
-{
-       event_id_t id;
-       int fd;
-       int rc;
-       int flags;
-
-       RPRINTF("client connecting to %s:%d...\n",
-               inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
-
-       if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
-               RPRINTF("could not create client socket: %d\n", errno);
-               return ERROR_INTERNAL;
-       }
-       state->stream_fd.fd = fd;
-
-       /* make socket nonblocking */
-       if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
-               flags = 0;
-       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
-               RPRINTF("error setting fd %d to non block mode\n", fd);
-               return ERROR_INTERNAL;
-       }
-
-       /*
-        * once we have created the socket and populated the address,
-        * we can now start our non-blocking connect. rather than
-        * duplicating code we trigger a timeout on the socket fd,
-        * which calls out nonblocking connect code
-        */
-       if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
-                                              remus_retry_connect_event,
-                                              state)) < 0) {
-               RPRINTF("error registering timeout client connection event handler: %s\n",
-                       strerror(id));
-               return ERROR_INTERNAL;
-       }
-
-       state->stream_fd.cid = id;
-       return 0;
-}
-
 static int remus_handle_queued_io(struct tdremus_state *s)
 {
        struct req_ring *queued_io = &s->queued_io;
@@ -882,184 +782,35 @@ static int remus_handle_queued_io(struct tdremus_state *s)
        return 0;
 }
 
-static int remus_connection_done(struct tdremus_state *s)
+static void remus_client_established(td_replication_connect_t *t, int rc)
 {
+       struct tdremus_state *s = CONTAINER_OF(t, *s, t);
        event_id_t id;
 
-       /* the connect succeeded */
-       /* unregister this function and register a new event handler */
-       tapdisk_server_unregister_event(s->stream_fd.cid);
-       s->stream_fd.cid = -1;
+       if (rc) {
+               primary_failed(s, rc);
+               return;
+       }
 
-       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd,
+       /* the connect succeeded */
+       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
                                           0, remus_client_event, s);
        if(id < 0) {
                RPRINTF("error registering client event handler: %s\n",
                        strerror(id));
-               return ERROR_INTERNAL;
-       }
-       s->stream_fd.rid = id;
-
-       /* handle the queued requests */
-       return remus_handle_queued_io(s);
-}
-
-static int remus_retry_connect(struct tdremus_state *s)
-{
-       event_id_t id;
-
-       tapdisk_server_unregister_event(s->stream_fd.cid);
-       s->stream_fd.cid = -1;
-
-       RPRINTF("connect to backup 1 second later");
-       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
-                                          s->stream_fd.fd,
-                                          REMUS_CONNRETRY_TIMEOUT,
-                                          remus_retry_connect_event, s);
-       if (id < 0) {
-               RPRINTF("error registering timeout client connection event handler: %s\n",
-                       strerror(id));
-               return ERROR_INTERNAL;
-       }
-
-       s->stream_fd.cid = id;
-       return 0;
-}
-
-static int remus_wait_connect_done(struct tdremus_state *s)
-{
-       event_id_t id;
-
-       tapdisk_server_unregister_event(s->stream_fd.cid);
-       s->stream_fd.cid = -1;
-
-       id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
-                                          s->stream_fd.fd, 0,
-                                          remus_connect_event, s);
-       if (id < 0) {
-               RPRINTF("error registering client connection event handler: %s\n",
-                       strerror(id));
-               return ERROR_INTERNAL;
-       }
-       s->stream_fd.cid = id;
-
-       return 0;
-}
-
-/* return 1 if we need to reconnect to backup */
-static int check_connect_errno(int err)
-{
-       /*
-        * The fd is non-block, so we will not get ETIMEDOUT
-        * after calling connect(). We only can get this errno
-        * by getsockopt().
-        */
-       if (err == ECONNREFUSED || err == ENETUNREACH ||
-           err == EAGAIN || err == ECONNABORTED ||
-           err == ETIMEDOUT)
-           return 1;
-
-       return 0;
-}
-
-static void remus_retry_connect_event(event_id_t id, char mode, void *private)
-{
-       struct tdremus_state *s = (struct tdremus_state *)private;
-       int rc, ret;
-
-       /* do a non-blocking connect */
-       ret = connect(s->stream_fd.fd,
-                     (struct sockaddr *)&s->sa,
-                     sizeof(s->sa));
-       if (ret) {
-               if (errno == EINPROGRESS) {
-                       /*
-                        * the connect returned EINPROGRESS (nonblocking
-                        * connect) we must wait for the fd to be writeable
-                        * to determine if the connect worked
-                        */
-                       rc = remus_wait_connect_done(s);
-                       if (rc)
-                               goto fail;
-                       return;
-               }
-
-               if (check_connect_errno(errno)) {
-                       rc = remus_retry_connect(s);
-                       if (rc)
-                               goto fail;
-                       return;
-               }
-
-               /* not recoverable */
-               RPRINTF("error connection to server %s\n", strerror(errno));
-               rc = ERROR_CONNECTION;
-               goto fail;
-       }
-
-       /* The connection is established unexpectedly */
-       rc = remus_connection_done(s);
-       if (rc)
-               goto fail;
-
-       return;
-
-fail:
-       primary_failed(s, rc);
-       return;
-}
-
-/* callback when nonblocking connect() is finished */
-static void remus_connect_event(event_id_t id, char mode, void *private)
-{
-       int socket_errno;
-       socklen_t socket_errno_size;
-       struct tdremus_state *s = (struct tdremus_state *)private;
-       int rc;
-
-       /* check to see if the connect succeeded */
-       socket_errno_size = sizeof(socket_errno);
-       if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR,
-                      &socket_errno, &socket_errno_size)) {
-               RPRINTF("error getting socket errno\n");
+               primary_failed(s, ERROR_INTERNAL);
                return;
        }
 
-       RPRINTF("socket connect returned %d\n", socket_errno);
+       s->stream_fd.fd = t->fd;
+       s->stream_fd.id = id;
 
-       if (socket_errno) {
-               /* the connect did not succeed */
-               if (check_connect_errno(socket_errno)) {
-                       /*
-                        * we can probably assume that the backup is down.
-                        * just try again later
-                        */
-                       rc = remus_retry_connect(s);
-                       if (rc)
-                               goto fail;
-
-                       return;
-               } else {
-                       RPRINTF("socket connect returned %d, giving up\n",
-                               socket_errno);
-                       rc = ERROR_CONNECTION;
-                       goto fail;
-               }
-
-               return;
-       }
-
-       rc = remus_connection_done(s);
+       /* handle the queued requests */
+       rc = remus_handle_queued_io(s);
        if (rc)
-               goto fail;
-
-       return;
-
-fail:
-       primary_failed(s, rc);
+               primary_failed(s, rc);
 }
 
-
 /*
  * we install this event handler on the primary once we have
  * connected to the backup.
@@ -1142,19 +893,21 @@ static int primary_forward_request(struct tdremus_state *s,
 static void primary_queue_write(td_driver_t *driver, td_request_t treq)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
-       int rc;
+       int rc, ret;
 
        // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
 
-       if(s->stream_fd.fd < 0) {
+       ret = td_replication_connect_status(&s->t);
+       if(ret == -1) {
                RPRINTF("connecting to backup...\n");
-               rc = primary_do_connect(s);
+               s->t.callback = remus_client_established;
+               rc = td_replication_client_start(&s->t);
                if (rc)
                        goto fail;
        }
 
        /* The connection is not established, just queue the request */
-       if (s->stream_fd.cid >= 0) {
+       if (ret != 1) {
                ring_add_request(&s->queued_io, &treq);
                return;
        }
@@ -1227,9 +980,7 @@ static int primary_start(td_driver_t *driver)
        s->queue_flush = primary_flush;
 
        s->stream_fd.fd = -1;
-       s->stream_fd.cid = -1;
-       s->stream_fd.rid = -1;
-       s->stream_fd.wid = -1;
+       s->stream_fd.id = -1;
 
        return 0;
 }
@@ -1240,100 +991,32 @@ static void remus_server_event(event_id_t id, char mode, void *private);
 /* It is called when we find some I/O error */
 static void backup_failed(struct tdremus_state *s, int rc)
 {
-       close_stream_fd(s);
-       close_server_fd(s);
+       td_replication_connect_kill(&s->t);
        /* We will switch to unprotected mode in backup_queue_write() */
 }
 
 /* returns the socket that receives write requests */
-static void remus_server_accept(event_id_t id, char mode, void* private)
+static void remus_server_established(td_replication_connect_t *t, int rc)
 {
-       struct tdremus_state* s = (struct tdremus_state *) private;
-
-       int stream_fd;
-
-       /* XXX: add address-based black/white list */
-       if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
-               RPRINTF("error accepting connection: %d\n", errno);
-               return;
-       }
+       struct tdremus_state *s = CONTAINER_OF(t, *s, t);
+       event_id_t id;
 
-       /*
-        * TODO: check to see if we are already replicating.
-        * if so just close the connection (or do something
-        * smarter)
-        */
-       RPRINTF("server accepted connection\n");
+       /* rc is always 0 */
 
        /* add tapdisk event for replication stream */
-       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
+       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0,
                                           remus_server_event, s);
 
        if (id < 0) {
                RPRINTF("error registering connection event handler: %s\n",
                        strerror(errno));
-               close(stream_fd);
+               td_replication_server_restart(t);
                return;
        }
 
        /* store replication file descriptor */
-       s->stream_fd.fd = stream_fd;
-       s->stream_fd.rid = id;
-}
-
-/* returns -2 if EADDRNOTAVAIL */
-static int remus_bind(struct tdremus_state* s)
-{
-       int opt;
-       int rc = -1;
-       event_id_t id;
-
-       if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-               RPRINTF("could not create server socket: %d\n", errno);
-               return rc;
-       }
-
-       opt = 1;
-       if (setsockopt(s->server_fd.fd, SOL_SOCKET,
-                      SO_REUSEADDR, &opt, sizeof(opt)) < 0)
-               RPRINTF("Error setting REUSEADDR on %d: %d\n",
-                       s->server_fd.fd, errno);
-
-       if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
-                sizeof(s->sa)) < 0) {
-               RPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
-                       s->server_fd.fd, inet_ntoa(s->sa.sin_addr),
-                       ntohs(s->sa.sin_port), errno, strerror(errno));
-               if (errno == EADDRNOTAVAIL)
-                       rc = -2;
-               goto err_sfd;
-       }
-
-       if (listen(s->server_fd.fd, 10)) {
-               RPRINTF("could not listen on socket: %d\n", errno);
-               goto err_sfd;
-       }
-
-       /*
-        * The socket s now bound to the address and listening so we
-        * may now register the fd with tapdisk
-        */
-       id =  tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
-                                           s->server_fd.fd, 0,
-                                           remus_server_accept, s);
-       if (id < 0) {
-               RPRINTF("error registering server connection event handler: %s",
-                       strerror(id));
-               goto err_sfd;
-       }
-       s->server_fd.cid = id;
-
-       return 0;
-
-err_sfd:
-       CLOSE_FD(s->server_fd.fd);
-
-       return rc;
+       s->stream_fd.fd = t->fd;
+       s->stream_fd.id = id;
 }
 
 /* wait for latest checkpoint to be applied */
@@ -1566,90 +1249,6 @@ static int unprotected_start(td_driver_t *driver)
 
 
 /* control */
-
-static inline int resolve_address(const char* addr, struct in_addr* ia)
-{
-       struct hostent* he;
-       uint32_t ip;
-
-       if (!(he = gethostbyname(addr))) {
-               RPRINTF("error resolving %s: %d\n", addr, h_errno);
-               return -1;
-       }
-
-       if (!he->h_addr_list[0]) {
-               RPRINTF("no address found for %s\n", addr);
-               return -1;
-       }
-
-       /* network byte order */
-       ip = *((uint32_t**)he->h_addr_list)[0];
-       ia->s_addr = ip;
-
-       return 0;
-}
-
-static int get_args(td_driver_t *driver, const char* name)
-{
-       struct tdremus_state *state = (struct tdremus_state *)driver->data;
-       char* host;
-       char* port;
-//  char* driver_str;
-//  char* parent;
-//  int type;
-//  char* path;
-//  unsigned long ulport;
-//  int i;
-//  struct sockaddr_in server_addr_in;
-
-       int gai_status;
-       int valid_addr;
-       struct addrinfo gai_hints;
-       struct addrinfo *servinfo, *servinfo_itr;
-
-       memset(&gai_hints, 0, sizeof gai_hints);
-       gai_hints.ai_family = AF_UNSPEC;
-       gai_hints.ai_socktype = SOCK_STREAM;
-
-       port = strchr(name, ':');
-       if (!port) {
-               RPRINTF("missing host in %s\n", name);
-               return -ENOENT;
-       }
-       if (!(host = strndup(name, port - name))) {
-               RPRINTF("unable to allocate host\n");
-               return -ENOMEM;
-       }
-       port++;
-
-       if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) {
-               RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
-               return -ENOENT;
-       }
-
-       /* TODO: do something smarter here */
-       valid_addr = 0;
-       for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
-               void *addr;
-               char *ipver;
-
-               if (servinfo_itr->ai_family == AF_INET) {
-                       valid_addr = 1;
-                       memset(&state->sa, 0, sizeof(state->sa));
-                       state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
-                       break;
-               }
-       }
-       freeaddrinfo(servinfo);
-
-       if (!valid_addr)
-               return -ENOENT;
-
-       RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
-
-       return 0;
-}
-
 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
@@ -1844,11 +1443,11 @@ static int ctl_register(struct tdremus_state *s)
        RPRINTF("registering ctl fifo\n");
 
        /* register ctl fd */
-       s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
+       s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
 
-       if (s->ctl_fd.cid < 0) {
+       if (s->ctl_fd.id < 0) {
                RPRINTF("error registering ctrl FIFO %s: %d\n",
-                       s->ctl_path, s->ctl_fd.cid);
+                       s->ctl_path, s->ctl_fd.id);
                return -1;
        }
 
@@ -1859,7 +1458,7 @@ static void ctl_unregister(struct tdremus_state *s)
 {
        RPRINTF("unregistering ctl fifo\n");
 
-       UNREGISTER_EVENT(s->ctl_fd.cid);
+       UNREGISTER_EVENT(s->ctl_fd.id);
 }
 
 /* interface */
@@ -1867,6 +1466,7 @@ static void ctl_unregister(struct tdremus_state *s)
 static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       td_replication_connect_t *t = &s->t;
        int rc;
        const char *name = image->name;
        td_flag_t flags = image->flags;
@@ -1877,7 +1477,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
        remus_image = image;
 
        memset(s, 0, sizeof(*s));
-       s->server_fd.fd = -1;
        s->stream_fd.fd = -1;
        s->ctl_fd.fd = -1;
        s->msg_fd.fd = -1;
@@ -1886,8 +1485,12 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
         * the driver stack from the stream_fd event handler */
        s->tdremus_driver = driver;
 
+       t->log_prefix = "remus";
+       t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT;
+       t->max_connections = 10;
+       t->callback = remus_server_established;
        /* parse name to get info etc */
-       if ((rc = get_args(driver, name)))
+       if ((rc = td_replication_connect_init(t, name)))
                return rc;
 
        if ((rc = ctl_open(driver, name))) {
@@ -1901,7 +1504,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
                return rc;
        }
 
-       if (!(rc = remus_bind(s)))
+       if (!(rc = td_replication_server_start(t)))
                rc = switch_mode(driver, mode_backup);
        else if (rc == -2)
                rc = switch_mode(driver, mode_primary);
@@ -1932,8 +1535,7 @@ static int tdremus_close(td_driver_t *driver)
        if (s->ramdisk.inprogress)
                hashtable_destroy(s->ramdisk.inprogress, 0);
 
-       close_server_fd(s);
-       close_stream_fd(s);
+       td_replication_connect_kill(&s->t);
        ctl_unregister(s);
        ctl_close(s);
 
diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c
new file mode 100644
index 0000000..e4b2679
--- /dev/null
+++ b/tools/blktap2/drivers/block-replication.c
@@ -0,0 +1,468 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#include "tapdisk-server.h"
+#include "block-replication.h"
+
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <stdlib.h>
+#include <arpa/inet.h>
+
+#undef DPRINTF
+#undef EPRINTF
+#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a)
+#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a)
+
+/* connection status */
+enum {
+       connection_none,
+       connection_in_progress,
+       connection_established,
+       connection_closed,
+};
+
+/* common functions */
+/* args should be host:port */
+static int get_args(td_replication_connect_t *t, const char* name)
+{
+       char* host;
+       const char* port;
+       int gai_status;
+       int valid_addr;
+       struct addrinfo gai_hints;
+       struct addrinfo *servinfo, *servinfo_itr;
+       const char *log_prefix = t->log_prefix;
+
+       memset(&gai_hints, 0, sizeof gai_hints);
+       gai_hints.ai_family = AF_UNSPEC;
+       gai_hints.ai_socktype = SOCK_STREAM;
+
+       port = strchr(name, ':');
+       if (!port) {
+               EPRINTF("missing host in %s\n", name);
+               return -ENOENT;
+       }
+       if (!(host = strndup(name, port - name))) {
+               EPRINTF("unable to allocate host\n");
+               return -ENOMEM;
+       }
+       port++;
+       if ((gai_status = getaddrinfo(host, port,
+                                     &gai_hints, &servinfo)) != 0) {
+               EPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
+               free(host);
+               return -ENOENT;
+       }
+       free(host);
+
+       /* TODO: do something smarter here */
+       valid_addr = 0;
+       for (servinfo_itr = servinfo; servinfo_itr != NULL;
+            servinfo_itr = servinfo_itr->ai_next) {
+               if (servinfo_itr->ai_family == AF_INET) {
+                       valid_addr = 1;
+                       memset(&t->sa, 0, sizeof(t->sa));
+                       t->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
+                       break;
+               }
+       }
+       freeaddrinfo(servinfo);
+
+       if (!valid_addr)
+               return -ENOENT;
+
+       DPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr),
+               ntohs(t->sa.sin_port));
+
+       return 0;
+}
+
+int td_replication_connect_init(td_replication_connect_t *t, const char *name)
+{
+       int rc;
+
+       rc = get_args(t, name);
+       if (rc)
+               return rc;
+
+       t->listen_fd = -1;
+       t->id = -1;
+       t->status = connection_none;
+       return 0;
+}
+
+int td_replication_connect_status(td_replication_connect_t *t)
+{
+       const char *log_prefix = t->log_prefix;
+
+       switch (t->status) {
+       case connection_none:
+       case connection_closed:
+               return -1;
+       case connection_in_progress:
+               return 0;
+       case connection_established:
+               return 1;
+       default:
+               EPRINTF("td_replication_connect is corruptted\n");
+               return -2;
+       }
+}
+
+void td_replication_connect_kill(td_replication_connect_t *t)
+{
+       if (t->status != connection_in_progress &&
+           t->status != connection_established)
+               return;
+
+       UNREGISTER_EVENT(t->id);
+       CLOSE_FD(t->fd);
+       CLOSE_FD(t->listen_fd);
+       t->status = connection_closed;
+}
+
+/* server */
+static void td_replication_server_accept(event_id_t id, char mode,
+                                        void *private);
+
+int td_replication_server_start(td_replication_connect_t *t)
+{
+       int opt;
+       int rc = -1;
+       event_id_t id;
+       int fd;
+       const char *log_prefix = t->log_prefix;
+
+       if (t->status == connection_in_progress ||
+           t->status == connection_established)
+               return rc;
+
+       fd = socket(AF_INET, SOCK_STREAM, 0);
+       if (fd < 0) {
+               EPRINTF("could not create server socket: %d\n", errno);
+               return rc;
+       }
+
+       opt = 1;
+       if (setsockopt(fd, SOL_SOCKET,
+                      SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+               DPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno);
+
+       if (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) {
+               DPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
+                       fd, inet_ntoa(t->sa.sin_addr),
+                       ntohs(t->sa.sin_port), errno, strerror(errno));
+               if (errno == EADDRNOTAVAIL)
+                       rc = -2;
+               goto err;
+       }
+
+       if (listen(fd, t->max_connections)) {
+               EPRINTF("could not listen on socket: %d\n", errno);
+               goto err;
+       }
+
+       /*
+        * The socket is now bound to the address and listening so we
+        * may now register the fd with tapdisk
+        */
+       id =  tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                           fd, 0,
+                                           td_replication_server_accept, t);
+       if (id < 0) {
+               EPRINTF("error registering server connection event handler: %s",
+                       strerror(id));
+               goto err;
+       }
+       t->listen_fd = fd;
+       t->id = id;
+       t->status = connection_in_progress;
+
+       return 0;
+
+err:
+       close(fd);
+       return rc;
+}
+
+static void td_replication_server_accept(event_id_t id, char mode,
+                                        void *private)
+{
+       td_replication_connect_t *t = private;
+       int fd;
+       const char *log_prefix = t->log_prefix;
+
+       /* XXX: add address-based black/white list */
+       fd = accept(t->listen_fd, NULL, NULL);
+       if (fd < 0) {
+               EPRINTF("error accepting connection: %d\n", errno);
+               return;
+       }
+
+       if (t->status == connection_established) {
+               EPRINTF("connection is already established\n");
+               close(fd);
+               return;
+       }
+
+       DPRINTF("server accepted connection\n");
+       t->fd = fd;
+       t->status = connection_established;
+       t->callback(t, 0);
+}
+
+int td_replication_server_restart(td_replication_connect_t *t)
+{
+       switch (t->status) {
+       case connection_in_progress:
+               return 0;
+       case connection_established:
+               CLOSE_FD(t->fd);
+               t->status = connection_in_progress;
+               return 0;
+       case connection_none:
+       case connection_closed:
+               return td_replication_server_start(t);
+       default:
+               /* not reached */
+               return -1;
+       }
+}
+
+/* client */
+static void td_replication_retry_connect_event(event_id_t id, char mode,
+                                              void *private);
+static void td_replication_connect_event(event_id_t id, char mode,
+                                        void *private);
+/*
+ * Start an asynchronous client connection to the address in t->sa.
+ *
+ * A non-blocking TCP socket is created and a timeout event with a
+ * timeout argument of 0 is registered on it; the connect() itself is
+ * issued from that event's handler,
+ * td_replication_retry_connect_event().
+ *
+ * Returns 0 once the attempt has been scheduled (t->fd, t->id and
+ * t->status are then updated).  Returns ERROR_INTERNAL if a connection
+ * is already in progress or established, or if creating the socket,
+ * switching it to non-blocking mode or registering the event fails; in
+ * the latter cases the socket is closed again and t is left untouched.
+ */
+int td_replication_client_start(td_replication_connect_t *t)
+{
+       event_id_t id;
+       int fd;
+       int flags;
+       const char *log_prefix = t->log_prefix;
+
+       if (t->status == connection_in_progress ||
+           t->status == connection_established)
+               return ERROR_INTERNAL;
+
+       DPRINTF("client connecting to %s:%d...\n",
+               inet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port));
+
+       if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+               EPRINTF("could not create client socket: %d\n", errno);
+               return ERROR_INTERNAL;
+       }
+
+       /* make socket nonblocking */
+       if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
+               flags = 0;
+       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+               EPRINTF("error setting fd %d to non block mode\n", fd);
+               goto err;
+       }
+
+       /*
+        * once we have created the socket and populated the address,
+        * we can now start our non-blocking connect. rather than
+        * duplicating code we trigger a timeout on the socket fd,
+        * which calls our nonblocking connect code
+        */
+       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
+                                          td_replication_retry_connect_event,
+                                          t);
+       if (id < 0) {
+               /*
+                * id is negative here; presumably a negated errno value,
+                * so flip the sign before handing it to strerror().
+                */
+               EPRINTF("error registering timeout client connection event "
+                       "handler: %s\n", strerror(-id));
+               goto err;
+       }
+
+       t->fd = fd;
+       t->id = id;
+       t->status = connection_in_progress;
+       return 0;
+
+err:
+       close(fd);
+       return ERROR_INTERNAL;
+}
+
+/*
+ * Give up on the client connection: call td_replication_connect_kill()
+ * on t and then report the error code rc to the owner via t->callback.
+ */
+static void td_replication_client_failed(td_replication_connect_t *t, int rc)
+{
+       td_replication_connect_kill(t);
+       t->callback(t, rc);
+}
+
+/*
+ * The client connection is up: drop the event that was waiting for it,
+ * mark the connection as established and notify the owner with rc 0.
+ */
+static void td_replication_client_done(td_replication_connect_t *t)
+{
+       /* unregisters t->id if it is valid and resets it to -1 */
+       UNREGISTER_EVENT(t->id);
+       t->status = connection_established;
+       t->callback(t, 0);
+}
+
+/*
+ * Schedule another connect() attempt.
+ *
+ * The currently registered event is dropped and a timeout event with
+ * t->retry_timeout_s as its timeout is registered on t->fd; its handler
+ * is td_replication_retry_connect_event().
+ *
+ * Returns 0 on success (t->id holds the new event id) or ERROR_INTERNAL
+ * if the event could not be registered (t->id is then -1).
+ */
+static int td_replication_retry_connect(td_replication_connect_t *t)
+{
+       event_id_t id;
+       const char *log_prefix = t->log_prefix;
+
+       UNREGISTER_EVENT(t->id);
+
+       /* log the configured delay rather than a hard-coded "1 second" */
+       DPRINTF("connect to server %d second(s) later\n", t->retry_timeout_s);
+       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+                                          t->fd, t->retry_timeout_s,
+                                          td_replication_retry_connect_event,
+                                          t);
+       if (id < 0) {
+               /*
+                * id is negative here; presumably a negated errno value,
+                * so flip the sign before handing it to strerror().
+                */
+               EPRINTF("error registering timeout client connection event "
+                       "handler: %s\n", strerror(-id));
+               return ERROR_INTERNAL;
+       }
+
+       t->id = id;
+       return 0;
+}
+
+/*
+ * Wait for a non-blocking connect() on t->fd to finish.
+ *
+ * The currently registered event is dropped and a write-readiness event
+ * is registered on t->fd; its handler is td_replication_connect_event().
+ *
+ * Returns 0 on success (t->id holds the new event id) or ERROR_INTERNAL
+ * if the event could not be registered (t->id is then -1).
+ */
+static int td_replication_wait_connect_done(td_replication_connect_t *t)
+{
+       event_id_t id;
+       const char *log_prefix = t->log_prefix;
+
+       UNREGISTER_EVENT(t->id);
+
+       id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
+                                          t->fd, 0,
+                                          td_replication_connect_event, t);
+       if (id < 0) {
+               /*
+                * id is negative here; presumably a negated errno value,
+                * so flip the sign before handing it to strerror().
+                */
+               EPRINTF("error registering client connection event handler: "
+                       "%s\n", strerror(-id));
+               return ERROR_INTERNAL;
+       }
+       t->id = id;
+
+       return 0;
+}
+
+/*
+ * Decide whether a failed connect is worth retrying.
+ *
+ * Returns 1 if err means we should reconnect to the backup server
+ * later, 0 for every other value.
+ */
+static int check_connect_errno(int err)
+{
+       switch (err) {
+       case ECONNREFUSED:
+       case ENETUNREACH:
+       case EAGAIN:
+       case ECONNABORTED:
+       /*
+        * The fd is non-blocking, so connect() itself never reports
+        * ETIMEDOUT; this value can only reach us via getsockopt().
+        */
+       case ETIMEDOUT:
+               return 1;
+       default:
+               return 0;
+       }
+}
+
+/*
+ * Timeout handler that issues the non-blocking connect() on t->fd.
+ *
+ * Outcomes:
+ *  - connect() succeeds at once: the connection is reported as done.
+ *  - EINPROGRESS: wait for the fd to become writable.
+ *  - an errno accepted by check_connect_errno(): schedule a retry.
+ *  - anything else: fail with ERROR_CONNECTION.
+ * A failure to register the follow-up event is reported through
+ * td_replication_client_failed() as well.
+ *
+ * id and mode are not used; private is the td_replication_connect_t.
+ */
+static void td_replication_retry_connect_event(event_id_t id, char mode,
+                                              void *private)
+{
+       td_replication_connect_t *t = private;
+       const char *log_prefix = t->log_prefix;
+       int rc;
+
+       /* do a non-blocking connect */
+       if (connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) == 0) {
+               /* The connection is established unexpectedly */
+               td_replication_client_done(t);
+               return;
+       }
+
+       if (errno == EINPROGRESS) {
+               /*
+                * the connect returned EINPROGRESS (nonblocking
+                * connect) we must wait for the fd to be writeable
+                * to determine if the connect worked
+                */
+               rc = td_replication_wait_connect_done(t);
+       } else if (check_connect_errno(errno)) {
+               rc = td_replication_retry_connect(t);
+       } else {
+               /* not recoverable */
+               EPRINTF("error connection to server %s\n", strerror(errno));
+               rc = ERROR_CONNECTION;
+       }
+
+       if (rc)
+               td_replication_client_failed(t, rc);
+}
+
+/* callback when nonblocking connect() is finished */
+/*
+ * Write-readiness handler: called when the nonblocking connect() on
+ * t->fd is finished.
+ *
+ * The result of the connect is read with getsockopt(SO_ERROR):
+ *  - 0: the connection is reported as done.
+ *  - an errno accepted by check_connect_errno(): schedule a retry.
+ *  - anything else: fail with ERROR_CONNECTION.
+ * If the result cannot be read at all, or the retry cannot be
+ * scheduled, the attempt fails with ERROR_INTERNAL.
+ *
+ * id and mode are not used; private is the td_replication_connect_t.
+ */
+static void td_replication_connect_event(event_id_t id, char mode,
+                                        void *private)
+{
+       int socket_errno;
+       socklen_t socket_errno_size;
+       td_replication_connect_t *t = private;
+       int rc;
+       const char *log_prefix = t->log_prefix;
+
+       /* check to see if the connect succeeded */
+       socket_errno_size = sizeof(socket_errno);
+       if (getsockopt(t->fd, SOL_SOCKET, SO_ERROR,
+                      &socket_errno, &socket_errno_size)) {
+               /*
+                * The outcome of the connect is unknown.  Do not return
+                * silently: the owner's callback would never be told
+                * that this attempt is dead.
+                */
+               EPRINTF("error getting socket errno\n");
+               rc = ERROR_INTERNAL;
+               goto fail;
+       }
+
+       DPRINTF("socket connect returned %d\n", socket_errno);
+
+       if (!socket_errno) {
+               td_replication_client_done(t);
+               return;
+       }
+
+       /* the connect did not succeed */
+       if (!check_connect_errno(socket_errno)) {
+               EPRINTF("socket connect returned %d, giving up\n",
+                       socket_errno);
+               rc = ERROR_CONNECTION;
+               goto fail;
+       }
+
+       /*
+        * we can probably assume that the backup is down.
+        * just try again later
+        */
+       rc = td_replication_retry_connect(t);
+       if (rc)
+               goto fail;
+
+       return;
+
+fail:
+       td_replication_client_failed(t, rc);
+}
diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
new file mode 100644
index 0000000..0bd6e71
--- /dev/null
+++ b/tools/blktap2/drivers/block-replication.h
@@ -0,0 +1,113 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#ifndef BLOCK_REPLICATION_H
+#define BLOCK_REPLICATION_H
+
+#include "scheduler.h"
+#include <sys/socket.h>
+#include <netdb.h>
+
+/*
+ * Given a pointer to a member embedded in a structure, yield a pointer
+ * to the enclosing structure.
+ *
+ *   inner_ptr:   pointer to the embedded member
+ *   outer:       the enclosing structure; it is only used as the
+ *                operand of typeof()
+ *   member_name: name of the member inside outer
+ *
+ * The pointer comparison in the middle is never used for its value; it
+ * is there as a type check between inner_ptr and the named member.
+ * Relies on the GNU C extensions typeof and statement expressions, and
+ * on offsetof().
+ */
+#define CONTAINER_OF(inner_ptr, outer, member_name)                    \
+       ({                                                              \
+               typeof(outer) *container_of_;                           \
+               container_of_ = (void*)((char*)(inner_ptr) -            \
+                               offsetof(typeof(outer), member_name));  \
+               (void)(&container_of_->member_name ==                   \
+                      (typeof(inner_ptr))0) /* type check */;          \
+               container_of_;                                          \
+       })
+
+/*
+ * Unregister the event whose id is stored in the lvalue id, provided it
+ * is valid (>= 0), and reset the lvalue to -1 so that a second call is a
+ * no-op.  id is evaluated more than once and must therefore be an
+ * lvalue without side effects.
+ */
+#define UNREGISTER_EVENT(id)                                   \
+       do {                                                    \
+               if ((id) >= 0) {                                \
+                       tapdisk_server_unregister_event((id));  \
+                       (id) = -1;                              \
+               }                                               \
+       } while (0)
+/*
+ * Close the file descriptor stored in the lvalue fd, provided it is
+ * valid (>= 0), and reset the lvalue to -1 so that a second call is a
+ * no-op.  fd is evaluated more than once and must therefore be an
+ * lvalue without side effects.
+ */
+#define CLOSE_FD(fd)                   \
+       do {                            \
+               if ((fd) >= 0) {        \
+                       close((fd));    \
+                       (fd) = -1;      \
+               }                       \
+       } while (0)
+
+/*
+ * Error codes passed to td_replication_callback and returned by some of
+ * the functions declared below.
+ */
+enum {
+       /* local failure, e.g. socket creation or event registration */
+       ERROR_INTERNAL = -1,
+       /* NOTE(review): presumably a read/write failure on fd - confirm */
+       ERROR_IO = -2,
+       /* connect failed with an error that is not worth retrying */
+       ERROR_CONNECTION = -3,
+       /* NOTE(review): presumably the peer closed the fd - confirm */
+       ERROR_CLOSE = -4,
+};
+
+typedef struct td_replication_connect td_replication_connect_t;
+/*
+ * Notification to the owner of a connection: rc is 0 when the
+ * connection has been established, or one of the ERROR_* codes when a
+ * client connection attempt has failed.
+ */
+typedef void td_replication_callback(td_replication_connect_t *r, int rc);
+
+/* State of one replication connection (server or client side). */
+struct td_replication_connect {
+       /*
+        * caller must fill these in before calling
+        * td_replication_connect_init()
+        */
+       /* used by the logging macros of the implementation */
+       const char *log_prefix;
+       td_replication_callback *callback;
+       /* timeout used when a failed client connect is retried */
+       int retry_timeout_s;
+       /* NOTE(review): presumably the listen() backlog - confirm */
+       int max_connections;
+       /*
+        * The caller uses this fd to read/write after
+        * the connection is established
+        */
+       int fd;
+
+       /* private */
+       /* address the client side connects to */
+       struct sockaddr_in sa;
+       /* server side: socket that incoming connections are accepted on */
+       int listen_fd;
+       /* id of the currently registered scheduler event */
+       event_id_t id;
+
+       /* one of the connection_* states */
+       int status;
+};
+
+/* return -errno if failure happened, otherwise return 0 */
+int td_replication_connect_init(td_replication_connect_t *t, const char *name);
+/*
+ * Return value:
+ *   -1: connection is closed or not connected
+ *    0: connection is in progress
+ *    1: connection is established
+ */
+int td_replication_connect_status(td_replication_connect_t *t);
+void td_replication_connect_kill(td_replication_connect_t *t);
+
+/*
+ * Return value:
+ *   -2: this caller should be client
+ *   -1: error
+ *    0: connection is in progress
+ */
+int td_replication_server_start(td_replication_connect_t *t);
+/*
+ * Go back to waiting for a peer: an established connection has its fd
+ * closed, a connection that is already in progress is left alone, and
+ * a server that is not running is started as by
+ * td_replication_server_start().
+ *
+ * Return value:
+ *   -2: this caller should be client
+ *   -1: error
+ *    0: connection is in progress
+ */
+int td_replication_server_restart(td_replication_connect_t *t);
+/*
+ * Return value:
+ *   -1: error; this is also returned when a connection is already in
+ *       progress or established
+ *    0: connection is in progress
+ */
+int td_replication_client_start(td_replication_connect_t *t);
+
+#endif
-- 
1.9.3


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.