[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH v3 17/28] tools/libxl: Infrastructure for reading a libxl migration v2 stream
From: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx> This contains the event machinery and state machines to read an act on a non-checkpointed migration v2 stream (with the exception of the xc_domain_restore() handling which is spliced later in a bisectable way). It also contains some boilerplate to help support checkpointed streams, which shall be introduced in a later patch. Signed-off-by: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx> Signed-off-by: Andrew Cooper <andrew.cooper3@xxxxxxxxxx> CC: Ian Campbell <Ian.Campbell@xxxxxxxxxx> CC: Ian Jackson <Ian.Jackson@xxxxxxxxxxxxx> CC: Wei Liu <wei.liu2@xxxxxxxxxx> --- v3: More descriptions of the internal state, shuffle function order, add an _init() call, condense error handling --- tools/libxl/Makefile | 1 + tools/libxl/libxl_internal.h | 53 ++++ tools/libxl/libxl_stream_read.c | 541 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 595 insertions(+) create mode 100644 tools/libxl/libxl_stream_read.c diff --git a/tools/libxl/Makefile b/tools/libxl/Makefile index cc9c152..c71c5fe 100644 --- a/tools/libxl/Makefile +++ b/tools/libxl/Makefile @@ -94,6 +94,7 @@ LIBXL_OBJS = flexarray.o libxl.o libxl_create.o libxl_dm.o libxl_pci.o \ libxl_dom.o libxl_exec.o libxl_xshelp.o libxl_device.o \ libxl_internal.o libxl_utils.o libxl_uuid.o \ libxl_json.o libxl_aoutils.o libxl_numa.o libxl_vnuma.o \ + libxl_stream_read.o \ libxl_save_callout.o _libxl_save_msgs_callout.o \ libxl_qmp.o libxl_event.o libxl_fork.o $(LIBXL_OBJS-y) LIBXL_OBJS += libxl_genid.o diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h index 3f1fed8..c0e7720 100644 --- a/tools/libxl/libxl_internal.h +++ b/tools/libxl/libxl_internal.h @@ -19,6 +19,8 @@ #include "libxl_osdeps.h" /* must come before any other headers */ +#include "libxl_sr_stream_format.h" + #include <assert.h> #include <dirent.h> #include <errno.h> @@ -3211,6 +3213,57 @@ typedef void libxl__domain_create_cb(libxl__egc *egc, libxl__domain_create_state*, int rc, uint32_t domid); +/* State for manipulating a libxl migration v2 stream */ +typedef struct libxl__stream_read_state libxl__stream_read_state; + +typedef struct libxl__sr_record_buf { + /* private to stream read helper */ + LIBXL_STAILQ_ENTRY(struct libxl__sr_record_buf) entry; + libxl__sr_rec_hdr hdr; + void *body; /* iff hdr.length != 0 */ +} libxl__sr_record_buf; + +struct libxl__stream_read_state { + /* filled by the user */ + libxl__ao *ao; + libxl__domain_create_state *dcs; + int fd; + void (*completion_callback)(libxl__egc *egc, + libxl__stream_read_state *srs, + int rc); + /* Private */ + int rc; + bool running; + + /* Main stream-reading data. */ + libxl__datacopier_state dc; + libxl__sr_hdr hdr; + LIBXL_STAILQ_HEAD(, libxl__sr_record_buf) record_queue; + enum { + SRS_PHASE_NORMAL, + } phase; + bool recursion_guard; + + /* Only used while actively reading a record from the stream. */ + libxl__sr_record_buf *incoming_record; + + /* Only used when processing an EMULATOR record. */ + libxl__datacopier_state emu_dc; + libxl__carefd *emu_carefd; +}; + +_hidden void libxl__stream_read_init(libxl__stream_read_state *stream); +_hidden void libxl__stream_read_start(libxl__egc *egc, + libxl__stream_read_state *stream); +_hidden void libxl__stream_read_abort(libxl__egc *egc, + libxl__stream_read_state *stream, int rc); +static inline bool +libxl__stream_read_inuse(const libxl__stream_read_state *stream) +{ + return stream->running; +} + + struct libxl__domain_create_state { /* filled in by user */ libxl__ao *ao; diff --git a/tools/libxl/libxl_stream_read.c b/tools/libxl/libxl_stream_read.c new file mode 100644 index 0000000..e70c05a --- /dev/null +++ b/tools/libxl/libxl_stream_read.c @@ -0,0 +1,541 @@ +/* + * Copyright (C) 2015 Citrix Ltd. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + */ + +#include "libxl_osdeps.h" /* must come before any other headers */ + +#include "libxl_internal.h" + +/* + * Infrastructure for reading and acting on the contents of a libxl + * migration stream. There are a lot of moving parts here. + * + * The logic revolves around two actions; reading another record from + * the stream, and processing the records. The stream_continue() + * function is responsible for choosing the next action to perform. + * + * The exact order of reading and processing is controlled by 'phase'. + * All complete records are held in the record_queue before being + * processed, and all records will be processed in queue order. + * + * PHASE_NORMAL: + * This phase is used for regular migration or resume from file. + * Records are read one at time and immediately processed. (The + * record queue will not contain more than a single record.) + * + * Note: + * Record buffers are not allocated from a GC; they are allocated + * and tracked manually. This is to avoid OOM with Remus where the + * AO lives for the lifetime of the process. Per-checkpoint AO's + * might be an avenue to explore. + * + * Entry points from outside: + * - libxl__stream_read_init() + * - Initialises state. Must be called once before _start() + * - libxl__stream_read_start() + * - Starts reading records from the stream, and acting on them. + * + * There are several chains of event: + * + * 1) Starting a stream follows: + * - libxl__stream_read_start() + * - stream_header_done() + * - stream_continue() + * + * 2) Reading a record follows: + * - stream_continue() + * - record_header_done() + * - record_body_done() + * - stream_continue() + * + * 3) Processing a record had several chains to follow, depending on + * the record in question. + * 3a) "Simple" record: + * - process_record() + * - stream_continue() + * 3b) LIBXC record: + * - process_record() + * - libxl__xc_domain_restore() + * - libxl__xc_domain_restore_done() + * - stream_continue() + * 3c) EMULATOR record: + * - process_record() + * - stream_write_emulator() + * - stream_write_emulator_done() + * - stream_continue() + */ + +/* Success/error/cleanup handling. */ +static void stream_complete(libxl__egc *egc, + libxl__stream_read_state *stream, int rc); +static void stream_done(libxl__egc *egc, + libxl__stream_read_state *stream); + +/* Event chain for first iteration, from _start(). */ +static void stream_header_done(libxl__egc *egc, + libxl__datacopier_state *dc, + int rc, int onwrite, int errnoval); +static void stream_continue(libxl__egc *egc, + libxl__stream_read_state *stream); +static void setup_read_record(libxl__egc *egc, + libxl__stream_read_state *stream); +static void record_header_done(libxl__egc *egc, + libxl__datacopier_state *dc, + int rc, int onwrite, int errnoval); +static void record_body_done(libxl__egc *egc, + libxl__datacopier_state *dc, + int rc, int onwrite, int errnoval); +static bool process_record(libxl__egc *egc, + libxl__stream_read_state *stream); + +/* Event chain for processing an emulator blob. */ +static void write_emulator_blob(libxl__egc *egc, + libxl__stream_read_state *stream, + libxl__sr_record_buf *rec); +static void write_emulator_done(libxl__egc *egc, + libxl__datacopier_state *dc, + int rc, int onwrite, int errnoval); + +/*----- Helpers -----*/ + +/* Helper to set up reading some data from the stream. */ +static int setup_read(libxl__stream_read_state *stream, + const char *what, void *ptr, size_t nr_bytes, + libxl__datacopier_callback cb) +{ + libxl__datacopier_state *dc = &stream->dc; + + dc->readwhat = what; + dc->readbuf = ptr; + dc->bytes_to_read = nr_bytes; + dc->used = 0; + dc->callback = cb; + + return libxl__datacopier_start(dc); +} + +static void free_record(libxl__sr_record_buf *rec) +{ + if (rec) { + free(rec->body); + free(rec); + } +} + +/*----- Entrypoints -----*/ + +void libxl__stream_read_init(libxl__stream_read_state *stream) +{ + FILLZERO(*stream); + LIBXL_STAILQ_INIT(&stream->record_queue); +} + +void libxl__stream_read_start(libxl__egc *egc, + libxl__stream_read_state *stream) +{ + libxl__datacopier_state *dc = &stream->dc; + int rc = 0; + + stream->running = true; + stream->phase = SRS_PHASE_NORMAL; + + dc->ao = stream->ao; + dc->readfd = stream->fd; + dc->writefd = -1; + + /* Start reading the stream header. */ + rc = setup_read(stream, "stream header", + &stream->hdr, sizeof(stream->hdr), + stream_header_done); + if (rc) + goto err; + + assert(!rc); + return; + + err: + assert(rc); + stream_complete(egc, stream, rc); +} + +void libxl__stream_read_abort(libxl__egc *egc, + libxl__stream_read_state *stream, int rc) +{ + assert(rc); + + if (stream->running) + stream_complete(egc, stream, rc); +} + +/*----- Event logic -----*/ + +static void stream_header_done(libxl__egc *egc, + libxl__datacopier_state *dc, + int rc, int onwrite, int errnoval) +{ + libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc); + libxl__sr_hdr *hdr = &stream->hdr; + STATE_AO_GC(dc->ao); + + if (rc || onwrite || errnoval) { + rc = ERROR_FAIL; + goto err; + } + + hdr->ident = be64toh(hdr->ident); + hdr->version = be32toh(hdr->version); + hdr->options = be32toh(hdr->options); + + if (hdr->ident != RESTORE_STREAM_IDENT) { + rc = ERROR_FAIL; + LOG(ERROR, + "Invalid ident: expected 0x%016"PRIx64", got 0x%016"PRIx64, + RESTORE_STREAM_IDENT, hdr->ident); + goto err; + } + if (hdr->version != RESTORE_STREAM_VERSION) { + rc = ERROR_FAIL; + LOG(ERROR, "Unexpected Version: expected %"PRIu32", got %"PRIu32, + RESTORE_STREAM_VERSION, hdr->version); + goto err; + } + if (hdr->options & RESTORE_OPT_BIG_ENDIAN) { + rc = ERROR_FAIL; + LOG(ERROR, "Unable to handle big endian streams"); + goto err; + } + + LOG(DEBUG, "Stream v%"PRIu32"%s", hdr->version, + hdr->options & RESTORE_OPT_LEGACY ? " (from legacy)" : ""); + + stream_continue(egc, stream); + return; + + err: + assert(rc); + stream_complete(egc, stream, rc); +} + +static void stream_continue(libxl__egc *egc, + libxl__stream_read_state *stream) +{ + STATE_AO_GC(stream->ao); + + /* + * Must not mutually recurse with process_record(). + * + * For records whose processing function is synchronous + * (e.g. TOOLSTACK), process_record() does not start another async + * operation, and a further operation should be started. + * + * A naive solution, which would function in general, would be for + * process_record() to call stream_continue(). However, this + * would allow the content of the stream to cause mutual + * recursion, and possibly for us to fall off our stack. + * + * Instead, process_record() indicates with its return value + * whether it a further operation needs to start, and the + * recursion_guard is in place to catch any code paths which get + * this wrong. + */ + assert(stream->recursion_guard == false); + stream->recursion_guard = true; + + switch (stream->phase) { + case SRS_PHASE_NORMAL: + /* + * Normal phase (regular migration or restore from file): + * + * logically: + * do { read_record(); process_record(); } while ( not END ); + * + * Alternate between reading a record from the stream, and + * processing the record. There should never be two records + * in the queue. + */ + if (LIBXL_STAILQ_EMPTY(&stream->record_queue)) + setup_read_record(egc, stream); + else { + if (process_record(egc, stream)) + setup_read_record(egc, stream); + } + break; + + default: + abort(); + } + + assert(stream->recursion_guard == true); + stream->recursion_guard = false; +} + +static void setup_read_record(libxl__egc *egc, + libxl__stream_read_state *stream) +{ + libxl__sr_record_buf *rec = NULL; + STATE_AO_GC(stream->ao); + int rc; + + assert(stream->incoming_record == NULL); + stream->incoming_record = rec = libxl__zalloc(NOGC, sizeof(*rec)); + + rc = setup_read(stream, "record header", + &rec->hdr, sizeof(rec->hdr), + record_header_done); + if (rc) + goto err; + return; + + err: + assert(rc); + stream_complete(egc, stream, rc); +} + +static void record_header_done(libxl__egc *egc, + libxl__datacopier_state *dc, + int rc, int onwrite, int errnoval) +{ + libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc); + libxl__sr_record_buf *rec = stream->incoming_record; + STATE_AO_GC(dc->ao); + + if (rc || onwrite || errnoval) { + rc = ERROR_FAIL; + goto err; + } + + /* No body? All done. */ + if (rec->hdr.length == 0) { + record_body_done(egc, dc, 0, 0, 0); + return; + } + + size_t bytes_to_read = ROUNDUP(rec->hdr.length, REC_ALIGN_ORDER); + rec->body = libxl__malloc(NOGC, bytes_to_read); + + rc = setup_read(stream, "record body", + rec->body, bytes_to_read, + record_body_done); + if (rc) + goto err; + return; + + err: + assert(rc); + stream_complete(egc, stream, rc); +} + +static void record_body_done(libxl__egc *egc, + libxl__datacopier_state *dc, + int rc, int onwrite, int errnoval) +{ + libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc); + libxl__sr_record_buf *rec = stream->incoming_record; + STATE_AO_GC(dc->ao); + + if (rc || onwrite || errnoval) { + rc = ERROR_FAIL; + goto err; + } + + LIBXL_STAILQ_INSERT_TAIL(&stream->record_queue, rec, entry); + stream->incoming_record = NULL; + + stream_continue(egc, stream); + return; + + err: + assert(rc); + stream_complete(egc, stream, rc); +} + +/* + * Returns a boolean indicating whether a further action should be set + * up by the caller. This is needed to prevent mutual recursion with + * stream_continue(). + */ +static bool process_record(libxl__egc *egc, + libxl__stream_read_state *stream) +{ + STATE_AO_GC(stream->ao); + libxl__domain_create_state *dcs = stream->dcs; + libxl__sr_record_buf *rec; + bool further_action_needed = false; + int rc = 0; + + /* Pop a record from the head of the queue. */ + assert(!LIBXL_STAILQ_EMPTY(&stream->record_queue)); + rec = LIBXL_STAILQ_FIRST(&stream->record_queue); + LIBXL_STAILQ_REMOVE_HEAD(&stream->record_queue, entry); + + LOG(DEBUG, "Record: %u, length %u", rec->hdr.type, rec->hdr.length); + + switch (rec->hdr.type) { + + case REC_TYPE_END: + stream_complete(egc, stream, 0); + break; + + case REC_TYPE_XENSTORE_DATA: + rc = libxl__toolstack_restore(dcs->guest_domid, rec->body, + rec->hdr.length, &dcs->shs); + if (rc) + goto err; + + /* + * libxl__toolstack_restore() is a synchronous function. + * Request that our caller queues another action for us. + */ + further_action_needed = true; + break; + + case REC_TYPE_EMULATOR_CONTEXT: + write_emulator_blob(egc, stream, rec); + break; + + default: + LOG(ERROR, "Unrecognised record 0x%08x", rec->hdr.type); + rc = ERROR_FAIL; + goto err; + } + + assert(!rc); + free_record(rec); + return further_action_needed; + + err: + assert(rc); + free_record(rec); + stream_complete(egc, stream, rc); + return false; +} + +static void write_emulator_blob(libxl__egc *egc, + libxl__stream_read_state *stream, + libxl__sr_record_buf *rec) +{ + libxl__domain_create_state *dcs = stream->dcs; + libxl__datacopier_state *dc = &stream->emu_dc; + libxl__sr_emulator_hdr *emu_hdr; + STATE_AO_GC(stream->ao); + char path[256]; + int rc = 0, writefd = -1; + + if (rec->hdr.length < sizeof(*emu_hdr)) { + rc = ERROR_FAIL; + LOG(ERROR, "Emulator record too short to contain header"); + goto err; + } + emu_hdr = rec->body; + + sprintf(path, XC_DEVICE_MODEL_RESTORE_FILE".%u", dcs->guest_domid); + + libxl__carefd_begin(); + writefd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0600); + if (writefd == -1) { + rc = ERROR_FAIL; + LOGE(ERROR, "unable to open %s", path); + libxl__carefd_unlock(); + goto err; + } + + assert(stream->emu_carefd == NULL); + stream->emu_carefd = libxl__carefd_opened(CTX, writefd); + + FILLZERO(*dc); + dc->ao = stream->ao; + dc->writewhat = "qemu save file"; + dc->writefd = writefd; + dc->maxsz = -1; + dc->callback = write_emulator_done; + + rc = libxl__datacopier_start(dc); + if (rc) + goto err; + + libxl__datacopier_prefixdata(egc, dc, + rec->body + sizeof(*emu_hdr), + rec->hdr.length - sizeof(*emu_hdr)); + return; + + err: + assert(rc); + stream_complete(egc, stream, rc); +} + +static void write_emulator_done(libxl__egc *egc, + libxl__datacopier_state *dc, + int rc, int onwrite, int errnoval) +{ + libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, emu_dc); + STATE_AO_GC(dc->ao); + + libxl__carefd_close(stream->emu_carefd); + stream->emu_carefd = NULL; + + if (rc || errnoval) { + rc = ERROR_FAIL; + goto err; + } + + stream_continue(egc, stream); + return; + + err: + assert(rc); + stream_complete(egc, stream, rc); +} + +/*----- Success/error/cleanup handling. -----*/ + +static void stream_complete(libxl__egc *egc, + libxl__stream_read_state *stream, int rc) +{ + assert(stream->running); + + if (!stream->rc) + stream->rc = rc; + stream_done(egc, stream); +} + +static void stream_done(libxl__egc *egc, + libxl__stream_read_state *stream) +{ + libxl__sr_record_buf *rec, *trec; + + assert(stream->running); + stream->running = false; + + if (stream->incoming_record) + free_record(stream->incoming_record); + + if (stream->emu_carefd) + libxl__carefd_close(stream->emu_carefd); + + /* The record queue had better be empty if the stream believes + * itself to have been successful. */ + assert(LIBXL_STAILQ_EMPTY(&stream->record_queue) || stream->rc); + + LIBXL_STAILQ_FOREACH_SAFE(rec, &stream->record_queue, entry, trec) + free_record(rec); + + stream->completion_callback(egc, stream, stream->rc); +} + +/* + * Local variables: + * mode: C + * c-basic-offset: 4 + * indent-tabs-mode: nil + * End: + */ -- 1.7.10.4 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |