[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC Patch v3 15/22] blktap2: move async-connect-related code to block-replication.c
COLO will reuse them. Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx> Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx> --- tools/blktap2/drivers/Makefile | 2 +- tools/blktap2/drivers/block-remus.c | 494 +++--------------------------- tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++ tools/blktap2/drivers/block-replication.h | 113 +++++++ 4 files changed, 630 insertions(+), 447 deletions(-) create mode 100644 tools/blktap2/drivers/block-replication.c create mode 100644 tools/blktap2/drivers/block-replication.h diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile index 37c3485..3d8ed8a 100644 --- a/tools/blktap2/drivers/Makefile +++ b/tools/blktap2/drivers/Makefile @@ -23,7 +23,7 @@ endif VHDLIBS := -L$(LIBVHDDIR) -lvhd -REMUS-OBJS := block-remus.o +REMUS-OBJS := block-remus.o block-replication.o REMUS-OBJS += hashtable.o REMUS-OBJS += hashtable_itr.o REMUS-OBJS += hashtable_utility.o diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c index 5d27d41..8b6f157 100644 --- a/tools/blktap2/drivers/block-remus.c +++ b/tools/blktap2/drivers/block-remus.c @@ -40,6 +40,7 @@ #include "hashtable.h" #include "hashtable_itr.h" #include "hashtable_utility.h" +#include "block-replication.h" #include <errno.h> #include <inttypes.h> @@ -49,10 +50,7 @@ #include <string.h> #include <sys/time.h> #include <sys/types.h> -#include <sys/socket.h> -#include <netdb.h> #include <netinet/in.h> -#include <arpa/inet.h> #include <sys/param.h> #include <sys/sysctl.h> #include <unistd.h> @@ -67,22 +65,6 @@ #define RPRINTF(_f, _a...) 
syslog (LOG_DEBUG, "remus: " _f, ## _a) -#define UNREGISTER_EVENT(id) \ - do { \ - if (id >= 0) { \ - tapdisk_server_unregister_event(id); \ - id = -1; \ - } \ - } while (0) - -#define CLOSE_FD(fd) \ - do { \ - if (fd >= 0) { \ - close(fd); \ - fd = -1; \ - } \ - } while (0) - #define MAX_REMUS_REQUEST TAPDISK_DATA_REQUESTS enum tdremus_mode { @@ -92,13 +74,6 @@ enum tdremus_mode { mode_backup }; -enum { - ERROR_INTERNAL = -1, - ERROR_IO = -2, - ERROR_CONNECTION = -3, - ERROR_CLOSE = -4, -}; - struct tdremus_req { td_request_t treq; }; @@ -167,21 +142,9 @@ struct ramdisk_write_cbdata { typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq); -/* - * If cid, rid and wid are -1, fd must be -1. It means that - * we are in unpritected mode or we don't start to connect - * to backup. - * If fd is an valid fd: - * cid is valid, rid and wid must be invalid. It means that - * the connection is in progress. - * cid is invalid. rid or wid must be valid. It means that - * the connection is established. 
- */ typedef struct poll_fd { int fd; - event_id_t cid; - event_id_t rid; - event_id_t wid; + event_id_t id; } poll_fd_t; struct tdremus_state { @@ -195,9 +158,7 @@ struct tdremus_state { char* msg_path; /* output completion message here */ poll_fd_t msg_fd; - /* replication host */ - struct sockaddr_in sa; - poll_fd_t server_fd; /* server listen port */ + td_replication_connect_t t; poll_fd_t stream_fd; /* replication channel */ /* @@ -777,28 +738,8 @@ static int mwrite(int fd, void* buf, size_t len) select(fd + 1, NULL, &wfds, NULL, &tv); } - -static void inline close_stream_fd(struct tdremus_state *s) -{ - - UNREGISTER_EVENT(s->stream_fd.cid); - UNREGISTER_EVENT(s->stream_fd.rid); - UNREGISTER_EVENT(s->stream_fd.wid); - - /* close the connection */ - CLOSE_FD(s->stream_fd.fd); -} - -static void close_server_fd(struct tdremus_state *s) -{ - UNREGISTER_EVENT(s->server_fd.cid); - CLOSE_FD(s->server_fd.fd); -} - /* primary functions */ -static void remus_client_event(event_id_t, char mode, void *private); -static void remus_connect_event(event_id_t id, char mode, void *private); -static void remus_retry_connect_event(event_id_t id, char mode, void *private); +static void remus_client_event(event_id_t id, char mode, void *private); static int primary_forward_request(struct tdremus_state *s, const td_request_t *treq); @@ -808,56 +749,15 @@ static int primary_forward_request(struct tdremus_state *s, */ static void primary_failed(struct tdremus_state *s, int rc) { - close_stream_fd(s); + td_replication_connect_kill(&s->t); if (rc == ERROR_INTERNAL) RPRINTF("switch to unprotected mode due to internal error"); if (rc == ERROR_CLOSE) RPRINTF("switch to unprotected mode before closing"); + UNREGISTER_EVENT(s->stream_fd.id); switch_mode(s->tdremus_driver, mode_unprotected); } -static int primary_do_connect(struct tdremus_state *state) -{ - event_id_t id; - int fd; - int rc; - int flags; - - RPRINTF("client connecting to %s:%d...\n", - inet_ntoa(state->sa.sin_addr), 
ntohs(state->sa.sin_port)); - - if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - RPRINTF("could not create client socket: %d\n", errno); - return ERROR_INTERNAL; - } - state->stream_fd.fd = fd; - - /* make socket nonblocking */ - if ((flags = fcntl(fd, F_GETFL, 0)) == -1) - flags = 0; - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { - RPRINTF("error setting fd %d to non block mode\n", fd); - return ERROR_INTERNAL; - } - - /* - * once we have created the socket and populated the address, - * we can now start our non-blocking connect. rather than - * duplicating code we trigger a timeout on the socket fd, - * which calls out nonblocking connect code - */ - if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, - remus_retry_connect_event, - state)) < 0) { - RPRINTF("error registering timeout client connection event handler: %s\n", - strerror(id)); - return ERROR_INTERNAL; - } - - state->stream_fd.cid = id; - return 0; -} - static int remus_handle_queued_io(struct tdremus_state *s) { struct req_ring *queued_io = &s->queued_io; @@ -882,184 +782,35 @@ static int remus_handle_queued_io(struct tdremus_state *s) return 0; } -static int remus_connection_done(struct tdremus_state *s) +static void remus_client_established(td_replication_connect_t *t, int rc) { + struct tdremus_state *s = CONTAINER_OF(t, *s, t); event_id_t id; - /* the connect succeeded */ - /* unregister this function and register a new event handler */ - tapdisk_server_unregister_event(s->stream_fd.cid); - s->stream_fd.cid = -1; + if (rc) { + primary_failed(s, rc); + return; + } - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, + /* the connect succeeded */ + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0, remus_client_event, s); if(id < 0) { RPRINTF("error registering client event handler: %s\n", strerror(id)); - return ERROR_INTERNAL; - } - s->stream_fd.rid = id; - - /* handle the queued requests */ - return remus_handle_queued_io(s); 
-} - -static int remus_retry_connect(struct tdremus_state *s) -{ - event_id_t id; - - tapdisk_server_unregister_event(s->stream_fd.cid); - s->stream_fd.cid = -1; - - RPRINTF("connect to backup 1 second later"); - id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, - s->stream_fd.fd, - REMUS_CONNRETRY_TIMEOUT, - remus_retry_connect_event, s); - if (id < 0) { - RPRINTF("error registering timeout client connection event handler: %s\n", - strerror(id)); - return ERROR_INTERNAL; - } - - s->stream_fd.cid = id; - return 0; -} - -static int remus_wait_connect_done(struct tdremus_state *s) -{ - event_id_t id; - - tapdisk_server_unregister_event(s->stream_fd.cid); - s->stream_fd.cid = -1; - - id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, - s->stream_fd.fd, 0, - remus_connect_event, s); - if (id < 0) { - RPRINTF("error registering client connection event handler: %s\n", - strerror(id)); - return ERROR_INTERNAL; - } - s->stream_fd.cid = id; - - return 0; -} - -/* return 1 if we need to reconnect to backup */ -static int check_connect_errno(int err) -{ - /* - * The fd is non-block, so we will not get ETIMEDOUT - * after calling connect(). We only can get this errno - * by getsockopt(). 
- */ - if (err == ECONNREFUSED || err == ENETUNREACH || - err == EAGAIN || err == ECONNABORTED || - err == ETIMEDOUT) - return 1; - - return 0; -} - -static void remus_retry_connect_event(event_id_t id, char mode, void *private) -{ - struct tdremus_state *s = (struct tdremus_state *)private; - int rc, ret; - - /* do a non-blocking connect */ - ret = connect(s->stream_fd.fd, - (struct sockaddr *)&s->sa, - sizeof(s->sa)); - if (ret) { - if (errno == EINPROGRESS) { - /* - * the connect returned EINPROGRESS (nonblocking - * connect) we must wait for the fd to be writeable - * to determine if the connect worked - */ - rc = remus_wait_connect_done(s); - if (rc) - goto fail; - return; - } - - if (check_connect_errno(errno)) { - rc = remus_retry_connect(s); - if (rc) - goto fail; - return; - } - - /* not recoverable */ - RPRINTF("error connection to server %s\n", strerror(errno)); - rc = ERROR_CONNECTION; - goto fail; - } - - /* The connection is established unexpectedly */ - rc = remus_connection_done(s); - if (rc) - goto fail; - - return; - -fail: - primary_failed(s, rc); - return; -} - -/* callback when nonblocking connect() is finished */ -static void remus_connect_event(event_id_t id, char mode, void *private) -{ - int socket_errno; - socklen_t socket_errno_size; - struct tdremus_state *s = (struct tdremus_state *)private; - int rc; - - /* check to see if the connect succeeded */ - socket_errno_size = sizeof(socket_errno); - if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, - &socket_errno, &socket_errno_size)) { - RPRINTF("error getting socket errno\n"); + primary_failed(s, ERROR_INTERNAL); return; } - RPRINTF("socket connect returned %d\n", socket_errno); + s->stream_fd.fd = t->fd; + s->stream_fd.id = id; - if (socket_errno) { - /* the connect did not succeed */ - if (check_connect_errno(socket_errno)) { - /* - * we can probably assume that the backup is down. 
- * just try again later - */ - rc = remus_retry_connect(s); - if (rc) - goto fail; - - return; - } else { - RPRINTF("socket connect returned %d, giving up\n", - socket_errno); - rc = ERROR_CONNECTION; - goto fail; - } - - return; - } - - rc = remus_connection_done(s); + /* handle the queued requests */ + rc = remus_handle_queued_io(s); if (rc) - goto fail; - - return; - -fail: - primary_failed(s, rc); + primary_failed(s, rc); } - /* * we install this event handler on the primary once we have * connected to the backup. @@ -1142,19 +893,21 @@ static int primary_forward_request(struct tdremus_state *s, static void primary_queue_write(td_driver_t *driver, td_request_t treq) { struct tdremus_state *s = (struct tdremus_state *)driver->data; - int rc; + int rc, ret; // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd); - if(s->stream_fd.fd < 0) { + ret = td_replication_connect_status(&s->t); + if(ret == -1) { RPRINTF("connecting to backup...\n"); - rc = primary_do_connect(s); + s->t.callback = remus_client_established; + rc = td_replication_client_start(&s->t); if (rc) goto fail; } /* The connection is not established, just queue the request */ - if (s->stream_fd.cid >= 0) { + if (ret != 1) { ring_add_request(&s->queued_io, &treq); return; } @@ -1227,9 +980,7 @@ static int primary_start(td_driver_t *driver) s->queue_flush = primary_flush; s->stream_fd.fd = -1; - s->stream_fd.cid = -1; - s->stream_fd.rid = -1; - s->stream_fd.wid = -1; + s->stream_fd.id = -1; return 0; } @@ -1240,100 +991,32 @@ static void remus_server_event(event_id_t id, char mode, void *private); /* It is called when we find some I/O error */ static void backup_failed(struct tdremus_state *s, int rc) { - close_stream_fd(s); - close_server_fd(s); + td_replication_connect_kill(&s->t); /* We will switch to unprotected mode in backup_queue_write() */ } /* returns the socket that receives write requests */ -static void remus_server_accept(event_id_t id, char mode, void* private) +static void 
remus_server_established(td_replication_connect_t *t, int rc) { - struct tdremus_state* s = (struct tdremus_state *) private; - - int stream_fd; - - /* XXX: add address-based black/white list */ - if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) { - RPRINTF("error accepting connection: %d\n", errno); - return; - } + struct tdremus_state *s = CONTAINER_OF(t, *s, t); + event_id_t id; - /* - * TODO: check to see if we are already replicating. - * if so just close the connection (or do something - * smarter) - */ - RPRINTF("server accepted connection\n"); + /* rc is always 0 */ /* add tapdisk event for replication stream */ - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0, + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0, remus_server_event, s); if (id < 0) { RPRINTF("error registering connection event handler: %s\n", strerror(errno)); - close(stream_fd); + td_replication_server_restart(t); return; } /* store replication file descriptor */ - s->stream_fd.fd = stream_fd; - s->stream_fd.rid = id; -} - -/* returns -2 if EADDRNOTAVAIL */ -static int remus_bind(struct tdremus_state* s) -{ - int opt; - int rc = -1; - event_id_t id; - - if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - RPRINTF("could not create server socket: %d\n", errno); - return rc; - } - - opt = 1; - if (setsockopt(s->server_fd.fd, SOL_SOCKET, - SO_REUSEADDR, &opt, sizeof(opt)) < 0) - RPRINTF("Error setting REUSEADDR on %d: %d\n", - s->server_fd.fd, errno); - - if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, - sizeof(s->sa)) < 0) { - RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", - s->server_fd.fd, inet_ntoa(s->sa.sin_addr), - ntohs(s->sa.sin_port), errno, strerror(errno)); - if (errno == EADDRNOTAVAIL) - rc = -2; - goto err_sfd; - } - - if (listen(s->server_fd.fd, 10)) { - RPRINTF("could not listen on socket: %d\n", errno); - goto err_sfd; - } - - /* - * The socket s now bound to the address and listening so we - * 
may now register the fd with tapdisk - */ - id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, - s->server_fd.fd, 0, - remus_server_accept, s); - if (id < 0) { - RPRINTF("error registering server connection event handler: %s", - strerror(id)); - goto err_sfd; - } - s->server_fd.cid = id; - - return 0; - -err_sfd: - CLOSE_FD(s->server_fd.fd); - - return rc; + s->stream_fd.fd = t->fd; + s->stream_fd.id = id; } /* wait for latest checkpoint to be applied */ @@ -1566,90 +1249,6 @@ static int unprotected_start(td_driver_t *driver) /* control */ - -static inline int resolve_address(const char* addr, struct in_addr* ia) -{ - struct hostent* he; - uint32_t ip; - - if (!(he = gethostbyname(addr))) { - RPRINTF("error resolving %s: %d\n", addr, h_errno); - return -1; - } - - if (!he->h_addr_list[0]) { - RPRINTF("no address found for %s\n", addr); - return -1; - } - - /* network byte order */ - ip = *((uint32_t**)he->h_addr_list)[0]; - ia->s_addr = ip; - - return 0; -} - -static int get_args(td_driver_t *driver, const char* name) -{ - struct tdremus_state *state = (struct tdremus_state *)driver->data; - char* host; - char* port; -// char* driver_str; -// char* parent; -// int type; -// char* path; -// unsigned long ulport; -// int i; -// struct sockaddr_in server_addr_in; - - int gai_status; - int valid_addr; - struct addrinfo gai_hints; - struct addrinfo *servinfo, *servinfo_itr; - - memset(&gai_hints, 0, sizeof gai_hints); - gai_hints.ai_family = AF_UNSPEC; - gai_hints.ai_socktype = SOCK_STREAM; - - port = strchr(name, ':'); - if (!port) { - RPRINTF("missing host in %s\n", name); - return -ENOENT; - } - if (!(host = strndup(name, port - name))) { - RPRINTF("unable to allocate host\n"); - return -ENOMEM; - } - port++; - - if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) { - RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status)); - return -ENOENT; - } - - /* TODO: do something smarter here */ - valid_addr = 0; - for(servinfo_itr = servinfo; 
servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) { - void *addr; - char *ipver; - - if (servinfo_itr->ai_family == AF_INET) { - valid_addr = 1; - memset(&state->sa, 0, sizeof(state->sa)); - state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr; - break; - } - } - freeaddrinfo(servinfo); - - if (!valid_addr) - return -ENOENT; - - RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port)); - - return 0; -} - static int switch_mode(td_driver_t *driver, enum tdremus_mode mode) { struct tdremus_state *s = (struct tdremus_state *)driver->data; @@ -1844,11 +1443,11 @@ static int ctl_register(struct tdremus_state *s) RPRINTF("registering ctl fifo\n"); /* register ctl fd */ - s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s); + s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s); - if (s->ctl_fd.cid < 0) { + if (s->ctl_fd.id < 0) { RPRINTF("error registering ctrl FIFO %s: %d\n", - s->ctl_path, s->ctl_fd.cid); + s->ctl_path, s->ctl_fd.id); return -1; } @@ -1859,7 +1458,7 @@ static void ctl_unregister(struct tdremus_state *s) { RPRINTF("unregistering ctl fifo\n"); - UNREGISTER_EVENT(s->ctl_fd.cid); + UNREGISTER_EVENT(s->ctl_fd.id); } /* interface */ @@ -1867,6 +1466,7 @@ static void ctl_unregister(struct tdremus_state *s) static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) { struct tdremus_state *s = (struct tdremus_state *)driver->data; + td_replication_connect_t *t = &s->t; int rc; const char *name = image->name; td_flag_t flags = image->flags; @@ -1877,7 +1477,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) remus_image = image; memset(s, 0, sizeof(*s)); - s->server_fd.fd = -1; s->stream_fd.fd = -1; s->ctl_fd.fd = -1; s->msg_fd.fd = -1; @@ -1886,8 +1485,12 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) * the driver stack from the 
stream_fd event handler */ s->tdremus_driver = driver; + t->log_prefix = "remus"; + t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT; + t->max_connections = 10; + t->callback = remus_server_established; /* parse name to get info etc */ - if ((rc = get_args(driver, name))) + if ((rc = td_replication_connect_init(t, name))) return rc; if ((rc = ctl_open(driver, name))) { @@ -1901,7 +1504,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) return rc; } - if (!(rc = remus_bind(s))) + if (!(rc = td_replication_server_start(t))) rc = switch_mode(driver, mode_backup); else if (rc == -2) rc = switch_mode(driver, mode_primary); @@ -1932,8 +1535,7 @@ static int tdremus_close(td_driver_t *driver) if (s->ramdisk.inprogress) hashtable_destroy(s->ramdisk.inprogress, 0); - close_server_fd(s); - close_stream_fd(s); + td_replication_connect_kill(&s->t); ctl_unregister(s); ctl_close(s); diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c new file mode 100644 index 0000000..e4b2679 --- /dev/null +++ b/tools/blktap2/drivers/block-replication.c @@ -0,0 +1,468 @@ +/* + * Copyright (C) 2014 FUJITSU LIMITED + * Author: Wen Congyang <wency@xxxxxxxxxxxxxx> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. 
+ */ + +#include "tapdisk-server.h" +#include "block-replication.h" + +#include <string.h> +#include <errno.h> +#include <sys/types.h> +#include <unistd.h> +#include <fcntl.h> +#include <syslog.h> +#include <stdlib.h> +#include <arpa/inet.h> + +#undef DPRINTF +#undef EPRINTF +#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a) +#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a) + +/* connection status */ +enum { + connection_none, + connection_in_progress, + connection_established, + connection_closed, +}; + +/* common functions */ +/* args should be host:port */ +static int get_args(td_replication_connect_t *t, const char* name) +{ + char* host; + const char* port; + int gai_status; + int valid_addr; + struct addrinfo gai_hints; + struct addrinfo *servinfo, *servinfo_itr; + const char *log_prefix = t->log_prefix; + + memset(&gai_hints, 0, sizeof gai_hints); + gai_hints.ai_family = AF_UNSPEC; + gai_hints.ai_socktype = SOCK_STREAM; + + port = strchr(name, ':'); + if (!port) { + EPRINTF("missing host in %s\n", name); + return -ENOENT; + } + if (!(host = strndup(name, port - name))) { + EPRINTF("unable to allocate host\n"); + return -ENOMEM; + } + port++; + if ((gai_status = getaddrinfo(host, port, + &gai_hints, &servinfo)) != 0) { + EPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status)); + free(host); + return -ENOENT; + } + free(host); + + /* TODO: do something smarter here */ + valid_addr = 0; + for (servinfo_itr = servinfo; servinfo_itr != NULL; + servinfo_itr = servinfo_itr->ai_next) { + if (servinfo_itr->ai_family == AF_INET) { + valid_addr = 1; + memset(&t->sa, 0, sizeof(t->sa)); + t->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr; + break; + } + } + freeaddrinfo(servinfo); + + if (!valid_addr) + return -ENOENT; + + DPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr), + ntohs(t->sa.sin_port)); + + return 0; +} + +int td_replication_connect_init(td_replication_connect_t *t, const char *name) +{ + int 
rc; + + rc = get_args(t, name); + if (rc) + return rc; + + t->listen_fd = -1; + t->id = -1; + t->status = connection_none; + return 0; +} + +int td_replication_connect_status(td_replication_connect_t *t) +{ + const char *log_prefix = t->log_prefix; + + switch (t->status) { + case connection_none: + case connection_closed: + return -1; + case connection_in_progress: + return 0; + case connection_established: + return 1; + default: + EPRINTF("td_replication_connect is corruptted\n"); + return -2; + } +} + +void td_replication_connect_kill(td_replication_connect_t *t) +{ + if (t->status != connection_in_progress && + t->status != connection_established) + return; + + UNREGISTER_EVENT(t->id); + CLOSE_FD(t->fd); + CLOSE_FD(t->listen_fd); + t->status = connection_closed; +} + +/* server */ +static void td_replication_server_accept(event_id_t id, char mode, + void *private); + +int td_replication_server_start(td_replication_connect_t *t) +{ + int opt; + int rc = -1; + event_id_t id; + int fd; + const char *log_prefix = t->log_prefix; + + if (t->status == connection_in_progress || + t->status == connection_established) + return rc; + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + EPRINTF("could not create server socket: %d\n", errno); + return rc; + } + + opt = 1; + if (setsockopt(fd, SOL_SOCKET, + SO_REUSEADDR, &opt, sizeof(opt)) < 0) + DPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno); + + if (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) { + DPRINTF("could not bind server socket %d to %s:%d: %d %s\n", + fd, inet_ntoa(t->sa.sin_addr), + ntohs(t->sa.sin_port), errno, strerror(errno)); + if (errno == EADDRNOTAVAIL) + rc = -2; + goto err; + } + + if (listen(fd, t->max_connections)) { + EPRINTF("could not listen on socket: %d\n", errno); + goto err; + } + + /* + * The socket is now bound to the address and listening so we + * may now register the fd with tapdisk + */ + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, + fd, 0, + 
td_replication_server_accept, t); + if (id < 0) { + EPRINTF("error registering server connection event handler: %s", + strerror(id)); + goto err; + } + t->listen_fd = fd; + t->id = id; + t->status = connection_in_progress; + + return 0; + +err: + close(fd); + return rc; +} + +static void td_replication_server_accept(event_id_t id, char mode, + void *private) +{ + td_replication_connect_t *t = private; + int fd; + const char *log_prefix = t->log_prefix; + + /* XXX: add address-based black/white list */ + fd = accept(t->listen_fd, NULL, NULL); + if (fd < 0) { + EPRINTF("error accepting connection: %d\n", errno); + return; + } + + if (t->status == connection_established) { + EPRINTF("connection is already established\n"); + close(fd); + return; + } + + DPRINTF("server accepted connection\n"); + t->fd = fd; + t->status = connection_established; + t->callback(t, 0); +} + +int td_replication_server_restart(td_replication_connect_t *t) +{ + switch (t->status) { + case connection_in_progress: + return 0; + case connection_established: + CLOSE_FD(t->fd); + t->status = connection_in_progress; + return 0; + case connection_none: + case connection_closed: + return td_replication_server_start(t); + default: + /* not reached */ + return -1; + } +} + +/* client */ +static void td_replication_retry_connect_event(event_id_t id, char mode, + void *private); +static void td_replication_connect_event(event_id_t id, char mode, + void *private); +int td_replication_client_start(td_replication_connect_t *t) +{ + event_id_t id; + int fd; + int rc; + int flags; + const char *log_prefix = t->log_prefix; + + if (t->status == connection_in_progress || + t->status == connection_established) + return ERROR_INTERNAL; + + DPRINTF("client connecting to %s:%d...\n", + inet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port)); + + if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + EPRINTF("could not create client socket: %d\n", errno); + return ERROR_INTERNAL; + } + + /* make socket nonblocking */ + if 
((flags = fcntl(fd, F_GETFL, 0)) == -1) + flags = 0; + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { + EPRINTF("error setting fd %d to non block mode\n", fd); + goto err; + } + + /* + * once we have created the socket and populated the address, + * we can now start our non-blocking connect. rather than + * duplicating code we trigger a timeout on the socket fd, + * which calls out nonblocking connect code + */ + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, + td_replication_retry_connect_event, + t); + if(id < 0) { + EPRINTF("error registering timeout client connection event handler: %s\n", + strerror(id)); + goto err; + } + + t->fd = fd; + t->id = id; + t->status = connection_in_progress; + return 0; + +err: + close(fd); + return ERROR_INTERNAL; +} + +static void td_replication_client_failed(td_replication_connect_t *t, int rc) +{ + td_replication_connect_kill(t); + t->callback(t, rc); +} + +static void td_replication_client_done(td_replication_connect_t *t) +{ + UNREGISTER_EVENT(t->id); + t->status = connection_established; + t->callback(t, 0); +} + +static int td_replication_retry_connect(td_replication_connect_t *t) +{ + event_id_t id; + const char *log_prefix = t->log_prefix; + + UNREGISTER_EVENT(t->id); + + DPRINTF("connect to server 1 second later"); + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, + t->fd, t->retry_timeout_s, + td_replication_retry_connect_event, + t); + if (id < 0) { + EPRINTF("error registering timeout client connection event handler: %s\n", + strerror(id)); + return ERROR_INTERNAL; + } + + t->id = id; + return 0; +} + +static int td_replication_wait_connect_done(td_replication_connect_t *t) +{ + event_id_t id; + const char *log_prefix = t->log_prefix; + + UNREGISTER_EVENT(t->id); + + id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, + t->fd, 0, + td_replication_connect_event, t); + if (id < 0) { + EPRINTF("error registering client connection event handler: %s\n", + strerror(id)); + return 
ERROR_INTERNAL; + } + t->id = id; + + return 0; +} + +/* return 1 if we need to reconnect to backup server */ +static int check_connect_errno(int err) +{ + /* + * The fd is non-block, so we will not get ETIMEDOUT + * after calling connect(). We only can get this errno + * by getsockopt(). + */ + if (err == ECONNREFUSED || err == ENETUNREACH || + err == EAGAIN || err == ECONNABORTED || + err == ETIMEDOUT) + return 1; + + return 0; +} + +static void td_replication_retry_connect_event(event_id_t id, char mode, + void *private) +{ + td_replication_connect_t *t = private; + int rc, ret; + const char *log_prefix = t->log_prefix; + + /* do a non-blocking connect */ + ret = connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa)); + if (ret) { + if (errno == EINPROGRESS) { + /* + * the connect returned EINPROGRESS (nonblocking + * connect) we must wait for the fd to be writeable + * to determine if the connect worked + */ + rc = td_replication_wait_connect_done(t); + if (rc) + goto fail; + return; + } + + if (check_connect_errno(errno)) { + rc = td_replication_retry_connect(t); + if (rc) + goto fail; + return; + } + + /* not recoverable */ + EPRINTF("error connection to server %s\n", strerror(errno)); + rc = ERROR_CONNECTION; + goto fail; + } + + /* The connection is established unexpectedly */ + td_replication_client_done(t); + + return; + +fail: + td_replication_client_failed(t, rc); +} + +/* callback when nonblocking connect() is finished */ +static void td_replication_connect_event(event_id_t id, char mode, + void *private) +{ + int socket_errno; + socklen_t socket_errno_size; + td_replication_connect_t *t = private; + int rc; + const char *log_prefix = t->log_prefix; + + /* check to see if the connect succeeded */ + socket_errno_size = sizeof(socket_errno); + if (getsockopt(t->fd, SOL_SOCKET, SO_ERROR, + &socket_errno, &socket_errno_size)) { + EPRINTF("error getting socket errno\n"); + return; + } + + DPRINTF("socket connect returned %d\n", socket_errno); + + if 
(socket_errno) { + /* the connect did not succeed */ + if (check_connect_errno(socket_errno)) { + /* + * we can probably assume that the backup is down. + * just try again later + */ + rc = td_replication_retry_connect(t); + if (rc) + goto fail; + + return; + } else { + EPRINTF("socket connect returned %d, giving up\n", + socket_errno); + rc = ERROR_CONNECTION; + goto fail; + } + } + + td_replication_client_done(t); + + return; + +fail: + td_replication_client_failed(t, rc); +} diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h new file mode 100644 index 0000000..0bd6e71 --- /dev/null +++ b/tools/blktap2/drivers/block-replication.h @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2014 FUJITSU LIMITED + * Author: Wen Congyang <wency@xxxxxxxxxxxxxx> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. 
+ */ + +#ifndef BLOCK_REPLICATION_H +#define BLOCK_REPLICATION_H + +#include "scheduler.h" +#include <sys/socket.h> +#include <netdb.h> + +#define CONTAINER_OF(inner_ptr, outer, member_name) \ + ({ \ + typeof(outer) *container_of_; \ + container_of_ = (void*)((char*)(inner_ptr) - \ + offsetof(typeof(outer), member_name)); \ + (void)(&container_of_->member_name == \ + (typeof(inner_ptr))0) /* type check */; \ + container_of_; \ + }) + +#define UNREGISTER_EVENT(id) \ + do { \ + if (id >= 0) { \ + tapdisk_server_unregister_event(id); \ + id = -1; \ + } \ + } while (0) +#define CLOSE_FD(fd) \ + do { \ + if (fd >= 0) { \ + close(fd); \ + fd = -1; \ + } \ + } while (0) + +enum { + ERROR_INTERNAL = -1, + ERROR_IO = -2, + ERROR_CONNECTION = -3, + ERROR_CLOSE = -4, +}; + +typedef struct td_replication_connect td_replication_connect_t; +typedef void td_replication_callback(td_replication_connect_t *r, int rc); + +struct td_replication_connect { + /* + * caller must fill these in before calling + * td_replication_connect_init() + */ + const char *log_prefix; + td_replication_callback *callback; + int retry_timeout_s; + int max_connections; + /* + * The caller uses this fd to read/write after + * the connection is established + */ + int fd; + + /* private */ + struct sockaddr_in sa; + int listen_fd; + event_id_t id; + + int status; +}; + +/* return -errno if failure happened, otherwise return 0 */ +int td_replication_connect_init(td_replication_connect_t *t, const char *name); +/* + * Return value: + * -1: connection is closed or not connected + * 0: connection is in progress + * 1: connection is established + */ +int td_replication_connect_status(td_replication_connect_t *t); +void td_replication_connect_kill(td_replication_connect_t *t); + +/* + * Return value: + * -2: this caller should be client + * -1: error + * 0: connection is in progress + */ +int td_replication_server_start(td_replication_connect_t *t); +/* + * Return value: + * -2: this caller should be client + * 
-1: error + * 0: connection is in progress + */ +int td_replication_server_restart(td_replication_connect_t *t); +/* + * Return value: + * -1: error + * 0: connection is in progress + */ +int td_replication_client_start(td_replication_connect_t *t); + +#endif -- 1.9.3 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |