[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH VERY RFC 1/5] tools/libxl: Add support for writing a set of buffers asynchronously
From: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx> Signed-off-by: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx> --- tools/libxl/libxl_aoutils.c | 118 ++++++++++++++++++++++++++++++++++++++++++ tools/libxl/libxl_internal.h | 44 ++++++++++++++++ 2 files changed, 162 insertions(+) diff --git a/tools/libxl/libxl_aoutils.c b/tools/libxl/libxl_aoutils.c index b10d2e1..6027d05 100644 --- a/tools/libxl/libxl_aoutils.c +++ b/tools/libxl/libxl_aoutils.c @@ -324,6 +324,124 @@ int libxl__datacopier_start(libxl__datacopier_state *dc) return rc; } + +/*----- writer -----*/ + +void libxl__writer_kill(libxl__writer_state *dw) +{ + STATE_AO_GC(dw->ao); + libxl__writer_buf *buf, *tbuf; + + libxl__ev_fd_deregister(gc, &dw->towrite); + LIBXL_TAILQ_FOREACH_SAFE(buf, &dw->bufs, entry, tbuf) + free(buf); + LIBXL_TAILQ_INIT(&dw->bufs); +} + +void libxl__writer_append(libxl__egc *egc, libxl__writer_state *dw, + const void *data, size_t len) +{ + EGC_GC; + libxl__writer_buf *buf; + + assert(len < dw->maxsz - dw->used); + + buf = libxl__zalloc(NOGC, sizeof(*buf)); + buf->used = len; + memcpy(buf->buf, data, len); + + dw->used += len; + LIBXL_TAILQ_INSERT_TAIL(&dw->bufs, buf, entry); +} + +static int writer_pollhup_handled(libxl__egc *egc, + libxl__writer_state *dw, + short revents) +{ + STATE_AO_GC(dw->ao); + + if (dw->callback_pollhup && (revents & POLLHUP)) { + LOG(DEBUG, "received POLLHUP on %s during writing of %s", + dw->towhat, dw->writewhat); + libxl__writer_kill(dw); + dw->callback_pollhup(egc, dw, 1, -1); + return 1; + } + return 0; +} + +static void writer_writable(libxl__egc *egc, libxl__ev_fd *ev, + int fd, short events, short revents) { + libxl__writer_state *dw = CONTAINER_OF(ev, *dw, towrite); + STATE_AO_GC(dw->ao); + + if (writer_pollhup_handled(egc, dw, revents)) + return; + + if (revents & ~POLLOUT) { + LOG(ERROR, "unexpected poll event 0x%x (should be POLLOUT)" + " during writing %s to %s", revents, dw->writewhat, dw->towhat); + libxl__writer_kill(dw); + dw->callback(egc, dw, -1, 0); + return; + } + assert(revents & POLLOUT); + for (;;) { + libxl__writer_buf *buf = LIBXL_TAILQ_FIRST(&dw->bufs); + if (!buf) { + libxl__writer_kill(dw); + dw->callback(egc, dw, 0, 0); + break; + } + if (!buf->used) { + LIBXL_TAILQ_REMOVE(&dw->bufs, buf, entry); + free(buf); + continue; + } + int r = write(ev->fd, buf->buf, buf->used); + if (r < 0) { + if (errno == EINTR) continue; + if (errno == EWOULDBLOCK) break; + LOGE(ERROR, "error writing %s to %s", + dw->writewhat, dw->towhat); + libxl__writer_kill(dw); + dw->callback(egc, dw, 1, errno); + return; + } + assert(r > 0); + assert(r <= buf->used); + buf->used -= r; + dw->used -= r; + assert(dw->used >= 0); + memmove(buf->buf, buf->buf+r, buf->used); + } +} + +void libxl__writer_init(libxl__writer_state *dw) +{ + assert(dw->ao); + libxl__ev_fd_init(&dw->towrite); + LIBXL_TAILQ_INIT(&dw->bufs); +} + +int libxl__writer_start(libxl__writer_state *dw) +{ + int rc; + STATE_AO_GC(dw->ao); + + libxl__writer_init(dw); + + rc = libxl__ev_fd_register(gc, &dw->towrite, writer_writable, + dw->writefd, POLLOUT); + if (rc) goto out; + + return 0; + + out: + libxl__writer_kill(dw); + return rc; +} + /*----- openpty -----*/ /* implementation */ diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h index 04c9378..47fbf45 100644 --- a/tools/libxl/libxl_internal.h +++ b/tools/libxl/libxl_internal.h @@ -2471,6 +2471,50 @@ typedef struct libxl__save_helper_state { * marshalling and xc callback functions */ } libxl__save_helper_state; +/*----- writer: writes a set of buffers to an fd asynchronously -----*/ + +typedef struct libxl__writer_state libxl__writer_state; +typedef struct libxl__writer_buf libxl__writer_buf; + +/* onwrite==1 means failure happened when writing, logged, errnoval is valid + * onwrite==-1 means some other internal failure, errnoval not valid, logged + * If we get POLLHUP, we call callback_pollhup(..., 1, -1); + * or if callback_pollhup==0 this is an internal failure, as above. + * In all cases copier is killed before calling this callback */ +typedef void libxl__writer_callback(libxl__egc *egc, + libxl__writer_state *dw, int state, int errnoval); + +struct libxl__writer_buf { + /* private to writer */ + LIBXL_TAILQ_ENTRY(libxl__writer_buf) entry; + int used; + char buf[1000]; +}; + +struct libxl__writer_state { + /* caller must fill these in, and they must all remain valid */ + libxl__ao *ao; + int writefd; + ssize_t maxsz; + const char *towhat, *writewhat; /* for error msgs */ + libxl__writer_callback *callback; + libxl__writer_callback *callback_pollhup; + /* remaining fields are private to writer */ + libxl__ev_fd towrite; + ssize_t used; + LIBXL_TAILQ_HEAD(libxl__writer_bufs, libxl__writer_buf) bufs; +}; + +_hidden void libxl__writer_init(libxl__writer_state *dc); +_hidden void libxl__writer_kill(libxl__writer_state *dc); +_hidden int libxl__writer_start(libxl__writer_state *dc); + +/* Inserts literal data into the output stream. The data is copied. + * May safely be used only immediately after libxl__writer_start + * (before the ctx is unlocked). But may be called multiple times. + * NB exceeding maxsz will fail an assertion! */ +_hidden void libxl__writer_append(libxl__egc*, libxl__writer_state*, + const void *data, size_t len); /*----- Domain suspend (save) state structure -----*/ -- 1.7.10.4 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |