[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Xen-devel] [PATCH 16/27] tools/libxl: Infrastructure for reading a libxl migration v2 stream



On 06/15/2015 09:44 PM, Andrew Cooper wrote:
> From: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>
> 
> Signed-off-by: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>
> Signed-off-by: Andrew Cooper <andrew.cooper3@xxxxxxxxxx>
> CC: Ian Campbell <Ian.Campbell@xxxxxxxxxx>
> CC: Ian Jackson <Ian.Jackson@xxxxxxxxxxxxx>
> CC: Wei Liu <wei.liu2@xxxxxxxxxx>
> ---
>  tools/libxl/Makefile            |    1 +
>  tools/libxl/libxl_internal.h    |   39 ++++
>  tools/libxl/libxl_stream_read.c |  485 +++++++++++++++++++++++++++++++++++++++
>  3 files changed, 525 insertions(+)
>  create mode 100644 tools/libxl/libxl_stream_read.c
> 
> diff --git a/tools/libxl/Makefile b/tools/libxl/Makefile
> index cc9c152..c71c5fe 100644
> --- a/tools/libxl/Makefile
> +++ b/tools/libxl/Makefile
> @@ -94,6 +94,7 @@ LIBXL_OBJS = flexarray.o libxl.o libxl_create.o libxl_dm.o 
> libxl_pci.o \
>                       libxl_dom.o libxl_exec.o libxl_xshelp.o libxl_device.o \
>                       libxl_internal.o libxl_utils.o libxl_uuid.o \
>                       libxl_json.o libxl_aoutils.o libxl_numa.o libxl_vnuma.o 
> \
> +                     libxl_stream_read.o \
>                       libxl_save_callout.o _libxl_save_msgs_callout.o \
>                       libxl_qmp.o libxl_event.o libxl_fork.o $(LIBXL_OBJS-y)
>  LIBXL_OBJS += libxl_genid.o
> diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h
> index 101994f..4f33cb8 100644
> --- a/tools/libxl/libxl_internal.h
> +++ b/tools/libxl/libxl_internal.h
> @@ -19,6 +19,8 @@
>  
>  #include "libxl_osdeps.h" /* must come before any other headers */
>  
> +#include "libxl_sr_stream_format.h"
> +
>  #include <assert.h>
>  #include <dirent.h>
>  #include <errno.h>
> @@ -3121,6 +3123,42 @@ typedef void libxl__domain_create_cb(libxl__egc *egc,
>                                       libxl__domain_create_state*,
>                                       int rc, uint32_t domid);
>  
> +/* State for manipulating a libxl migration v2 stream */
> +typedef struct libxl__stream_read_state libxl__stream_read_state;
> +
> +struct libxl__stream_read_state {
> +    /* filled by the user */
> +    libxl__ao *ao;
> +    int fd;
> +    void (*completion_callback)(libxl__egc *egc,
> +                                libxl__domain_create_state *dcs,
> +                                int rc);
> +    /* Private */
> +    int rc;
> +    bool running;
> +    libxl__datacopier_state dc;
> +    size_t expected_len;
> +    libxl_sr_hdr hdr;
> +    libxl_sr_rec_hdr rec_hdr;
> +    void *rec_body;
> +};
> +
> +_hidden void libxl__stream_read_start(libxl__egc *egc,
> +                                      libxl__stream_read_state *stream);
> +
> +_hidden void libxl__stream_read_continue(libxl__egc *egc,
> +                                         libxl__stream_read_state *stream);
> +
> +_hidden void libxl__stream_read_abort(libxl__egc *egc,
> +                                      libxl__stream_read_state *stream, int 
> rc);
> +
> +static inline bool libxl__stream_read_inuse(
> +    const libxl__stream_read_state *stream)
> +{
> +    return stream->running;
> +}
> +
> +
>  struct libxl__domain_create_state {
>      /* filled in by user */
>      libxl__ao *ao;
> @@ -3137,6 +3175,7 @@ struct libxl__domain_create_state {
>      libxl__stub_dm_spawn_state dmss;
>          /* If we're not doing stubdom, we use only dmss.dm,
>           * for the non-stubdom device model. */
> +    libxl__stream_read_state srs;
>      libxl__save_helper_state shs;
>      /* necessary if the domain creation failed and we have to destroy it */
>      libxl__domain_destroy_state dds;
> diff --git a/tools/libxl/libxl_stream_read.c b/tools/libxl/libxl_stream_read.c
> new file mode 100644
> index 0000000..9cdaadf
> --- /dev/null
> +++ b/tools/libxl/libxl_stream_read.c
> @@ -0,0 +1,485 @@
> +/*
> + * Copyright (C) 2015      Citrix Ltd.
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU Lesser General Public License as published
> + * by the Free Software Foundation; version 2.1 only. with the special
> + * exception on linking described in file LICENSE.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU Lesser General Public License for more details.
> + */
> +
> +#include "libxl_osdeps.h" /* must come before any other headers */
> +
> +#include "libxl_internal.h"
> +
> +/*
> + * Infrastructure for reading and acting on the contents of a libxl migration
> + * stream. There are a lot of moving parts here.
> + *
> + * Entry points from outside:
> + *  - libxl__stream_read_start()
> + *     - Set up reading a stream from the start.
> + *
> + *  - libxl__stream_read_continue()
> + *     - Set up reading the next record from a started stream.
> + *
> + * The principle loop functionality involves reading the stream header, then
> + * reading a record at time and acting upon it.  It follows the callbacks:
> + *
> + *  - stream_header_done()
> + *  - stream_record_header_done()
> + *  - stream_record_body_done()
> + *  - process_record()
> + *
> + * process_record() will choose the correct next action based upon the
> + * record.  Upon completion of the action, the next record header will be 
> read
> + * from the stream.
> + */
> +
> +static void stream_success(libxl__egc *egc,
> +                           libxl__stream_read_state *stream);
> +static void stream_failed(libxl__egc *egc,
> +                          libxl__stream_read_state *stream, int rc);
> +static void stream_done(libxl__egc *egc,
> +                        libxl__stream_read_state *stream);
> +
> +/* Event callbacks for main reading loop. */
> +static void stream_header_done(libxl__egc *egc,
> +                               libxl__datacopier_state *dc,
> +                               int onwrite, int errnoval);
> +static void record_header_done(libxl__egc *egc,
> +                               libxl__datacopier_state *dc,
> +                               int onwrite, int errnoval);
> +static void record_body_done(libxl__egc *egc,
> +                             libxl__datacopier_state *dc,
> +                             int onwrite, int errnoval);
> +static void process_record(libxl__egc *egc,
> +                           libxl__stream_read_state *stream);
> +
> +/* Mini-event loop for splicing a emulator record out of the stream. */
> +static void read_emulator_body(libxl__egc *egc,
> +                               libxl__stream_read_state *stream);
> +static void emulator_body_done(libxl__egc *egc,
> +                               libxl__datacopier_state *dc,
> +                               int onwrite, int errnoval);
> +static void emulator_padding_done(libxl__egc *egc,
> +                                  libxl__datacopier_state *dc,
> +                                  int onwrite, int errnoval);
> +
> +void libxl__stream_read_start(libxl__egc *egc,
> +                              libxl__stream_read_state *stream)
> +{
> +    libxl__datacopier_state *dc = &stream->dc;
> +    int ret = 0;
> +
> +    /* State initialisation. */
> +    assert(!stream->running);
> +
> +    memset(dc, 0, sizeof(*dc));
> +    dc->ao = stream->ao;
> +    dc->readfd = stream->fd;
> +    dc->writefd = -1;
> +
> +    /* Start reading the stream header. */
> +    dc->readwhat = "stream header";
> +    dc->readbuf = &stream->hdr;
> +    stream->expected_len = dc->bytes_to_read = sizeof(stream->hdr);
> +    dc->used = 0;
> +    dc->callback = stream_header_done;
> +
> +    ret = libxl__datacopier_start(dc);
> +    if (ret)
> +        goto err;
> +
> +    stream->running = true;
> +    assert(!ret);
> +    return;
> +
> + err:
> +    assert(ret);
> +    stream_failed(egc, stream, ret);
> +}
> +
> +void libxl__stream_read_continue(libxl__egc *egc,
> +                                 libxl__stream_read_state *stream)
> +{
> +    libxl__datacopier_state *dc = &stream->dc;
> +    int ret = 0;
> +
> +    assert(stream->running);
> +
> +    /* Read a record header. */
> +    dc->readwhat = "record header";
> +    dc->readbuf = &stream->rec_hdr;
> +    stream->expected_len = dc->bytes_to_read = sizeof(stream->rec_hdr);
> +    dc->used = 0;
> +    dc->callback = record_header_done;
> +
> +    ret = libxl__datacopier_start(dc);
> +    if (ret)
> +        goto err;
> +
> +    assert(!ret);
> +    return;
> +
> + err:
> +    assert(ret);
> +    stream_failed(egc, stream, ret);
> +}
> +
> +void libxl__stream_read_abort(libxl__egc *egc,
> +                              libxl__stream_read_state *stream, int rc)
> +{
> +    stream_failed(egc, stream, rc);
> +}
> +
> +static void stream_success(libxl__egc *egc, libxl__stream_read_state *stream)
> +{
> +    stream->rc = 0;
> +    stream->running = false;
> +
> +    stream_done(egc, stream);
> +}
> +
> +static void stream_failed(libxl__egc *egc,
> +                          libxl__stream_read_state *stream, int rc)
> +{
> +    assert(rc);
> +    stream->rc = rc;
> +
> +    if (stream->running) {
> +        stream->running = false;
> +        stream_done(egc, stream);
> +    }
> +}
> +
> +static void stream_done(libxl__egc *egc,
> +                        libxl__stream_read_state *stream)
> +{
> +    libxl__domain_create_state *dcs = CONTAINER_OF(stream, *dcs, srs);
> +
> +    assert(!stream->running);
> +
> +    stream->completion_callback(egc, dcs, stream->rc);
> +}
> +
> +static void stream_header_done(libxl__egc *egc,
> +                               libxl__datacopier_state *dc,
> +                               int onwrite, int errnoval)
> +{
> +    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
> +    libxl_sr_hdr *hdr = &stream->hdr;
> +    STATE_AO_GC(dc->ao);
> +    int ret = 0;
> +
> +    if (onwrite || dc->used != stream->expected_len) {
> +        ret = ERROR_FAIL;
> +        LOG(ERROR, "write %d, err %d, expected %zu, got %zu",
> +            onwrite, errnoval, stream->expected_len, dc->used);
> +        goto err;
> +    }
> +
> +    hdr->ident   = be64toh(hdr->ident);
> +    hdr->version = be32toh(hdr->version);
> +    hdr->options = be32toh(hdr->options);
> +
> +    if (hdr->ident != RESTORE_STREAM_IDENT) {
> +        ret = ERROR_FAIL;
> +        LOG(ERROR,
> +            "Invalid ident: expected 0x%016"PRIx64", got 0x%016"PRIx64,
> +            RESTORE_STREAM_IDENT, hdr->ident);
> +        goto err;
> +    }
> +    if (hdr->version != RESTORE_STREAM_VERSION) {
> +        ret = ERROR_FAIL;
> +        LOG(ERROR, "Unexpected Version: expected %u, got %u",
> +            RESTORE_STREAM_VERSION, hdr->version);
> +        goto err;
> +    }
> +    if (hdr->options & RESTORE_OPT_BIG_ENDIAN) {
> +        ret = ERROR_FAIL;
> +        LOG(ERROR, "Unable to handle big endian streams");
> +        goto err;
> +    }
> +
> +    LOG(INFO, "Stream v%u%s", hdr->version,
> +        hdr->options & RESTORE_OPT_LEGACY ? " (from legacy)" : "");
> +
> +    libxl__stream_read_continue(egc, stream);
> +    return;
> +
> + err:
> +    assert(ret);
> +    stream_failed(egc, stream, ret);
> +}
> +
> +static void record_header_done(libxl__egc *egc,
> +                               libxl__datacopier_state *dc,
> +                               int onwrite, int errnoval)
> +{
> +    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
> +    libxl_sr_rec_hdr *rec_hdr = &stream->rec_hdr;
> +    STATE_AO_GC(dc->ao);
> +    int ret = 0;
> +
> +    if (onwrite || dc->used != stream->expected_len) {
> +        ret = ERROR_FAIL;
> +        LOG(ERROR, "write %d, err %d, expected %zu, got %zu",
> +            onwrite, errnoval, stream->expected_len, dc->used);
> +        goto err;
> +    }
> +
> +    assert(stream->rec_body == NULL);
> +
> +    /* No body? Process straight away. */
> +    if (rec_hdr->length == 0) {
> +        process_record(egc, stream);
> +        return;
> +    }
> +
> +    /* Queue up reading the body. */
> +    size_t bytes_to_read;
> +
> +    switch (rec_hdr->type) {
> +        /*
> +         * Emulator records want to retain the blob in the pipe, for a 
> further
> +         * datacopier call to move elsewhere.  Just read the emulator header.
> +         */

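In this case (REC_TYPE_EMULATOR_CONTEXT), we should not call ROUNDUP(): only the emulator header is read here and the rest of the record has to stay in the pipe, so rounding the read length up could consume bytes that belong to the emulator blob.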

> +    case REC_TYPE_EMULATOR_CONTEXT:
> +        bytes_to_read = sizeof(struct libxl_sr_emulator_hdr);
> +        break;
> +
> +    default:
> +        bytes_to_read = rec_hdr->length;
> +        break;
> +    }
> +
> +    bytes_to_read = ROUNDUP(bytes_to_read, REC_ALIGN_ORDER);

So I think it would be better to move the ROUNDUP() call into the default case, so that it is applied only to records whose whole body is read here.

Thanks
Wen Congyang

> +
> +    dc->readwhat = "record body";
> +    stream->rec_body = dc->readbuf = libxl__malloc(NOGC, bytes_to_read);
> +    stream->expected_len = dc->bytes_to_read = bytes_to_read;
> +    dc->used = 0;
> +    dc->callback = record_body_done;
> +
> +    ret = libxl__datacopier_start(dc);
> +    if (ret)
> +        goto err;
> +    return;
> +
> + err:
> +    assert(ret);
> +    stream_failed(egc, stream, ret);
> +}
> +
> +static void record_body_done(libxl__egc *egc,
> +                             libxl__datacopier_state *dc,
> +                             int onwrite, int errnoval)
> +{
> +    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
> +    STATE_AO_GC(dc->ao);
> +    int ret = 0;
> +
> +    if (onwrite || dc->used != stream->expected_len) {
> +        ret = ERROR_FAIL;
> +        LOG(ERROR, "write %d, err %d, expected %zu, got %zu",
> +            onwrite, errnoval, stream->expected_len, dc->used);
> +
> +        free(stream->rec_body);
> +        stream->rec_body = dc->readbuf = NULL;
> +
> +        goto err;
> +    }
> +
> +    process_record(egc, stream);
> +    return;
> +
> + err:
> +    assert(ret);
> +    stream_failed(egc, stream, ret);
> +}
> +
> +static void process_record(libxl__egc *egc,
> +                           libxl__stream_read_state *stream)
> +{
> +    libxl__domain_create_state *dcs = CONTAINER_OF(stream, *dcs, srs);
> +    libxl_sr_rec_hdr *rec_hdr = &stream->rec_hdr;
> +    STATE_AO_GC(stream->ao);
> +    int ret = 0;
> +
> +    LOG(DEBUG, "Record: 0x%08x, length %u", rec_hdr->type, rec_hdr->length);
> +
> +    switch (rec_hdr->type) {
> +
> +    case REC_TYPE_END:
> +        /* Handled later, after cleanup. */
> +        break;
> +
> +    case REC_TYPE_XENSTORE_DATA:
> +        ret = libxl__toolstack_restore(dcs->guest_domid, stream->rec_body,
> +                                       rec_hdr->length, &dcs->shs);
> +        if (ret)
> +            goto err;
> +
> +        /*
> +         * libxl__toolstack_restore() is a synchronous function.  Manually
> +         * start looking for the next record.
> +         */
> +        libxl__stream_read_continue(egc, &dcs->srs);
> +        break;
> +
> +    case REC_TYPE_EMULATOR_CONTEXT:
> +        read_emulator_body(egc, stream);
> +        break;
> +
> +    default:
> +        LOG(ERROR, "Unrecognised record 0x%08x", rec_hdr->type);
> +        ret = ERROR_FAIL;
> +        goto err;
> +    }
> +
> +    assert(!ret);
> +    if (rec_hdr->length) {
> +        free(stream->rec_body);
> +        stream->rec_body = NULL;
> +    }
> +
> +    if (rec_hdr->type == REC_TYPE_END)
> +        stream_success(egc, stream);
> +    return;
> +
> + err:
> +    assert(ret);
> +    if (rec_hdr->length) {
> +        free(stream->rec_body);
> +        stream->rec_body = NULL;
> +    }
> +    stream_failed(egc, stream, ret);
> +}
> +
> +static void read_emulator_body(libxl__egc *egc,
> +                               libxl__stream_read_state *stream)
> +{
> +    libxl__domain_create_state *dcs = CONTAINER_OF(stream, *dcs, srs);
> +    libxl__datacopier_state *dc = &stream->dc;
> +    libxl_sr_rec_hdr *rec_hdr = &stream->rec_hdr;
> +    libxl_sr_emulator_hdr *emu_hdr = stream->rec_body;
> +    STATE_AO_GC(stream->ao);
> +    char path[256];
> +    int ret = 0;
> +
> +    sprintf(path, XC_DEVICE_MODEL_RESTORE_FILE".%u", dcs->guest_domid);
> +
> +    dc->readwhat = "save/migration stream";
> +    dc->copywhat = "emulator context";
> +    dc->writewhat = "qemu save file";
> +    dc->readbuf = NULL;
> +    dc->writefd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0666);
> +    if (dc->writefd == -1) {
> +        ret = ERROR_FAIL;
> +        LOGE(ERROR, "Unable to open '%s'", path);
> +        goto err;
> +    }
> +    dc->maxsz = dc->bytes_to_read = rec_hdr->length - sizeof(*emu_hdr);
> +    stream->expected_len = dc->used = 0;
> +    dc->callback = emulator_body_done;
> +
> +    ret = libxl__datacopier_start(dc);
> +    if (ret)
> +        goto err;
> +    return;
> +
> + err:
> +    assert(ret);
> +    stream_failed(egc, stream, ret);
> +}
> +
> +static void emulator_body_done(libxl__egc *egc,
> +                               libxl__datacopier_state *dc,
> +                               int onwrite, int errnoval)
> +{
> +    /* Safe to be static, as it is a write-only discard buffer. */
> +    static char padding[1U << REC_ALIGN_ORDER];
> +
> +    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
> +    libxl_sr_rec_hdr *rec_hdr = &stream->rec_hdr;
> +    STATE_AO_GC(dc->ao);
> +    unsigned int nr_padding_bytes = (1U << REC_ALIGN_ORDER);
> +    int ret = 0;
> +
> +    if (onwrite || dc->used != stream->expected_len) {
> +        ret = ERROR_FAIL;
> +        LOG(ERROR, "write %d, err %d, expected %zu, got %zu",
> +            onwrite, errnoval, stream->expected_len, dc->used);
> +        goto err;
> +    }
> +
> +    /* Undo modifications for splicing the emulator context. */
> +    memset(dc, 0, sizeof(*dc));
> +    dc->ao = stream->ao;
> +    dc->readfd = stream->fd;
> +    dc->writefd = -1;
> +
> +    /* Do we need to eat some padding out of the stream? */
> +    if (rec_hdr->length & (nr_padding_bytes - 1)) {
> +        unsigned int bytes_to_discard =
> +            nr_padding_bytes - (rec_hdr->length & (nr_padding_bytes - 1));
> +
> +        dc->readwhat = "padding bytes";
> +        dc->readbuf = padding;
> +        stream->expected_len = dc->bytes_to_read = bytes_to_discard;
> +        dc->used = 0;
> +        dc->callback = emulator_padding_done;
> +
> +        ret = libxl__datacopier_start(dc);
> +        if (ret)
> +            goto err;
> +    }
> +    else
> +    {
> +        stream->expected_len = dc->bytes_to_read = 0;
> +        dc->used = 0;
> +
> +        emulator_padding_done(egc, dc, 0, 0);
> +    }
> +    return;
> +
> + err:
> +    assert(ret);
> +    stream_failed(egc, stream, ret);
> +}
> +
> +static void emulator_padding_done(libxl__egc *egc,
> +                                  libxl__datacopier_state *dc,
> +                                  int onwrite, int errnoval)
> +{
> +    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
> +    STATE_AO_GC(dc->ao);
> +    int ret = 0;
> +
> +    if (onwrite || dc->used != stream->expected_len) {
> +        ret = ERROR_FAIL;
> +        LOG(ERROR, "write %d, err %d, expected %zu, got %zu",
> +            onwrite, errnoval, stream->expected_len, dc->used);
> +        goto err;
> +    }
> +
> +    libxl__stream_read_continue(egc, stream);
> +    return;
> +
> + err:
> +    assert(ret);
> +    stream_failed(egc, stream, ret);
> +}
> +
> +/*
> + * Local variables:
> + * mode: C
> + * c-basic-offset: 4
> + * indent-tabs-mode: nil
> + * End:
> + */
> 


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.