|
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC Patch v3 15/22] blktap2: move async connect related codes to block-replication.c
COLO will reuse them.
Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
---
tools/blktap2/drivers/Makefile | 2 +-
tools/blktap2/drivers/block-remus.c | 494 +++---------------------------
tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++
tools/blktap2/drivers/block-replication.h | 113 +++++++
4 files changed, 630 insertions(+), 447 deletions(-)
create mode 100644 tools/blktap2/drivers/block-replication.c
create mode 100644 tools/blktap2/drivers/block-replication.h
diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
index 37c3485..3d8ed8a 100644
--- a/tools/blktap2/drivers/Makefile
+++ b/tools/blktap2/drivers/Makefile
@@ -23,7 +23,7 @@ endif
VHDLIBS := -L$(LIBVHDDIR) -lvhd
-REMUS-OBJS := block-remus.o
+REMUS-OBJS := block-remus.o block-replication.o
REMUS-OBJS += hashtable.o
REMUS-OBJS += hashtable_itr.o
REMUS-OBJS += hashtable_utility.o
diff --git a/tools/blktap2/drivers/block-remus.c
b/tools/blktap2/drivers/block-remus.c
index 5d27d41..8b6f157 100644
--- a/tools/blktap2/drivers/block-remus.c
+++ b/tools/blktap2/drivers/block-remus.c
@@ -40,6 +40,7 @@
#include "hashtable.h"
#include "hashtable_itr.h"
#include "hashtable_utility.h"
+#include "block-replication.h"
#include <errno.h>
#include <inttypes.h>
@@ -49,10 +50,7 @@
#include <string.h>
#include <sys/time.h>
#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
#include <netinet/in.h>
-#include <arpa/inet.h>
#include <sys/param.h>
#include <sys/sysctl.h>
#include <unistd.h>
@@ -67,22 +65,6 @@
#define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
-#define UNREGISTER_EVENT(id) \
- do { \
- if (id >= 0) { \
- tapdisk_server_unregister_event(id); \
- id = -1; \
- } \
- } while (0)
-
-#define CLOSE_FD(fd) \
- do { \
- if (fd >= 0) { \
- close(fd); \
- fd = -1; \
- } \
- } while (0)
-
#define MAX_REMUS_REQUEST TAPDISK_DATA_REQUESTS
enum tdremus_mode {
@@ -92,13 +74,6 @@ enum tdremus_mode {
mode_backup
};
-enum {
- ERROR_INTERNAL = -1,
- ERROR_IO = -2,
- ERROR_CONNECTION = -3,
- ERROR_CLOSE = -4,
-};
-
struct tdremus_req {
td_request_t treq;
};
@@ -167,21 +142,9 @@ struct ramdisk_write_cbdata {
typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
-/*
- * If cid, rid and wid are -1, fd must be -1. It means that
- * we are in unpritected mode or we don't start to connect
- * to backup.
- * If fd is an valid fd:
- * cid is valid, rid and wid must be invalid. It means that
- * the connection is in progress.
- * cid is invalid. rid or wid must be valid. It means that
- * the connection is established.
- */
typedef struct poll_fd {
int fd;
- event_id_t cid;
- event_id_t rid;
- event_id_t wid;
+ event_id_t id;
} poll_fd_t;
struct tdremus_state {
@@ -195,9 +158,7 @@ struct tdremus_state {
char* msg_path; /* output completion message here */
poll_fd_t msg_fd;
- /* replication host */
- struct sockaddr_in sa;
- poll_fd_t server_fd; /* server listen port */
+ td_replication_connect_t t;
poll_fd_t stream_fd; /* replication channel */
/*
@@ -777,28 +738,8 @@ static int mwrite(int fd, void* buf, size_t len)
select(fd + 1, NULL, &wfds, NULL, &tv);
}
-
-static void inline close_stream_fd(struct tdremus_state *s)
-{
-
- UNREGISTER_EVENT(s->stream_fd.cid);
- UNREGISTER_EVENT(s->stream_fd.rid);
- UNREGISTER_EVENT(s->stream_fd.wid);
-
- /* close the connection */
- CLOSE_FD(s->stream_fd.fd);
-}
-
-static void close_server_fd(struct tdremus_state *s)
-{
- UNREGISTER_EVENT(s->server_fd.cid);
- CLOSE_FD(s->server_fd.fd);
-}
-
/* primary functions */
-static void remus_client_event(event_id_t, char mode, void *private);
-static void remus_connect_event(event_id_t id, char mode, void *private);
-static void remus_retry_connect_event(event_id_t id, char mode, void *private);
+static void remus_client_event(event_id_t id, char mode, void *private);
static int primary_forward_request(struct tdremus_state *s,
const td_request_t *treq);
@@ -808,56 +749,15 @@ static int primary_forward_request(struct tdremus_state
*s,
*/
static void primary_failed(struct tdremus_state *s, int rc)
{
- close_stream_fd(s);
+ td_replication_connect_kill(&s->t);
if (rc == ERROR_INTERNAL)
RPRINTF("switch to unprotected mode due to internal error");
if (rc == ERROR_CLOSE)
RPRINTF("switch to unprotected mode before closing");
+ UNREGISTER_EVENT(s->stream_fd.id);
switch_mode(s->tdremus_driver, mode_unprotected);
}
-static int primary_do_connect(struct tdremus_state *state)
-{
- event_id_t id;
- int fd;
- int rc;
- int flags;
-
- RPRINTF("client connecting to %s:%d...\n",
- inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
-
- if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
- RPRINTF("could not create client socket: %d\n", errno);
- return ERROR_INTERNAL;
- }
- state->stream_fd.fd = fd;
-
- /* make socket nonblocking */
- if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
- flags = 0;
- if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
- RPRINTF("error setting fd %d to non block mode\n", fd);
- return ERROR_INTERNAL;
- }
-
- /*
- * once we have created the socket and populated the address,
- * we can now start our non-blocking connect. rather than
- * duplicating code we trigger a timeout on the socket fd,
- * which calls out nonblocking connect code
- */
- if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
- remus_retry_connect_event,
- state)) < 0) {
- RPRINTF("error registering timeout client connection event
handler: %s\n",
- strerror(id));
- return ERROR_INTERNAL;
- }
-
- state->stream_fd.cid = id;
- return 0;
-}
-
static int remus_handle_queued_io(struct tdremus_state *s)
{
struct req_ring *queued_io = &s->queued_io;
@@ -882,184 +782,35 @@ static int remus_handle_queued_io(struct tdremus_state
*s)
return 0;
}
-static int remus_connection_done(struct tdremus_state *s)
+static void remus_client_established(td_replication_connect_t *t, int rc)
{
+ struct tdremus_state *s = CONTAINER_OF(t, *s, t);
event_id_t id;
- /* the connect succeeded */
- /* unregister this function and register a new event handler */
- tapdisk_server_unregister_event(s->stream_fd.cid);
- s->stream_fd.cid = -1;
+ if (rc) {
+ primary_failed(s, rc);
+ return;
+ }
- id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
s->stream_fd.fd,
+ /* the connect succeeded */
+ id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
0, remus_client_event, s);
if(id < 0) {
RPRINTF("error registering client event handler: %s\n",
strerror(id));
- return ERROR_INTERNAL;
- }
- s->stream_fd.rid = id;
-
- /* handle the queued requests */
- return remus_handle_queued_io(s);
-}
-
-static int remus_retry_connect(struct tdremus_state *s)
-{
- event_id_t id;
-
- tapdisk_server_unregister_event(s->stream_fd.cid);
- s->stream_fd.cid = -1;
-
- RPRINTF("connect to backup 1 second later");
- id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
- s->stream_fd.fd,
- REMUS_CONNRETRY_TIMEOUT,
- remus_retry_connect_event, s);
- if (id < 0) {
- RPRINTF("error registering timeout client connection event
handler: %s\n",
- strerror(id));
- return ERROR_INTERNAL;
- }
-
- s->stream_fd.cid = id;
- return 0;
-}
-
-static int remus_wait_connect_done(struct tdremus_state *s)
-{
- event_id_t id;
-
- tapdisk_server_unregister_event(s->stream_fd.cid);
- s->stream_fd.cid = -1;
-
- id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
- s->stream_fd.fd, 0,
- remus_connect_event, s);
- if (id < 0) {
- RPRINTF("error registering client connection event handler:
%s\n",
- strerror(id));
- return ERROR_INTERNAL;
- }
- s->stream_fd.cid = id;
-
- return 0;
-}
-
-/* return 1 if we need to reconnect to backup */
-static int check_connect_errno(int err)
-{
- /*
- * The fd is non-block, so we will not get ETIMEDOUT
- * after calling connect(). We only can get this errno
- * by getsockopt().
- */
- if (err == ECONNREFUSED || err == ENETUNREACH ||
- err == EAGAIN || err == ECONNABORTED ||
- err == ETIMEDOUT)
- return 1;
-
- return 0;
-}
-
-static void remus_retry_connect_event(event_id_t id, char mode, void *private)
-{
- struct tdremus_state *s = (struct tdremus_state *)private;
- int rc, ret;
-
- /* do a non-blocking connect */
- ret = connect(s->stream_fd.fd,
- (struct sockaddr *)&s->sa,
- sizeof(s->sa));
- if (ret) {
- if (errno == EINPROGRESS) {
- /*
- * the connect returned EINPROGRESS (nonblocking
- * connect) we must wait for the fd to be writeable
- * to determine if the connect worked
- */
- rc = remus_wait_connect_done(s);
- if (rc)
- goto fail;
- return;
- }
-
- if (check_connect_errno(errno)) {
- rc = remus_retry_connect(s);
- if (rc)
- goto fail;
- return;
- }
-
- /* not recoverable */
- RPRINTF("error connection to server %s\n", strerror(errno));
- rc = ERROR_CONNECTION;
- goto fail;
- }
-
- /* The connection is established unexpectedly */
- rc = remus_connection_done(s);
- if (rc)
- goto fail;
-
- return;
-
-fail:
- primary_failed(s, rc);
- return;
-}
-
-/* callback when nonblocking connect() is finished */
-static void remus_connect_event(event_id_t id, char mode, void *private)
-{
- int socket_errno;
- socklen_t socket_errno_size;
- struct tdremus_state *s = (struct tdremus_state *)private;
- int rc;
-
- /* check to see if the connect succeeded */
- socket_errno_size = sizeof(socket_errno);
- if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR,
- &socket_errno, &socket_errno_size)) {
- RPRINTF("error getting socket errno\n");
+ primary_failed(s, ERROR_INTERNAL);
return;
}
- RPRINTF("socket connect returned %d\n", socket_errno);
+ s->stream_fd.fd = t->fd;
+ s->stream_fd.id = id;
- if (socket_errno) {
- /* the connect did not succeed */
- if (check_connect_errno(socket_errno)) {
- /*
- * we can probably assume that the backup is down.
- * just try again later
- */
- rc = remus_retry_connect(s);
- if (rc)
- goto fail;
-
- return;
- } else {
- RPRINTF("socket connect returned %d, giving up\n",
- socket_errno);
- rc = ERROR_CONNECTION;
- goto fail;
- }
-
- return;
- }
-
- rc = remus_connection_done(s);
+ /* handle the queued requests */
+ rc = remus_handle_queued_io(s);
if (rc)
- goto fail;
-
- return;
-
-fail:
- primary_failed(s, rc);
+ primary_failed(s, rc);
}
-
/*
* we install this event handler on the primary once we have
* connected to the backup.
@@ -1142,19 +893,21 @@ static int primary_forward_request(struct tdremus_state
*s,
static void primary_queue_write(td_driver_t *driver, td_request_t treq)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
- int rc;
+ int rc, ret;
// RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
- if(s->stream_fd.fd < 0) {
+ ret = td_replication_connect_status(&s->t);
+ if(ret == -1) {
RPRINTF("connecting to backup...\n");
- rc = primary_do_connect(s);
+ s->t.callback = remus_client_established;
+ rc = td_replication_client_start(&s->t);
if (rc)
goto fail;
}
/* The connection is not established, just queue the request */
- if (s->stream_fd.cid >= 0) {
+ if (ret != 1) {
ring_add_request(&s->queued_io, &treq);
return;
}
@@ -1227,9 +980,7 @@ static int primary_start(td_driver_t *driver)
s->queue_flush = primary_flush;
s->stream_fd.fd = -1;
- s->stream_fd.cid = -1;
- s->stream_fd.rid = -1;
- s->stream_fd.wid = -1;
+ s->stream_fd.id = -1;
return 0;
}
@@ -1240,100 +991,32 @@ static void remus_server_event(event_id_t id, char
mode, void *private);
/* It is called when we find some I/O error */
static void backup_failed(struct tdremus_state *s, int rc)
{
- close_stream_fd(s);
- close_server_fd(s);
+ td_replication_connect_kill(&s->t);
/* We will switch to unprotected mode in backup_queue_write() */
}
/* returns the socket that receives write requests */
-static void remus_server_accept(event_id_t id, char mode, void* private)
+static void remus_server_established(td_replication_connect_t *t, int rc)
{
- struct tdremus_state* s = (struct tdremus_state *) private;
-
- int stream_fd;
-
- /* XXX: add address-based black/white list */
- if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
- RPRINTF("error accepting connection: %d\n", errno);
- return;
- }
+ struct tdremus_state *s = CONTAINER_OF(t, *s, t);
+ event_id_t id;
- /*
- * TODO: check to see if we are already replicating.
- * if so just close the connection (or do something
- * smarter)
- */
- RPRINTF("server accepted connection\n");
+ /* rc is always 0 */
/* add tapdisk event for replication stream */
- id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
+ id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0,
remus_server_event, s);
if (id < 0) {
RPRINTF("error registering connection event handler: %s\n",
strerror(errno));
- close(stream_fd);
+ td_replication_server_restart(t);
return;
}
/* store replication file descriptor */
- s->stream_fd.fd = stream_fd;
- s->stream_fd.rid = id;
-}
-
-/* returns -2 if EADDRNOTAVAIL */
-static int remus_bind(struct tdremus_state* s)
-{
- int opt;
- int rc = -1;
- event_id_t id;
-
- if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- RPRINTF("could not create server socket: %d\n", errno);
- return rc;
- }
-
- opt = 1;
- if (setsockopt(s->server_fd.fd, SOL_SOCKET,
- SO_REUSEADDR, &opt, sizeof(opt)) < 0)
- RPRINTF("Error setting REUSEADDR on %d: %d\n",
- s->server_fd.fd, errno);
-
- if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
- sizeof(s->sa)) < 0) {
- RPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
- s->server_fd.fd, inet_ntoa(s->sa.sin_addr),
- ntohs(s->sa.sin_port), errno, strerror(errno));
- if (errno == EADDRNOTAVAIL)
- rc = -2;
- goto err_sfd;
- }
-
- if (listen(s->server_fd.fd, 10)) {
- RPRINTF("could not listen on socket: %d\n", errno);
- goto err_sfd;
- }
-
- /*
- * The socket s now bound to the address and listening so we
- * may now register the fd with tapdisk
- */
- id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
- s->server_fd.fd, 0,
- remus_server_accept, s);
- if (id < 0) {
- RPRINTF("error registering server connection event handler: %s",
- strerror(id));
- goto err_sfd;
- }
- s->server_fd.cid = id;
-
- return 0;
-
-err_sfd:
- CLOSE_FD(s->server_fd.fd);
-
- return rc;
+ s->stream_fd.fd = t->fd;
+ s->stream_fd.id = id;
}
/* wait for latest checkpoint to be applied */
@@ -1566,90 +1249,6 @@ static int unprotected_start(td_driver_t *driver)
/* control */
-
-static inline int resolve_address(const char* addr, struct in_addr* ia)
-{
- struct hostent* he;
- uint32_t ip;
-
- if (!(he = gethostbyname(addr))) {
- RPRINTF("error resolving %s: %d\n", addr, h_errno);
- return -1;
- }
-
- if (!he->h_addr_list[0]) {
- RPRINTF("no address found for %s\n", addr);
- return -1;
- }
-
- /* network byte order */
- ip = *((uint32_t**)he->h_addr_list)[0];
- ia->s_addr = ip;
-
- return 0;
-}
-
-static int get_args(td_driver_t *driver, const char* name)
-{
- struct tdremus_state *state = (struct tdremus_state *)driver->data;
- char* host;
- char* port;
-// char* driver_str;
-// char* parent;
-// int type;
-// char* path;
-// unsigned long ulport;
-// int i;
-// struct sockaddr_in server_addr_in;
-
- int gai_status;
- int valid_addr;
- struct addrinfo gai_hints;
- struct addrinfo *servinfo, *servinfo_itr;
-
- memset(&gai_hints, 0, sizeof gai_hints);
- gai_hints.ai_family = AF_UNSPEC;
- gai_hints.ai_socktype = SOCK_STREAM;
-
- port = strchr(name, ':');
- if (!port) {
- RPRINTF("missing host in %s\n", name);
- return -ENOENT;
- }
- if (!(host = strndup(name, port - name))) {
- RPRINTF("unable to allocate host\n");
- return -ENOMEM;
- }
- port++;
-
- if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0)
{
- RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
- return -ENOENT;
- }
-
- /* TODO: do something smarter here */
- valid_addr = 0;
- for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr =
servinfo_itr->ai_next) {
- void *addr;
- char *ipver;
-
- if (servinfo_itr->ai_family == AF_INET) {
- valid_addr = 1;
- memset(&state->sa, 0, sizeof(state->sa));
- state->sa = *(struct sockaddr_in
*)servinfo_itr->ai_addr;
- break;
- }
- }
- freeaddrinfo(servinfo);
-
- if (!valid_addr)
- return -ENOENT;
-
- RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr),
ntohs(state->sa.sin_port));
-
- return 0;
-}
-
static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
@@ -1844,11 +1443,11 @@ static int ctl_register(struct tdremus_state *s)
RPRINTF("registering ctl fifo\n");
/* register ctl fd */
- s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
s->ctl_fd.fd, 0, ctl_request, s);
+ s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
s->ctl_fd.fd, 0, ctl_request, s);
- if (s->ctl_fd.cid < 0) {
+ if (s->ctl_fd.id < 0) {
RPRINTF("error registering ctrl FIFO %s: %d\n",
- s->ctl_path, s->ctl_fd.cid);
+ s->ctl_path, s->ctl_fd.id);
return -1;
}
@@ -1859,7 +1458,7 @@ static void ctl_unregister(struct tdremus_state *s)
{
RPRINTF("unregistering ctl fifo\n");
- UNREGISTER_EVENT(s->ctl_fd.cid);
+ UNREGISTER_EVENT(s->ctl_fd.id);
}
/* interface */
@@ -1867,6 +1466,7 @@ static void ctl_unregister(struct tdremus_state *s)
static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
+ td_replication_connect_t *t = &s->t;
int rc;
const char *name = image->name;
td_flag_t flags = image->flags;
@@ -1877,7 +1477,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t
*image, td_uuid_t uuid)
remus_image = image;
memset(s, 0, sizeof(*s));
- s->server_fd.fd = -1;
s->stream_fd.fd = -1;
s->ctl_fd.fd = -1;
s->msg_fd.fd = -1;
@@ -1886,8 +1485,12 @@ static int tdremus_open(td_driver_t *driver, td_image_t
*image, td_uuid_t uuid)
* the driver stack from the stream_fd event handler */
s->tdremus_driver = driver;
+ t->log_prefix = "remus";
+ t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT;
+ t->max_connections = 10;
+ t->callback = remus_server_established;
/* parse name to get info etc */
- if ((rc = get_args(driver, name)))
+ if ((rc = td_replication_connect_init(t, name)))
return rc;
if ((rc = ctl_open(driver, name))) {
@@ -1901,7 +1504,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t
*image, td_uuid_t uuid)
return rc;
}
- if (!(rc = remus_bind(s)))
+ if (!(rc = td_replication_server_start(t)))
rc = switch_mode(driver, mode_backup);
else if (rc == -2)
rc = switch_mode(driver, mode_primary);
@@ -1932,8 +1535,7 @@ static int tdremus_close(td_driver_t *driver)
if (s->ramdisk.inprogress)
hashtable_destroy(s->ramdisk.inprogress, 0);
- close_server_fd(s);
- close_stream_fd(s);
+ td_replication_connect_kill(&s->t);
ctl_unregister(s);
ctl_close(s);
diff --git a/tools/blktap2/drivers/block-replication.c
b/tools/blktap2/drivers/block-replication.c
new file mode 100644
index 0000000..e4b2679
--- /dev/null
+++ b/tools/blktap2/drivers/block-replication.c
@@ -0,0 +1,468 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#include "tapdisk-server.h"
+#include "block-replication.h"
+
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <stdlib.h>
+#include <arpa/inet.h>
+
+/*
+ * File-local logging helpers. Both macros expand to a syslog() call that
+ * references a variable named log_prefix, so every function that uses
+ * them must have a `const char *log_prefix` in scope.
+ */
+#undef DPRINTF
+#undef EPRINTF
+#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a)
+#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a)
+
+/*
+ * connection status
+ *
+ * Values stored in td_replication_connect_t.status:
+ * connection_none: set by td_replication_connect_init()
+ * connection_in_progress: set once the server is listening or the client
+ * has scheduled its connect attempt
+ * connection_established: set when accept()/connect() succeeded
+ * connection_closed: set by td_replication_connect_kill()
+ */
+enum {
+ connection_none,
+ connection_in_progress,
+ connection_established,
+ connection_closed,
+};
+
+/* common functions */
+/*
+ * Parse @name, which should be "host:port", resolve it with getaddrinfo()
+ * and store the first AF_INET result in t->sa.
+ *
+ * Returns 0 on success, -ENOENT if @name has no ':' separator, if the
+ * name cannot be resolved or if no IPv4 address is found, and -ENOMEM if
+ * the host part cannot be duplicated.
+ */
+static int get_args(td_replication_connect_t *t, const char* name)
+{
+	char *host;
+	const char *port;
+	int gai_status;
+	int valid_addr = 0;
+	struct addrinfo gai_hints;
+	struct addrinfo *servinfo, *servinfo_itr;
+	const char *log_prefix = t->log_prefix;
+
+	memset(&gai_hints, 0, sizeof gai_hints);
+	gai_hints.ai_family = AF_UNSPEC;
+	gai_hints.ai_socktype = SOCK_STREAM;
+
+	port = strchr(name, ':');
+	if (!port) {
+		/* no ':' separator means the port part is what is missing */
+		EPRINTF("missing port in %s\n", name);
+		return -ENOENT;
+	}
+
+	/* everything before the ':' is the host name */
+	host = strndup(name, port - name);
+	if (!host) {
+		EPRINTF("unable to allocate host\n");
+		return -ENOMEM;
+	}
+	port++;
+
+	gai_status = getaddrinfo(host, port, &gai_hints, &servinfo);
+	/* host is only needed for the lookup itself */
+	free(host);
+	if (gai_status != 0) {
+		EPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
+		return -ENOENT;
+	}
+
+	/* TODO: do something smarter here */
+	for (servinfo_itr = servinfo; servinfo_itr != NULL;
+	     servinfo_itr = servinfo_itr->ai_next) {
+		if (servinfo_itr->ai_family != AF_INET)
+			continue;
+
+		/* the struct assignment overwrites every byte of t->sa */
+		t->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
+		valid_addr = 1;
+		break;
+	}
+	freeaddrinfo(servinfo);
+
+	if (!valid_addr) {
+		EPRINTF("no IPv4 address found for %s\n", name);
+		return -ENOENT;
+	}
+
+	DPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr),
+		ntohs(t->sa.sin_port));
+
+	return 0;
+}
+
+/*
+ * Resolve @name ("host:port") into t->sa and reset the connection state.
+ * The caller must have filled in log_prefix, callback, retry_timeout_s and
+ * max_connections beforehand.
+ *
+ * Returns 0 on success or the negative errno value reported by get_args().
+ */
+int td_replication_connect_init(td_replication_connect_t *t, const char *name)
+{
+	int rc;
+
+	rc = get_args(t, name);
+	if (rc)
+		return rc;
+
+	/*
+	 * Mark every descriptor as invalid. td_replication_connect_kill()
+	 * runs CLOSE_FD() on t->fd even when no peer has connected yet, so
+	 * a zero-filled structure would otherwise make it close fd 0.
+	 */
+	t->fd = -1;
+	t->listen_fd = -1;
+	t->id = -1;
+	t->status = connection_none;
+	return 0;
+}
+
+/*
+ * Translate the internal status of @t into the public tri-state value:
+ * 1 when established, 0 while in progress, -1 when never started or
+ * closed. An unknown status is logged and reported as -2.
+ */
+int td_replication_connect_status(td_replication_connect_t *t)
+{
+	const char *log_prefix = t->log_prefix;
+	int state = t->status;
+
+	if (state == connection_established)
+		return 1;
+
+	if (state == connection_in_progress)
+		return 0;
+
+	if (state == connection_none || state == connection_closed)
+		return -1;
+
+	EPRINTF("td_replication_connect is corruptted\n");
+	return -2;
+}
+
+/*
+ * Tear the connection down: unregister the pending event, close both
+ * the data and the listening descriptor and mark the connection closed.
+ * Does nothing unless the connection is in progress or established.
+ */
+void td_replication_connect_kill(td_replication_connect_t *t)
+{
+	if (t->status == connection_in_progress ||
+	    t->status == connection_established) {
+		UNREGISTER_EVENT(t->id);
+		CLOSE_FD(t->fd);
+		CLOSE_FD(t->listen_fd);
+		t->status = connection_closed;
+	}
+}
+
+/* server */
+static void td_replication_server_accept(event_id_t id, char mode,
+ void *private);
+
+/*
+ * Create a listening socket bound to t->sa and register
+ * td_replication_server_accept() as its read handler.
+ *
+ * Returns 0 when the server is listening (status becomes
+ * connection_in_progress), -2 when bind() fails with EADDRNOTAVAIL and
+ * -1 on any other error or when a connection is already in progress or
+ * established.
+ */
+int td_replication_server_start(td_replication_connect_t *t)
+{
+	int opt = 1;
+	int rc = -1;
+	int saved_errno;
+	event_id_t id;
+	int fd;
+	const char *log_prefix = t->log_prefix;
+
+	if (t->status == connection_in_progress ||
+	    t->status == connection_established)
+		return rc;
+
+	fd = socket(AF_INET, SOCK_STREAM, 0);
+	if (fd < 0) {
+		EPRINTF("could not create server socket: %d\n", errno);
+		return rc;
+	}
+
+	/* failing to set SO_REUSEADDR is logged but not fatal */
+	if (setsockopt(fd, SOL_SOCKET,
+		       SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+		DPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno);
+
+	if (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) {
+		/*
+		 * Keep a copy of errno: the logging calls below may change
+		 * it before it is compared against EADDRNOTAVAIL.
+		 */
+		saved_errno = errno;
+		DPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
+			fd, inet_ntoa(t->sa.sin_addr),
+			ntohs(t->sa.sin_port), saved_errno,
+			strerror(saved_errno));
+		if (saved_errno == EADDRNOTAVAIL)
+			rc = -2;
+		goto err;
+	}
+
+	if (listen(fd, t->max_connections)) {
+		EPRINTF("could not listen on socket: %d\n", errno);
+		goto err;
+	}
+
+	/*
+	 * The socket is now bound to the address and listening so we
+	 * may now register the fd with tapdisk
+	 */
+	id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+					   fd, 0,
+					   td_replication_server_accept, t);
+	if (id < 0) {
+		/* NOTE(review): assumes a negative id is -errno -- confirm */
+		EPRINTF("error registering server connection event handler: %s\n",
+			strerror(-id));
+		goto err;
+	}
+	t->listen_fd = fd;
+	t->id = id;
+	t->status = connection_in_progress;
+
+	return 0;
+
+err:
+	close(fd);
+	return rc;
+}
+
+/*
+ * Read handler of the listening socket. Accepts one pending connection;
+ * if a peer is already connected the new socket is closed again,
+ * otherwise it becomes t->fd and the owner's callback is invoked with 0.
+ */
+static void td_replication_server_accept(event_id_t id, char mode,
+					 void *private)
+{
+	td_replication_connect_t *t = private;
+	const char *log_prefix = t->log_prefix;
+	int conn_fd;
+
+	/* XXX: add address-based black/white list */
+	conn_fd = accept(t->listen_fd, NULL, NULL);
+	if (conn_fd < 0) {
+		EPRINTF("error accepting connection: %d\n", errno);
+		return;
+	}
+
+	if (t->status != connection_established) {
+		DPRINTF("server accepted connection\n");
+		t->fd = conn_fd;
+		t->status = connection_established;
+		t->callback(t, 0);
+		return;
+	}
+
+	/* only one replication peer is served at a time */
+	EPRINTF("connection is already established\n");
+	close(conn_fd);
+}
+
+/*
+ * Put the server back into the "waiting for a peer" state.
+ *
+ * Already waiting: nothing to do. Peer connected: drop the data socket
+ * and keep listening. Not started or closed: start the server from
+ * scratch and return whatever td_replication_server_start() returns.
+ * Any other status yields -1.
+ */
+int td_replication_server_restart(td_replication_connect_t *t)
+{
+	int state = t->status;
+
+	if (state == connection_in_progress)
+		return 0;
+
+	if (state == connection_established) {
+		CLOSE_FD(t->fd);
+		t->status = connection_in_progress;
+		return 0;
+	}
+
+	if (state == connection_none || state == connection_closed)
+		return td_replication_server_start(t);
+
+	/* not reached */
+	return -1;
+}
+
+/* client */
+static void td_replication_retry_connect_event(event_id_t id, char mode,
+ void *private);
+static void td_replication_connect_event(event_id_t id, char mode,
+ void *private);
+/*
+ * Begin an asynchronous connect to t->sa: create a non-blocking socket
+ * and schedule td_replication_retry_connect_event() with a zero timeout
+ * so that the actual connect() happens from the event loop.
+ *
+ * Returns 0 when the attempt has been scheduled (status becomes
+ * connection_in_progress) and ERROR_INTERNAL on failure or when a
+ * connection is already in progress or established.
+ */
+int td_replication_client_start(td_replication_connect_t *t)
+{
+	event_id_t id;
+	int fd;
+	int flags;
+	const char *log_prefix = t->log_prefix;
+
+	if (t->status == connection_in_progress ||
+	    t->status == connection_established)
+		return ERROR_INTERNAL;
+
+	DPRINTF("client connecting to %s:%d...\n",
+		inet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port));
+
+	fd = socket(PF_INET, SOCK_STREAM, 0);
+	if (fd < 0) {
+		EPRINTF("could not create client socket: %d\n", errno);
+		return ERROR_INTERNAL;
+	}
+
+	/* make socket nonblocking */
+	flags = fcntl(fd, F_GETFL, 0);
+	if (flags == -1)
+		flags = 0;
+	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+		EPRINTF("error setting fd %d to non block mode\n", fd);
+		goto err;
+	}
+
+	/*
+	 * once we have created the socket and populated the address,
+	 * we can now start our non-blocking connect. rather than
+	 * duplicating code we trigger a timeout on the socket fd,
+	 * which calls out nonblocking connect code
+	 */
+	id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
+					   td_replication_retry_connect_event,
+					   t);
+	if (id < 0) {
+		/* NOTE(review): assumes a negative id is -errno -- confirm */
+		EPRINTF("error registering timeout client connection event handler: %s\n",
+			strerror(-id));
+		goto err;
+	}
+
+	t->fd = fd;
+	t->id = id;
+	t->status = connection_in_progress;
+	return 0;
+
+err:
+	close(fd);
+	return ERROR_INTERNAL;
+}
+
+/*
+ * Abort the client connection attempt: release the event and the
+ * descriptors first, then report @rc to the owner through the callback.
+ */
+static void td_replication_client_failed(td_replication_connect_t *t, int rc)
+{
+ td_replication_connect_kill(t);
+ t->callback(t, rc);
+}
+
+/*
+ * Finish a successful connect: drop the connect/retry event, mark the
+ * connection established and invoke the owner's callback with rc 0.
+ * t->fd stays open for the caller to use.
+ */
+static void td_replication_client_done(td_replication_connect_t *t)
+{
+ UNREGISTER_EVENT(t->id);
+ t->status = connection_established;
+ t->callback(t, 0);
+}
+
+/*
+ * Schedule another connect attempt: replace the current event with a
+ * timeout event that runs td_replication_retry_connect_event() after
+ * t->retry_timeout_s.
+ *
+ * Returns 0 on success. On ERROR_INTERNAL the old event has already been
+ * unregistered and t->id is left at -1.
+ */
+static int td_replication_retry_connect(td_replication_connect_t *t)
+{
+	event_id_t id;
+	const char *log_prefix = t->log_prefix;
+
+	UNREGISTER_EVENT(t->id);
+
+	/* report the configured delay instead of assuming one second */
+	DPRINTF("connect to server %d second(s) later\n", t->retry_timeout_s);
+	id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+					   t->fd, t->retry_timeout_s,
+					   td_replication_retry_connect_event,
+					   t);
+	if (id < 0) {
+		/* NOTE(review): assumes a negative id is -errno -- confirm */
+		EPRINTF("error registering timeout client connection event handler: %s\n",
+			strerror(-id));
+		return ERROR_INTERNAL;
+	}
+
+	t->id = id;
+	return 0;
+}
+
+/*
+ * Wait for a connect() that returned EINPROGRESS to finish: replace the
+ * current event with a write-readiness event on t->fd handled by
+ * td_replication_connect_event().
+ *
+ * Returns 0 on success. On ERROR_INTERNAL the old event has already been
+ * unregistered and t->id is left at -1.
+ */
+static int td_replication_wait_connect_done(td_replication_connect_t *t)
+{
+	event_id_t id;
+	const char *log_prefix = t->log_prefix;
+
+	UNREGISTER_EVENT(t->id);
+
+	id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
+					   t->fd, 0,
+					   td_replication_connect_event, t);
+	if (id < 0) {
+		/* NOTE(review): assumes a negative id is -errno -- confirm */
+		EPRINTF("error registering client connection event handler: %s\n",
+			strerror(-id));
+		return ERROR_INTERNAL;
+	}
+	t->id = id;
+
+	return 0;
+}
+
+/*
+ * Classify a connect error.
+ *
+ * Returns 1 when @err is one of the errors after which the connection
+ * to the backup server is retried, 0 for every other value.
+ */
+static int check_connect_errno(int err)
+{
+	switch (err) {
+	case ECONNREFUSED:
+	case ENETUNREACH:
+	case EAGAIN:
+	case ECONNABORTED:
+	/*
+	 * The fd is non-blocking, so connect() itself never reports
+	 * ETIMEDOUT; this value can only come from getsockopt().
+	 */
+	case ETIMEDOUT:
+		return 1;
+	default:
+		return 0;
+	}
+}
+
+/*
+ * Timeout handler that performs the actual non-blocking connect() on
+ * t->fd. @private is the td_replication_connect_t; @id and @mode are not
+ * used.
+ *
+ * Outcomes:
+ * EINPROGRESS: wait for the socket to become writable
+ * retryable errno: schedule another attempt (see check_connect_errno())
+ * other errno: fail with ERROR_CONNECTION
+ * immediate success: finish via td_replication_client_done()
+ * Any failure ends in td_replication_client_failed().
+ */
+static void td_replication_retry_connect_event(event_id_t id, char mode,
+ void *private)
+{
+ td_replication_connect_t *t = private;
+ int rc, ret;
+ const char *log_prefix = t->log_prefix;
+
+ /* do a non-blocking connect */
+ ret = connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa));
+ if (ret) {
+ /* errno is inspected before any other call is made */
+ if (errno == EINPROGRESS) {
+ /*
+ * the connect returned EINPROGRESS (nonblocking
+ * connect) we must wait for the fd to be writeable
+ * to determine if the connect worked
+ */
+ rc = td_replication_wait_connect_done(t);
+ if (rc)
+ goto fail;
+ return;
+ }
+
+ if (check_connect_errno(errno)) {
+ rc = td_replication_retry_connect(t);
+ if (rc)
+ goto fail;
+ return;
+ }
+
+ /* not recoverable */
+ EPRINTF("error connection to server %s\n", strerror(errno));
+ rc = ERROR_CONNECTION;
+ goto fail;
+ }
+
+ /* The connection is established unexpectedly */
+ td_replication_client_done(t);
+
+ return;
+
+fail:
+ /* closes the socket and reports rc to the owner's callback */
+ td_replication_client_failed(t, rc);
+}
+
+/*
+ * callback when nonblocking connect() is finished
+ *
+ * Write-readiness handler armed by td_replication_wait_connect_done().
+ * Reads SO_ERROR from t->fd to learn the result of the connect:
+ * 0: finish via td_replication_client_done()
+ * retryable error: schedule another attempt
+ * anything else: fail with ERROR_CONNECTION
+ * A getsockopt() failure is reported as ERROR_INTERNAL. @private is the
+ * td_replication_connect_t; @id and @mode are not used.
+ */
+static void td_replication_connect_event(event_id_t id, char mode,
+					 void *private)
+{
+	int socket_errno;
+	socklen_t socket_errno_size;
+	td_replication_connect_t *t = private;
+	int rc;
+	const char *log_prefix = t->log_prefix;
+
+	/* check to see if the connect succeeded */
+	socket_errno_size = sizeof(socket_errno);
+	if (getsockopt(t->fd, SOL_SOCKET, SO_ERROR,
+		       &socket_errno, &socket_errno_size)) {
+		/*
+		 * Do not just return here: the write event would stay
+		 * registered and the connection would remain "in progress"
+		 * without the owner ever being told.
+		 */
+		EPRINTF("error getting socket errno\n");
+		rc = ERROR_INTERNAL;
+		goto fail;
+	}
+
+	DPRINTF("socket connect returned %d\n", socket_errno);
+
+	if (socket_errno) {
+		/* the connect did not succeed */
+		if (!check_connect_errno(socket_errno)) {
+			EPRINTF("socket connect returned %d, giving up\n",
+				socket_errno);
+			rc = ERROR_CONNECTION;
+			goto fail;
+		}
+
+		/*
+		 * we can probably assume that the backup is down.
+		 * just try again later
+		 */
+		rc = td_replication_retry_connect(t);
+		if (rc)
+			goto fail;
+
+		return;
+	}
+
+	td_replication_client_done(t);
+
+	return;
+
+fail:
+	/* closes the socket and reports rc to the owner's callback */
+	td_replication_client_failed(t, rc);
+}
diff --git a/tools/blktap2/drivers/block-replication.h
b/tools/blktap2/drivers/block-replication.h
new file mode 100644
index 0000000..0bd6e71
--- /dev/null
+++ b/tools/blktap2/drivers/block-replication.h
@@ -0,0 +1,113 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#ifndef BLOCK_REPLICATION_H
+#define BLOCK_REPLICATION_H
+
+#include "scheduler.h"
+#include <sys/socket.h>
+#include <netdb.h>
+
+/*
+ * Given @inner_ptr, a pointer to the member @member_name embedded in a
+ * structure, yield a pointer to the enclosing structure. @outer is an
+ * expression whose type is the enclosing structure (e.g. `*s`); it is
+ * only used inside typeof(). The pointer comparison in the middle is
+ * never used for its value: it exists so the compiler can diagnose an
+ * @inner_ptr whose type does not match the member. Relies on typeof and
+ * statement expressions, and on offsetof being declared by the includer.
+ */
+#define CONTAINER_OF(inner_ptr, outer, member_name) \
+ ({ \
+ typeof(outer) *container_of_; \
+ container_of_ = (void*)((char*)(inner_ptr) - \
+ offsetof(typeof(outer), member_name)); \
+ (void)(&container_of_->member_name == \
+ (typeof(inner_ptr))0) /* type check */; \
+ container_of_; \
+ })
+
+/*
+ * Unregister the tapdisk server event @id if it is valid (>= 0) and
+ * reset it to -1, so that invoking the macro again is a no-op. @id must
+ * be a modifiable lvalue; it is evaluated more than once and therefore
+ * must not have side effects.
+ */
+#define UNREGISTER_EVENT(id)					\
+	do {							\
+		if ((id) >= 0) {				\
+			tapdisk_server_unregister_event(id);	\
+			(id) = -1;				\
+		}						\
+	} while (0)
+/*
+ * Close the descriptor @fd if it is valid (>= 0) and reset it to -1, so
+ * that invoking the macro again is a no-op. @fd must be a modifiable
+ * lvalue; it is evaluated more than once and therefore must not have
+ * side effects.
+ */
+#define CLOSE_FD(fd)						\
+	do {							\
+		if ((fd) >= 0) {				\
+			close(fd);				\
+			(fd) = -1;				\
+		}						\
+	} while (0)
+
+/*
+ * Error codes passed to the connection callback and returned by the
+ * client functions. All values are negative.
+ */
+enum {
+ /* local failure, e.g. socket(), fcntl() or event registration */
+ ERROR_INTERNAL = -1,
+ ERROR_IO = -2,
+ /* connect() failed with an error that is not retried */
+ ERROR_CONNECTION = -3,
+ ERROR_CLOSE = -4,
+};
+
+typedef struct td_replication_connect td_replication_connect_t;
+typedef void td_replication_callback(td_replication_connect_t *r, int rc);
+
+/*
+ * State of one replication connection, used either as a server (listen
+ * and accept) or as a client (asynchronous connect with retry).
+ */
+struct td_replication_connect {
+ /*
+ * caller must fill these in before calling
+ * td_replication_connect_init()
+ */
+ /* prepended to every syslog message of this connection */
+ const char *log_prefix;
+ /* invoked with rc 0 once connected, or with an ERROR_* value */
+ td_replication_callback *callback;
+ /* delay passed to the retry timeout event of the client */
+ int retry_timeout_s;
+ /* backlog argument passed to listen() by the server */
+ int max_connections;
+ /*
+ * The caller uses this fd to read/write after
+ * the connection is established
+ */
+ int fd;
+
+ /* private */
+ /* peer (client) or local (server) IPv4 address from "host:port" */
+ struct sockaddr_in sa;
+ /* listening socket of the server, -1 when not listening */
+ int listen_fd;
+ /* currently registered accept/connect/retry event, -1 if none */
+ event_id_t id;
+
+ /* one of the connection_* values in block-replication.c */
+ int status;
+};
+
+/* return -errno if failure happened, otherwise return 0 */
+int td_replication_connect_init(td_replication_connect_t *t, const char *name);
+/*
+ * Return value:
+ * -2: the internal status is not a known value (corrupted state)
+ * -1: connection is closed or not connected
+ * 0: connection is in progress
+ * 1: connection is established
+ */
+int td_replication_connect_status(td_replication_connect_t *t);
+void td_replication_connect_kill(td_replication_connect_t *t);
+
+/*
+ * Return value:
+ * -2: this caller should be client
+ * -1: error
+ * 0: connection is in progress
+ */
+int td_replication_server_start(td_replication_connect_t *t);
+/*
+ * Return value:
+ * -2: this caller should be client
+ * -1: error
+ * 0: connection is in progress
+ */
+int td_replication_server_restart(td_replication_connect_t *t);
+/*
+ * Return value:
+ * -1: error
+ * 0: connection is in progress
+ */
+int td_replication_client_start(td_replication_connect_t *t);
+
+#endif
--
1.9.3
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel
|
![]() |
Lists.xenproject.org is hosted with RackSpace, monitoring our |