[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] Re: [Xen-devel] [PATCH VERY RFC 1/5] tools/libxl: Add support for writing a set of buffers asynchronously
At 09/04/2014 01:14 AM, Andrew Cooper wrote: > From: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx> If we allow datacopier's rfd to be -1, we can use datacopier to implement this. > > Signed-off-by: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx> > --- > tools/libxl/libxl_aoutils.c | 118 > ++++++++++++++++++++++++++++++++++++++++++ > tools/libxl/libxl_internal.h | 44 ++++++++++++++++ > 2 files changed, 162 insertions(+) > > diff --git a/tools/libxl/libxl_aoutils.c b/tools/libxl/libxl_aoutils.c > index b10d2e1..6027d05 100644 > --- a/tools/libxl/libxl_aoutils.c > +++ b/tools/libxl/libxl_aoutils.c > @@ -324,6 +324,124 @@ int libxl__datacopier_start(libxl__datacopier_state *dc) > return rc; > } > > + > +/*----- writer -----*/ > + > +void libxl__writer_kill(libxl__writer_state *dw) > +{ > + STATE_AO_GC(dw->ao); > + libxl__writer_buf *buf, *tbuf; > + > + libxl__ev_fd_deregister(gc, &dw->towrite); > + LIBXL_TAILQ_FOREACH_SAFE(buf, &dw->bufs, entry, tbuf) > + free(buf); > + LIBXL_TAILQ_INIT(&dw->bufs); > +} > + > +void libxl__writer_append(libxl__egc *egc, libxl__writer_state *dw, > + const void *data, size_t len) > +{ > + EGC_GC; > + libxl__writer_buf *buf; > + > + assert(len < dw->maxsz - dw->used); > + > + buf = libxl__zalloc(NOGC, sizeof(*buf)); > + buf->used = len; > + memcpy(buf->buf, data, len); > + > + dw->used += len; > + LIBXL_TAILQ_INSERT_TAIL(&dw->bufs, buf, entry); > +} > + > +static int writer_pollhup_handled(libxl__egc *egc, > + libxl__writer_state *dw, > + short revents) > +{ > + STATE_AO_GC(dw->ao); > + > + if (dw->callback_pollhup && (revents & POLLHUP)) { > + LOG(DEBUG, "received POLLHUP on %s during writing of %s", > + dw->towhat, dw->writewhat); > + libxl__writer_kill(dw); > + dw->callback_pollhup(egc, dw, 1, -1); > + return 1; > + } > + return 0; > +} > + > +static void writer_writable(libxl__egc *egc, libxl__ev_fd *ev, > + int fd, short events, short revents) { > + libxl__writer_state *dw = CONTAINER_OF(ev, *dw, towrite); > + STATE_AO_GC(dw->ao); > + > + if 
(writer_pollhup_handled(egc, dw, revents)) > + return; > + > + if (revents & ~POLLOUT) { > + LOG(ERROR, "unexpected poll event 0x%x (should be POLLOUT)" > + " during writing %s to %s", revents, dw->writewhat, dw->towhat); > + libxl__writer_kill(dw); > + dw->callback(egc, dw, -1, 0); > + return; > + } > + assert(revents & POLLOUT); > + for (;;) { > + libxl__writer_buf *buf = LIBXL_TAILQ_FIRST(&dw->bufs); > + if (!buf) { > + libxl__writer_kill(dw); > + dw->callback(egc, dw, 0, 0); > + break; > + } > + if (!buf->used) { > + LIBXL_TAILQ_REMOVE(&dw->bufs, buf, entry); > + free(buf); > + continue; > + } > + int r = write(ev->fd, buf->buf, buf->used); > + if (r < 0) { > + if (errno == EINTR) continue; > + if (errno == EWOULDBLOCK) break; > + LOGE(ERROR, "error writing %s to %s", > + dw->writewhat, dw->towhat); > + libxl__writer_kill(dw); > + dw->callback(egc, dw, 1, errno); > + return; > + } > + assert(r > 0); > + assert(r <= buf->used); > + buf->used -= r; > + dw->used -= r; > + assert(dw->used >= 0); > + memmove(buf->buf, buf->buf+r, buf->used); > + } > +} > + > +void libxl__writer_init(libxl__writer_state *dw) > +{ > + assert(dw->ao); > + libxl__ev_fd_init(&dw->towrite); > + LIBXL_TAILQ_INIT(&dw->bufs); > +} > + > +int libxl__writer_start(libxl__writer_state *dw) > +{ > + int rc; > + STATE_AO_GC(dw->ao); > + > + libxl__writer_init(dw); > + > + rc = libxl__ev_fd_register(gc, &dw->towrite, writer_writable, > + dw->writefd, POLLOUT); > + if (rc) goto out; > + > + return 0; > + > + out: > + libxl__writer_kill(dw); > + return rc; > +} > + > /*----- openpty -----*/ > > /* implementation */ > diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h > index 04c9378..47fbf45 100644 > --- a/tools/libxl/libxl_internal.h > +++ b/tools/libxl/libxl_internal.h > @@ -2471,6 +2471,50 @@ typedef struct libxl__save_helper_state { > * marshalling and xc callback functions */ > } libxl__save_helper_state; > > +/*----- writer: writes a set of buffers to an fd asynchronously 
-----*/ > + > +typedef struct libxl__writer_state libxl__writer_state; > +typedef struct libxl__writer_buf libxl__writer_buf; > + > +/* onwrite==1 means failure happened when writing, logged, errnoval is valid > + * onwrite==-1 means some other internal failure, errnoval not valid, logged > + * If we get POLLHUP, we call callback_pollhup(..., 1, -1); > + * or if callback_pollhup==0 this is an internal failure, as above. > + * In all cases copier is killed before calling this callback */ > +typedef void libxl__writer_callback(libxl__egc *egc, > + libxl__writer_state *dw, int state, int errnoval); > + > +struct libxl__writer_buf { > + /* private to writer */ > + LIBXL_TAILQ_ENTRY(libxl__writer_buf) entry; > + int used; > + char buf[1000]; > +}; > + > +struct libxl__writer_state { > + /* caller must fill these in, and they must all remain valid */ > + libxl__ao *ao; > + int writefd; > + ssize_t maxsz; > + const char *towhat, *writewhat; /* for error msgs */ > + libxl__writer_callback *callback; > + libxl__writer_callback *callback_pollhup; > + /* remaining fields are private to writer */ > + libxl__ev_fd towrite; > + ssize_t used; > + LIBXL_TAILQ_HEAD(libxl__writer_bufs, libxl__writer_buf) bufs; > +}; > + > +_hidden void libxl__writer_init(libxl__writer_state *dc); > +_hidden void libxl__writer_kill(libxl__writer_state *dc); > +_hidden int libxl__writer_start(libxl__writer_state *dc); > + > +/* Inserts literal data into the output stream. The data is copied. > + * May safely be used only immediately after libxl__writer_start > + * (before the ctx is unlocked). But may be called multiple times. > + * NB exceeding maxsz will fail an assertion! */ > +_hidden void libxl__writer_append(libxl__egc*, libxl__writer_state*, > + const void *data, size_t len); > > /*----- Domain suspend (save) state structure -----*/ > > _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |