[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH 12/17] tools: blktap2: implement an API to create a connection asynchronously



tapdisk2 is a single-threaded process. If we use Remus, tapdisk2
blocks in primary_blocking_connect(), and while it is blocked the
user has no chance to talk to tapdisk2. So we should connect to the
backup asynchronously. This patch only implements an API to create
a connection asynchronously.

Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
---
 tools/blktap2/drivers/Makefile            |   1 +
 tools/blktap2/drivers/block-replication.c | 468 ++++++++++++++++++++++++++++++
 tools/blktap2/drivers/block-replication.h | 111 +++++++
 3 files changed, 580 insertions(+)
 create mode 100644 tools/blktap2/drivers/block-replication.c
 create mode 100644 tools/blktap2/drivers/block-replication.h

diff --git a/tools/blktap2/drivers/Makefile b/tools/blktap2/drivers/Makefile
index 3476fc1..a7f45c7 100644
--- a/tools/blktap2/drivers/Makefile
+++ b/tools/blktap2/drivers/Makefile
@@ -29,6 +29,7 @@ REMUS-OBJS  := block-remus.o
 REMUS-OBJS  += hashtable.o
 REMUS-OBJS  += hashtable_itr.o
 REMUS-OBJS  += hashtable_utility.o
+REMUS-OBJS  += block-replication.o
 
 tapdisk2 tapdisk-stream tapdisk-diff $(QCOW_UTIL): AIOLIBS := -laio
 
diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c
new file mode 100644
index 0000000..e4b2679
--- /dev/null
+++ b/tools/blktap2/drivers/block-replication.c
@@ -0,0 +1,468 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#include "tapdisk-server.h"
+#include "block-replication.h"
+
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <syslog.h>
+#include <stdlib.h>
+#include <arpa/inet.h>
+
+/*
+ * Replace any earlier DPRINTF/EPRINTF definitions with syslog-backed ones.
+ * Both macros expand to a use of an identifier named log_prefix, so every
+ * function that logs must have a `log_prefix` string in scope.
+ */
+#undef DPRINTF
+#define DPRINTF(fmt, ...) \
+       syslog(LOG_DEBUG, "%s: " fmt, log_prefix, ##__VA_ARGS__)
+
+#undef EPRINTF
+#define EPRINTF(fmt, ...) \
+       syslog(LOG_ERR, "%s: " fmt, log_prefix, ##__VA_ARGS__)
+
+/*
+ * connection status, stored in td_replication_connect_t.status;
+ * td_replication_connect_status() maps these values to -1/0/1
+ */
+enum {
+       connection_none,        /* set by td_replication_connect_init() */
+       connection_in_progress, /* server is listening / client is connecting */
+       connection_established, /* t->fd is connected to the peer */
+       connection_closed,      /* set by td_replication_connect_kill() */
+};
+
+/* common functions */
+/*
+ * Parse a "host:port" string and store the first IPv4 address it
+ * resolves to in t->sa.
+ *
+ * t:    connection whose ->sa is filled in; ->log_prefix is used for logging
+ * name: address specification in the form host:port
+ *
+ * Returns 0 on success, -EINVAL if name is NULL, -ENOMEM if the host part
+ * cannot be duplicated, and -ENOENT if the ":port" part is missing or no
+ * IPv4 address can be resolved.
+ */
+static int get_args(td_replication_connect_t *t, const char* name)
+{
+       char *host;
+       const char *port;
+       int gai_status;
+       int valid_addr = 0;
+       struct addrinfo gai_hints;
+       struct addrinfo *servinfo, *servinfo_itr;
+       const char *log_prefix = t->log_prefix;
+
+       if (!name) {
+               EPRINTF("missing host:port argument\n");
+               return -EINVAL;
+       }
+
+       memset(&gai_hints, 0, sizeof gai_hints);
+       gai_hints.ai_family = AF_UNSPEC;
+       gai_hints.ai_socktype = SOCK_STREAM;
+
+       /* everything before the first ':' is the host, the rest is the port */
+       port = strchr(name, ':');
+       if (!port) {
+               EPRINTF("missing port in %s\n", name);
+               return -ENOENT;
+       }
+
+       host = strndup(name, port - name);
+       if (!host) {
+               EPRINTF("unable to allocate host\n");
+               return -ENOMEM;
+       }
+       port++;
+
+       gai_status = getaddrinfo(host, port, &gai_hints, &servinfo);
+       /* host is only needed for the lookup itself */
+       free(host);
+       if (gai_status != 0) {
+               EPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
+               return -ENOENT;
+       }
+
+       /* TODO: do something smarter here */
+       /* t->sa is a sockaddr_in, so only AF_INET results can be used */
+       for (servinfo_itr = servinfo; servinfo_itr != NULL;
+            servinfo_itr = servinfo_itr->ai_next) {
+               if (servinfo_itr->ai_family == AF_INET) {
+                       valid_addr = 1;
+                       memset(&t->sa, 0, sizeof(t->sa));
+                       t->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
+                       break;
+               }
+       }
+       freeaddrinfo(servinfo);
+
+       if (!valid_addr) {
+               EPRINTF("no IPv4 address found for %s\n", name);
+               return -ENOENT;
+       }
+
+       DPRINTF("host: %s, port: %d\n", inet_ntoa(t->sa.sin_addr),
+               ntohs(t->sa.sin_port));
+
+       return 0;
+}
+
+/*
+ * Initialize a connection object from a "host:port" string.
+ *
+ * The caller must have filled in log_prefix, callback, retry_timeout_s
+ * and max_connections beforehand. Returns 0 on success or the negative
+ * errno value reported by get_args().
+ */
+int td_replication_connect_init(td_replication_connect_t *t, const char *name)
+{
+       /*
+        * Put the descriptors, the event id and the status into their
+        * "unused" state before anything can fail. t->fd in particular
+        * must be -1: td_replication_connect_kill() runs CLOSE_FD(t->fd)
+        * even for a listening server that never accepted a connection.
+        */
+       t->fd = -1;
+       t->listen_fd = -1;
+       t->id = -1;
+       t->status = connection_none;
+
+       return get_args(t, name);
+}
+
+/*
+ * Report the state of the connection:
+ *   -1 when it is closed or was never started,
+ *    0 while it is in progress,
+ *    1 once it is established,
+ *   -2 (after logging an error) when t->status holds an unknown value.
+ */
+int td_replication_connect_status(td_replication_connect_t *t)
+{
+       const char *log_prefix = t->log_prefix;
+       const int state = t->status;
+
+       if (state == connection_in_progress)
+               return 0;
+
+       if (state == connection_established)
+               return 1;
+
+       if (state == connection_none || state == connection_closed)
+               return -1;
+
+       EPRINTF("td_replication_connect is corruptted\n");
+       return -2;
+}
+
+/*
+ * Tear down a connection that is in progress or established: unregister
+ * its event, close both descriptors and mark it closed. A connection in
+ * any other state is left untouched.
+ */
+void td_replication_connect_kill(td_replication_connect_t *t)
+{
+       switch (t->status) {
+       case connection_in_progress:
+       case connection_established:
+               break;
+       default:
+               /* nothing was started, so there is nothing to release */
+               return;
+       }
+
+       UNREGISTER_EVENT(t->id);
+       CLOSE_FD(t->fd);
+       CLOSE_FD(t->listen_fd);
+       t->status = connection_closed;
+}
+
+/* server */
+static void td_replication_server_accept(event_id_t id, char mode,
+                                        void *private);
+
+/*
+ * Create a listening TCP socket bound to t->sa and register
+ * td_replication_server_accept() as its read handler.
+ *
+ * Return value:
+ *   -2: bind() failed with EADDRNOTAVAIL
+ *   -1: any other error, or a connection is already in progress or
+ *       established
+ *    0: the server is listening (status is connection_in_progress)
+ */
+int td_replication_server_start(td_replication_connect_t *t)
+{
+       int opt = 1;
+       int rc = -1;
+       int saved_errno;
+       event_id_t id;
+       int fd;
+       const char *log_prefix = t->log_prefix;
+
+       if (t->status == connection_in_progress ||
+           t->status == connection_established)
+               return rc;
+
+       fd = socket(AF_INET, SOCK_STREAM, 0);
+       if (fd < 0) {
+               EPRINTF("could not create server socket: %d\n", errno);
+               return rc;
+       }
+
+       /* failing to set SO_REUSEADDR is logged but not fatal */
+       if (setsockopt(fd, SOL_SOCKET,
+                      SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+               DPRINTF("Error setting REUSEADDR on %d: %d\n", fd, errno);
+
+       if (bind(fd, (struct sockaddr *)&t->sa, sizeof(t->sa)) < 0) {
+               /*
+                * Save errno first: the logging call below may overwrite
+                * it before it is compared with EADDRNOTAVAIL.
+                */
+               saved_errno = errno;
+               DPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
+                       fd, inet_ntoa(t->sa.sin_addr),
+                       ntohs(t->sa.sin_port), saved_errno,
+                       strerror(saved_errno));
+               if (saved_errno == EADDRNOTAVAIL)
+                       rc = -2;
+               goto err;
+       }
+
+       if (listen(fd, t->max_connections)) {
+               EPRINTF("could not listen on socket: %d\n", errno);
+               goto err;
+       }
+
+       /*
+        * The socket is now bound to the address and listening so we
+        * may now register the fd with tapdisk
+        */
+       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                          fd, 0,
+                                          td_replication_server_accept, t);
+       if (id < 0) {
+               /*
+                * id is negative here; strerror() needs a positive error
+                * number (assumes the scheduler returns -errno -- TODO
+                * confirm)
+                */
+               EPRINTF("error registering server connection event handler: %s\n",
+                       strerror(-id));
+               goto err;
+       }
+       t->listen_fd = fd;
+       t->id = id;
+       t->status = connection_in_progress;
+
+       return 0;
+
+err:
+       close(fd);
+       return rc;
+}
+
+/*
+ * Read-event handler for the listening socket.
+ *
+ * Accepts one pending connection. If no connection is established yet,
+ * the new descriptor becomes t->fd and the callback is invoked with
+ * rc 0; otherwise the extra connection is closed again.
+ */
+static void td_replication_server_accept(event_id_t id, char mode,
+                                        void *private)
+{
+       td_replication_connect_t *conn = private;
+       const char *log_prefix = conn->log_prefix;
+       int peer_fd;
+
+       /* XXX: add address-based black/white list */
+       peer_fd = accept(conn->listen_fd, NULL, NULL);
+       if (peer_fd < 0) {
+               EPRINTF("error accepting connection: %d\n", errno);
+               return;
+       }
+
+       if (conn->status != connection_established) {
+               DPRINTF("server accepted connection\n");
+               conn->fd = peer_fd;
+               conn->status = connection_established;
+               conn->callback(conn, 0);
+               return;
+       }
+
+       /* only one peer at a time: drop the additional connection */
+       EPRINTF("connection is already established\n");
+       close(peer_fd);
+}
+
+/*
+ * Get the server back into the "waiting for a peer" state.
+ *
+ * Already listening: nothing to do. Established: close the peer
+ * descriptor and wait for the next connection. Not started or closed:
+ * start the server from scratch and return that result. Returns -1 for
+ * an unknown status value.
+ */
+int td_replication_server_restart(td_replication_connect_t *t)
+{
+       if (t->status == connection_in_progress)
+               return 0;
+
+       if (t->status == connection_established) {
+               CLOSE_FD(t->fd);
+               t->status = connection_in_progress;
+               return 0;
+       }
+
+       if (t->status == connection_none || t->status == connection_closed)
+               return td_replication_server_start(t);
+
+       /* not reached */
+       return -1;
+}
+
+/* client */
+static void td_replication_retry_connect_event(event_id_t id, char mode,
+                                              void *private);
+static void td_replication_connect_event(event_id_t id, char mode,
+                                        void *private);
+/*
+ * Start an asynchronous client connection to t->sa.
+ *
+ * Creates a non-blocking TCP socket and registers a zero-timeout event
+ * whose handler, td_replication_retry_connect_event(), issues the actual
+ * connect().
+ *
+ * Returns 0 once the attempt has been scheduled, or ERROR_INTERNAL if a
+ * connection is already in progress or established, or if the socket or
+ * the event could not be set up.
+ */
+int td_replication_client_start(td_replication_connect_t *t)
+{
+       event_id_t id;
+       int fd;
+       int flags;
+       const char *log_prefix = t->log_prefix;
+
+       if (t->status == connection_in_progress ||
+           t->status == connection_established)
+               return ERROR_INTERNAL;
+
+       DPRINTF("client connecting to %s:%d...\n",
+               inet_ntoa(t->sa.sin_addr), ntohs(t->sa.sin_port));
+
+       if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+               EPRINTF("could not create client socket: %d\n", errno);
+               return ERROR_INTERNAL;
+       }
+
+       /* make socket nonblocking */
+       if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
+               flags = 0;
+       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+               EPRINTF("error setting fd %d to non block mode\n", fd);
+               goto err;
+       }
+
+       /*
+        * once we have created the socket and populated the address,
+        * we can now start our non-blocking connect. rather than
+        * duplicating code we trigger a timeout on the socket fd,
+        * which calls our nonblocking connect code
+        */
+       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
+                                          td_replication_retry_connect_event,
+                                          t);
+       if (id < 0) {
+               /*
+                * id is negative here; strerror() needs a positive error
+                * number (assumes the scheduler returns -errno -- TODO
+                * confirm)
+                */
+               EPRINTF("error registering timeout client connection event handler: %s\n",
+                       strerror(-id));
+               goto err;
+       }
+
+       t->fd = fd;
+       t->id = id;
+       t->status = connection_in_progress;
+       return 0;
+
+err:
+       close(fd);
+       return ERROR_INTERNAL;
+}
+
+/*
+ * Give up on the client connection: release its event and descriptors,
+ * then report the error code rc to the owner through the callback.
+ */
+static void td_replication_client_failed(td_replication_connect_t *t, int rc)
+{
+       td_replication_callback *notify = t->callback;
+
+       td_replication_connect_kill(t);
+       notify(t, rc);
+}
+
+/*
+ * The client connection succeeded: drop the pending event, mark the
+ * connection established and report success (rc 0) through the callback.
+ */
+static void td_replication_client_done(td_replication_connect_t *t)
+{
+       td_replication_callback *notify = t->callback;
+
+       UNREGISTER_EVENT(t->id);
+       t->status = connection_established;
+       notify(t, 0);
+}
+
+/*
+ * Schedule another connect attempt: drop the current event and register
+ * a timeout event of t->retry_timeout_s that runs
+ * td_replication_retry_connect_event() again on the same socket.
+ *
+ * Returns 0 on success or ERROR_INTERNAL if the event cannot be
+ * registered; t->id is left at -1 in that case.
+ */
+static int td_replication_retry_connect(td_replication_connect_t *t)
+{
+       event_id_t id;
+       const char *log_prefix = t->log_prefix;
+
+       UNREGISTER_EVENT(t->id);
+
+       /* log the configured delay rather than a hard-coded "1 second" */
+       DPRINTF("connect to server %d second(s) later\n", t->retry_timeout_s);
+       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+                                          t->fd, t->retry_timeout_s,
+                                          td_replication_retry_connect_event,
+                                          t);
+       if (id < 0) {
+               /*
+                * id is negative here; strerror() needs a positive error
+                * number (assumes the scheduler returns -errno -- TODO
+                * confirm)
+                */
+               EPRINTF("error registering timeout client connection event handler: %s\n",
+                       strerror(-id));
+               return ERROR_INTERNAL;
+       }
+
+       t->id = id;
+       return 0;
+}
+
+/*
+ * Wait for a pending non-blocking connect() to finish: replace the
+ * current event with a write-readiness event on t->fd that is handled
+ * by td_replication_connect_event().
+ *
+ * Returns 0 on success or ERROR_INTERNAL if the event cannot be
+ * registered; t->id is left at -1 in that case.
+ */
+static int td_replication_wait_connect_done(td_replication_connect_t *t)
+{
+       event_id_t id;
+       const char *log_prefix = t->log_prefix;
+
+       UNREGISTER_EVENT(t->id);
+
+       id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
+                                          t->fd, 0,
+                                          td_replication_connect_event, t);
+       if (id < 0) {
+               /*
+                * id is negative here; strerror() needs a positive error
+                * number (assumes the scheduler returns -errno -- TODO
+                * confirm)
+                */
+               EPRINTF("error registering client connection event handler: %s\n",
+                       strerror(-id));
+               return ERROR_INTERNAL;
+       }
+       t->id = id;
+
+       return 0;
+}
+
+/*
+ * Classify the error of a failed connect attempt.
+ *
+ * Returns 1 if we need to reconnect to the backup server later,
+ * 0 if the error is not one of the retryable ones.
+ */
+static int check_connect_errno(int err)
+{
+       switch (err) {
+       case ECONNREFUSED:
+       case ENETUNREACH:
+       case EAGAIN:
+       case ECONNABORTED:
+       /*
+        * The fd is non-blocking, so connect() itself does not report
+        * ETIMEDOUT; this value can only come from getsockopt().
+        */
+       case ETIMEDOUT:
+               return 1;
+       default:
+               return 0;
+       }
+}
+
+/*
+ * Timeout-event handler that issues the non-blocking connect().
+ *
+ * Immediate success finishes the connection. EINPROGRESS switches to
+ * waiting for the fd to become writable. A retryable error schedules
+ * another attempt. Any other error, or a failure to register the
+ * follow-up event, is reported through td_replication_client_failed().
+ */
+static void td_replication_retry_connect_event(event_id_t id, char mode,
+                                              void *private)
+{
+       td_replication_connect_t *t = private;
+       const char *log_prefix = t->log_prefix;
+       int rc;
+
+       /* do a non-blocking connect */
+       if (!connect(t->fd, (struct sockaddr *)&t->sa, sizeof(t->sa))) {
+               /* The connection is established unexpectedly */
+               td_replication_client_done(t);
+               return;
+       }
+
+       if (errno == EINPROGRESS) {
+               /*
+                * the connect returned EINPROGRESS (nonblocking
+                * connect) we must wait for the fd to be writeable
+                * to determine if the connect worked
+                */
+               rc = td_replication_wait_connect_done(t);
+       } else if (check_connect_errno(errno)) {
+               rc = td_replication_retry_connect(t);
+       } else {
+               /* not recoverable */
+               EPRINTF("error connection to server %s\n", strerror(errno));
+               rc = ERROR_CONNECTION;
+       }
+
+       /* rc is non-zero on every path that could not continue */
+       if (rc)
+               td_replication_client_failed(t, rc);
+}
+
+/*
+ * callback when nonblocking connect() is finished
+ *
+ * Reads the result of the connect from SO_ERROR. Success finishes the
+ * connection, a retryable error schedules another attempt, and every
+ * other outcome (including a failing getsockopt()) is reported through
+ * td_replication_client_failed().
+ */
+static void td_replication_connect_event(event_id_t id, char mode,
+                                        void *private)
+{
+       int socket_errno;
+       socklen_t socket_errno_size;
+       td_replication_connect_t *t = private;
+       int rc;
+       const char *log_prefix = t->log_prefix;
+
+       /* check to see if the connect succeeded */
+       socket_errno_size = sizeof(socket_errno);
+       if (getsockopt(t->fd, SOL_SOCKET, SO_ERROR,
+                      &socket_errno, &socket_errno_size)) {
+               /*
+                * Without SO_ERROR the outcome of the connect is unknown.
+                * Report a failure instead of returning silently, which
+                * would leave the write event registered and the owner
+                * never notified.
+                */
+               EPRINTF("error getting socket errno: %s\n", strerror(errno));
+               rc = ERROR_INTERNAL;
+               goto fail;
+       }
+
+       DPRINTF("socket connect returned %d\n", socket_errno);
+
+       if (!socket_errno) {
+               td_replication_client_done(t);
+               return;
+       }
+
+       /* the connect did not succeed */
+       if (check_connect_errno(socket_errno)) {
+               /*
+                * we can probably assume that the backup is down.
+                * just try again later
+                */
+               rc = td_replication_retry_connect(t);
+               if (rc)
+                       goto fail;
+
+               return;
+       }
+
+       EPRINTF("socket connect returned %d, giving up\n", socket_errno);
+       rc = ERROR_CONNECTION;
+
+fail:
+       td_replication_client_failed(t, rc);
+}
diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
new file mode 100644
index 0000000..9e051cc
--- /dev/null
+++ b/tools/blktap2/drivers/block-replication.h
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2014 FUJITSU LIMITED
+ * Author: Wen Congyang <wency@xxxxxxxxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#ifndef BLOCK_REPLICATION_H
+#define BLOCK_REPLICATION_H
+
+#include "scheduler.h"
+#include <sys/socket.h>
+#include <netdb.h>
+
+/*
+ * Given a pointer to a member of a structure, yield a pointer to the
+ * enclosing structure.
+ *
+ * inner_ptr:   pointer to the member
+ * outer:       the structure type (anything __typeof__ accepts)
+ * member_name: name of the member inside the structure
+ *
+ * Needs offsetof() from <stddef.h>. Relies on GNU statement expressions
+ * and on __typeof__, which, unlike plain typeof, is also accepted when
+ * the includer compiles in a strict ISO C mode.
+ */
+#define CONTAINER_OF(inner_ptr, outer, member_name)                    \
+       ({                                                              \
+               __typeof__(outer) *container_of_;                       \
+               container_of_ = (void*)((char*)(inner_ptr) -            \
+                       offsetof(__typeof__(outer), member_name));      \
+               (void)(&container_of_->member_name ==                   \
+                      (__typeof__(inner_ptr))0) /* type check */;      \
+               container_of_;                                          \
+       })
+
+/*
+ * Unregister a tapdisk event if its id is valid (>= 0) and reset the id
+ * to -1 so that a second invocation is a no-op. The argument must be an
+ * lvalue and is evaluated more than once.
+ */
+#define UNREGISTER_EVENT(id)                                   \
+       do {                                                    \
+               if ((id) >= 0) {                                \
+                       tapdisk_server_unregister_event(id);    \
+                       (id) = -1;                              \
+               }                                               \
+       } while (0)
+/*
+ * Close a file descriptor if it is valid (>= 0) and reset it to -1 so
+ * that a second invocation is a no-op. The argument must be an lvalue
+ * and is evaluated more than once; the result of close() is ignored.
+ */
+#define CLOSE_FD(fd)                   \
+       do {                            \
+               if ((fd) >= 0) {        \
+                       close(fd);      \
+                       (fd) = -1;      \
+               }                       \
+       } while (0)
+
+enum {
+       ERROR_INTERNAL = -1,    /* local failure: wrong state, socket or event setup */
+       ERROR_CONNECTION = -2,  /* connect failed with an error that is not retried */
+};
+
+typedef struct td_replication_connect td_replication_connect_t;
+typedef void td_replication_callback(td_replication_connect_t *r, int rc);
+
+struct td_replication_connect {
+       /*
+        * caller must fill these in before calling
+        * td_replication_connect_init()
+        */
+       const char *log_prefix;            /* prepended to every syslog message */
+       td_replication_callback *callback; /* called with rc 0 on success, an ERROR_* code on client failure */
+       int retry_timeout_s;               /* timeout of the client's retry event */
+       int max_connections;               /* backlog passed to listen() */
+       /*
+        * The caller uses this fd to read/write after
+        * the connection is established
+        */
+       int fd;
+
+       /* private */
+       struct sockaddr_in sa;  /* IPv4 address resolved from "host:port" */
+       int listen_fd;          /* listening socket, set by td_replication_server_start() */
+       event_id_t id;          /* currently registered tapdisk event */
+
+       int status;             /* connection status value, see block-replication.c */
+};
+
+/* return -errno if failure happened, otherwise return 0 */
+int td_replication_connect_init(td_replication_connect_t *t, const char *name);
+/*
+ * Return value:
+ *   -2: the status field holds an unknown value (an error is logged)
+ *   -1: connection is closed or not connected
+ *    0: connection is in progress
+ *    1: connection is established
+ */
+int td_replication_connect_status(td_replication_connect_t *t);
+void td_replication_connect_kill(td_replication_connect_t *t);
+
+/*
+ * Return value:
+ *   -2: bind() failed with EADDRNOTAVAIL, so this caller should be
+ *       the client
+ *   -1: error
+ *    0: connection is in progress
+ */
+int td_replication_server_start(td_replication_connect_t *t);
+/*
+ * Return value:
+ *   -2: this caller should be client
+ *   -1: error
+ *    0: connection is in progress
+ */
+int td_replication_server_restart(td_replication_connect_t *t);
+/*
+ * Return value:
+ *   -1: error
+ *    0: connection is in progress
+ */
+int td_replication_client_start(td_replication_connect_t *t);
+
+#endif
-- 
1.9.3


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.