[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Xen-devel] [PATCH VERY RFC 1/5] tools/libxl: Add support for writing a set of buffers asynchronously



At 09/04/2014 01:14 AM, Andrew Cooper Write:
> From: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>

If we allow datacopier's rfd is -1, we can use datacopier to implement it.

> 
> Signed-off-by: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>
> ---
>  tools/libxl/libxl_aoutils.c  |  118 
> ++++++++++++++++++++++++++++++++++++++++++
>  tools/libxl/libxl_internal.h |   44 ++++++++++++++++
>  2 files changed, 162 insertions(+)
> 
> diff --git a/tools/libxl/libxl_aoutils.c b/tools/libxl/libxl_aoutils.c
> index b10d2e1..6027d05 100644
> --- a/tools/libxl/libxl_aoutils.c
> +++ b/tools/libxl/libxl_aoutils.c
> @@ -324,6 +324,124 @@ int libxl__datacopier_start(libxl__datacopier_state *dc)
>      return rc;
>  }
>  
> +
> +/*----- writer -----*/
> +
> +void libxl__writer_kill(libxl__writer_state *dw)
> +{
> +    STATE_AO_GC(dw->ao);
> +    libxl__writer_buf *buf, *tbuf;
> +
> +    libxl__ev_fd_deregister(gc, &dw->towrite);
> +    LIBXL_TAILQ_FOREACH_SAFE(buf, &dw->bufs, entry, tbuf)
> +        free(buf);
> +    LIBXL_TAILQ_INIT(&dw->bufs);
> +}
> +
> +void libxl__writer_append(libxl__egc *egc, libxl__writer_state *dw,
> +                          const void *data, size_t len)
> +{
> +    EGC_GC;
> +    libxl__writer_buf *buf;
> +
> +    assert(len < dw->maxsz - dw->used);
> +
> +    buf = libxl__zalloc(NOGC, sizeof(*buf));
> +    buf->used = len;
> +    memcpy(buf->buf, data, len);
> +
> +    dw->used += len;
> +    LIBXL_TAILQ_INSERT_TAIL(&dw->bufs, buf, entry);
> +}
> +
> +static int writer_pollhup_handled(libxl__egc *egc,
> +                                  libxl__writer_state *dw,
> +                                  short revents)
> +{
> +    STATE_AO_GC(dw->ao);
> +
> +    if (dw->callback_pollhup && (revents & POLLHUP)) {
> +        LOG(DEBUG, "received POLLHUP on %s during writing of %s",
> +            dw->towhat, dw->writewhat);
> +        libxl__writer_kill(dw);
> +        dw->callback_pollhup(egc, dw, 1, -1);
> +        return 1;
> +    }
> +    return 0;
> +}
> +
> +static void writer_writable(libxl__egc *egc, libxl__ev_fd *ev,
> +                            int fd, short events, short revents) {
> +    libxl__writer_state *dw = CONTAINER_OF(ev, *dw, towrite);
> +    STATE_AO_GC(dw->ao);
> +
> +    if (writer_pollhup_handled(egc, dw, revents))
> +        return;
> +
> +    if (revents & ~POLLOUT) {
> +        LOG(ERROR, "unexpected poll event 0x%x (should be POLLOUT)"
> +            " during writing %s to %s", revents, dw->writewhat, dw->towhat);
> +        libxl__writer_kill(dw);
> +        dw->callback(egc, dw, -1, 0);
> +        return;
> +    }
> +    assert(revents & POLLOUT);
> +    for (;;) {
> +        libxl__writer_buf *buf = LIBXL_TAILQ_FIRST(&dw->bufs);
> +        if (!buf) {
> +            libxl__writer_kill(dw);
> +            dw->callback(egc, dw, 0, 0);
> +            break;
> +        }
> +        if (!buf->used) {
> +            LIBXL_TAILQ_REMOVE(&dw->bufs, buf, entry);
> +            free(buf);
> +            continue;
> +        }
> +        int r = write(ev->fd, buf->buf, buf->used);
> +        if (r < 0) {
> +            if (errno == EINTR) continue;
> +            if (errno == EWOULDBLOCK) break;
> +            LOGE(ERROR, "error writing %s to %s",
> +                 dw->writewhat, dw->towhat);
> +            libxl__writer_kill(dw);
> +            dw->callback(egc, dw, 1, errno);
> +            return;
> +        }
> +        assert(r > 0);
> +        assert(r <= buf->used);
> +        buf->used -= r;
> +        dw->used -= r;
> +        assert(dw->used >= 0);
> +        memmove(buf->buf, buf->buf+r, buf->used);
> +    }
> +}
> +
> +void libxl__writer_init(libxl__writer_state *dw)
> +{
> +    assert(dw->ao);
> +    libxl__ev_fd_init(&dw->towrite);
> +    LIBXL_TAILQ_INIT(&dw->bufs);
> +}
> +
> +int libxl__writer_start(libxl__writer_state *dw)
> +{
> +    int rc;
> +    STATE_AO_GC(dw->ao);
> +
> +    libxl__writer_init(dw);
> +
> +    rc = libxl__ev_fd_register(gc, &dw->towrite, writer_writable,
> +                               dw->writefd, POLLOUT);
> +    if (rc) goto out;
> +
> +    return 0;
> +
> + out:
> +    libxl__writer_kill(dw);
> +    return rc;
> +}
> +
>  /*----- openpty -----*/
>  
>  /* implementation */
> diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h
> index 04c9378..47fbf45 100644
> --- a/tools/libxl/libxl_internal.h
> +++ b/tools/libxl/libxl_internal.h
> @@ -2471,6 +2471,50 @@ typedef struct libxl__save_helper_state {
>                        * marshalling and xc callback functions */
>  } libxl__save_helper_state;
>  
> +/*----- writer: writes a set of buffers to an fd asynchronously -----*/
> +
> +typedef struct libxl__writer_state libxl__writer_state;
> +typedef struct libxl__writer_buf libxl__writer_buf;
> +
> +/* onwrite==1 means failure happened when writing, logged, errnoval is valid
> + * onwrite==-1 means some other internal failure, errnoval not valid, logged
> + * If we get POLLHUP, we call callback_pollhup(..., 1, -1);
> + * or if callback_pollhup==0 this is an internal failure, as above.
> + * In all cases copier is killed before calling this callback */
> +typedef void libxl__writer_callback(libxl__egc *egc,
> +     libxl__writer_state *dw, int state, int errnoval);
> +
> +struct libxl__writer_buf {
> +    /* private to writer */
> +    LIBXL_TAILQ_ENTRY(libxl__writer_buf) entry;
> +    int used;
> +    char buf[1000];
> +};
> +
> +struct libxl__writer_state {
> +    /* caller must fill these in, and they must all remain valid */
> +    libxl__ao *ao;
> +    int writefd;
> +    ssize_t maxsz;
> +    const char *towhat, *writewhat; /* for error msgs */
> +    libxl__writer_callback *callback;
> +    libxl__writer_callback *callback_pollhup;
> +    /* remaining fields are private to writer */
> +    libxl__ev_fd towrite;
> +    ssize_t used;
> +    LIBXL_TAILQ_HEAD(libxl__writer_bufs, libxl__writer_buf) bufs;
> +};
> +
> +_hidden void libxl__writer_init(libxl__writer_state *dc);
> +_hidden void libxl__writer_kill(libxl__writer_state *dc);
> +_hidden int libxl__writer_start(libxl__writer_state *dc);
> +
> +/* Inserts literal data into the output stream.  The data is copied.
> + * May safely be used only immediately after libxl__writer_start
> + * (before the ctx is unlocked).  But may be called multiple times.
> + * NB exceeding maxsz will fail an assertion! */
> +_hidden void libxl__writer_append(libxl__egc*, libxl__writer_state*,
> +                                  const void *data, size_t len);
>  
>  /*----- Domain suspend (save) state structure -----*/
>  
> 


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.