[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH v4 27/29] tools/libxl: Handle checkpoint records in a libxl migration v2 stream
This is the final bit of untangling for Remus. When libxc issues a checkpoint callback, start reading and buffering all libxl records from the stream. Once a CHECKPOINT_END record is encountered, start processing all records. Signed-off-by: Andrew Cooper <andrew.cooper3@xxxxxxxxxx> Acked-by: Ian Campbell <Ian.Campbell@xxxxxxxxxx> CC: Ian Jackson <Ian.Jackson@xxxxxxxxxxxxx> CC: Wei Liu <wei.liu2@xxxxxxxxxx> --- v3: Simplify, use named constants for API --- tools/libxl/libxl_create.c | 25 ++++++++ tools/libxl/libxl_internal.h | 8 +++ tools/libxl/libxl_stream_read.c | 123 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 156 insertions(+) diff --git a/tools/libxl/libxl_create.c b/tools/libxl/libxl_create.c index c51d64f..5b4d333 100644 --- a/tools/libxl/libxl_create.c +++ b/tools/libxl/libxl_create.c @@ -672,6 +672,27 @@ static int store_libxl_entry(libxl__gc *gc, uint32_t domid, libxl_device_model_version_to_string(b_info->device_model_version)); } +/*----- remus asynchronous checkpoint callback -----*/ + +static void remus_checkpoint_stream_done( + libxl__egc *egc, libxl__stream_read_state *srs, int rc); + +static void libxl__remus_domain_checkpoint_callback(void *data) +{ + libxl__save_helper_state *shs = data; + libxl__domain_create_state *dcs = shs->caller_state; + libxl__egc *egc = shs->egc; + STATE_AO_GC(dcs->ao); + + libxl__stream_read_start_checkpoint(egc, &dcs->srs); +} + +static void remus_checkpoint_stream_done( + libxl__egc *egc, libxl__stream_read_state *stream, int rc) +{ + libxl__xc_domain_saverestore_async_callback_done(egc, &stream->shs, rc); +} + /*----- main domain creation -----*/ /* We have a linear control flow; only one event callback is @@ -939,6 +960,8 @@ static void domcreate_bootloader_done(libxl__egc *egc, libxl_domain_config *const d_config = dcs->guest_config; const int restore_fd = dcs->restore_fd; libxl__domain_build_state *const state = &dcs->build_state; + libxl__srm_restore_autogen_callbacks *const callbacks = + 
&dcs->srs.shs.callbacks.restore.a; if (rc) { domcreate_rebuild_done(egc, dcs, rc); @@ -966,6 +989,7 @@ static void domcreate_bootloader_done(libxl__egc *egc, } /* Restore */ + callbacks->checkpoint = libxl__remus_domain_checkpoint_callback; rc = libxl__build_pre(gc, domid, d_config, state); if (rc) @@ -976,6 +1000,7 @@ static void domcreate_bootloader_done(libxl__egc *egc, dcs->srs.fd = restore_fd; dcs->srs.legacy = (dcs->restore_params.stream_version == 1); dcs->srs.completion_callback = domcreate_stream_done; + dcs->srs.checkpoint_callback = remus_checkpoint_stream_done; libxl__stream_read_start(egc, &dcs->srs); return; diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h index 9961a53..7aa4f16 100644 --- a/tools/libxl/libxl_internal.h +++ b/tools/libxl/libxl_internal.h @@ -3309,9 +3309,13 @@ struct libxl__stream_read_state { void (*completion_callback)(libxl__egc *egc, libxl__stream_read_state *srs, int rc); + void (*checkpoint_callback)(libxl__egc *egc, + libxl__stream_read_state *srs, + int rc); /* Private */ int rc; bool running; + bool in_checkpoint; libxl__save_helper_state shs; libxl__conversion_helper_state chs; @@ -3321,6 +3325,8 @@ struct libxl__stream_read_state { LIBXL_STAILQ_HEAD(, libxl__sr_record_buf) record_queue; /* NOGC */ enum { SRS_PHASE_NORMAL, + SRS_PHASE_BUFFERING, + SRS_PHASE_UNBUFFERING, } phase; bool recursion_guard; @@ -3335,6 +3341,8 @@ struct libxl__stream_read_state { _hidden void libxl__stream_read_init(libxl__stream_read_state *stream); _hidden void libxl__stream_read_start(libxl__egc *egc, libxl__stream_read_state *stream); +_hidden void libxl__stream_read_start_checkpoint(libxl__egc *egc, + libxl__stream_read_state *stream); _hidden void libxl__stream_read_abort(libxl__egc *egc, libxl__stream_read_state *stream, int rc); static inline bool diff --git a/tools/libxl/libxl_stream_read.c b/tools/libxl/libxl_stream_read.c index 3051a2b..2d17403 100644 --- a/tools/libxl/libxl_stream_read.c +++ 
b/tools/libxl/libxl_stream_read.c @@ -35,6 +35,8 @@ * Undefined undef undef undef undef undef * Idle false undef false 0 0 * Active true NORMAL false 0/1 0/partial + * Active true BUFFERING true any 0/partial + * Active true UNBUFFERING true any 0 * * While reading data from the stream, 'dc' is active and a callback * is expected. Most actions in process_record() start a callback of @@ -46,6 +48,15 @@ * Records are read one at time and immediately processed. (The * record queue will not contain more than a single record.) * + * PHASE_BUFFERING: + * This phase is used in checkpointed streams, when libxc signals + * the presence of a checkpoint in the stream. Records are read and + * buffered until a CHECKPOINT_END record has been read. + * + * PHASE_UNBUFFERING: + * Once a CHECKPOINT_END record has been read, all buffered records + * are processed. + * * Note: * Record buffers are not allocated from a GC; they are allocated * and tracked manually. This is to avoid OOM with Remus where the @@ -57,6 +68,9 @@ * - Initialises state. Must be called once before _start() * - libxl__stream_read_start() * - Starts reading records from the stream, and acting on them. + * - libxl__stream_read_start_checkpoint() + * - Starts buffering records at a checkpoint. Must be called on + * a running stream. * * There are several chains of event: * @@ -95,6 +109,8 @@ /* Success/error/cleanup handling. 
*/ static void stream_complete(libxl__egc *egc, libxl__stream_read_state *stream, int rc); +static void checkpoint_done(libxl__egc *egc, + libxl__stream_read_state *stream, int rc); static void stream_done(libxl__egc *egc, libxl__stream_read_state *stream); static void conversion_done(libxl__egc *egc, @@ -159,6 +175,7 @@ void libxl__stream_read_init(libxl__stream_read_state *stream) { stream->rc = 0; stream->running = false; + stream->in_checkpoint = false; libxl__save_helper_init(&stream->shs); libxl__conversion_helper_init(&stream->chs); FILLZERO(stream->dc); @@ -225,6 +242,22 @@ void libxl__stream_read_start(libxl__egc *egc, stream_complete(egc, stream, rc); } +void libxl__stream_read_start_checkpoint(libxl__egc *egc, + libxl__stream_read_state *stream) +{ + assert(stream->running); + assert(!stream->in_checkpoint); + + stream->in_checkpoint = true; + stream->phase = SRS_PHASE_BUFFERING; + + /* + * Libxc has handed control of the fd to us. Start reading some + * libxl records out of it. + */ + stream_continue(egc, stream); +} + void libxl__stream_read_abort(libxl__egc *egc, libxl__stream_read_state *stream, int rc) { @@ -332,6 +365,54 @@ static void stream_continue(libxl__egc *egc, } break; + case SRS_PHASE_BUFFERING: { + /* + * Buffering phase (checkpointed streams only): + * + * logically: + * do { read_record(); } while ( not CHECKPOINT_END ); + * + * Read and buffer all records from the stream until a + * CHECKPOINT_END record is encountered. We need to peek at + * the tail to spot the CHECKPOINT_END record, and switch to + * the unbuffering phase. + */ + libxl__sr_record_buf *rec = LIBXL_STAILQ_LAST( + &stream->record_queue, libxl__sr_record_buf, entry); + + assert(stream->in_checkpoint); + + if (!rec || (rec->hdr.type != REC_TYPE_CHECKPOINT_END)) { + setup_read_record(egc, stream); + break; + } + + /* + * There are now some number of buffered records, with a + * CHECKPOINT_END at the end. Start processing them all. 
+ */ + stream->phase = SRS_PHASE_UNBUFFERING; + } + /* FALLTHROUGH */ + case SRS_PHASE_UNBUFFERING: + /* + * Unbuffering phase (checkpointed streams only): + * + * logically: + * do { process_record(); } while ( not CHECKPOINT_END ); + * + * Process all records collected during the buffering phase. + */ + assert(stream->in_checkpoint); + + while (process_record(egc, stream)) + ; /* + * Nothing! process_record() helpfully tells us if no specific + * further actions have been set up, in which case we want to go + * ahead and process the next record. + */ + break; + default: abort(); } @@ -467,6 +548,15 @@ static bool process_record(libxl__egc *egc, write_emulator_blob(egc, stream, rec); break; + case REC_TYPE_CHECKPOINT_END: + if (!stream->in_checkpoint) { + LOG(ERROR, "Unexpected CHECKPOINT_END record in stream"); + rc = ERROR_FAIL; + goto err; + } + checkpoint_done(egc, stream, 0); + break; + default: LOG(ERROR, "Unrecognised record 0x%08x", rec->hdr.type); rc = ERROR_FAIL; @@ -564,17 +654,50 @@ static void stream_complete(libxl__egc *egc, { assert(stream->running); + if (stream->in_checkpoint) { + assert(rc); + + /* + * If an error is encountered while in a checkpoint, pass it + * back to libxc. 
The failure will come back around to us via + * libxl__xc_domain_restore_done() + */ + checkpoint_done(egc, stream, rc); + return; + } + if (!stream->rc) stream->rc = rc; stream_done(egc, stream); } +static void checkpoint_done(libxl__egc *egc, + libxl__stream_read_state *stream, int rc) +{ + int ret; + + assert(stream->in_checkpoint); + + if (rc == 0) + ret = XGR_CHECKPOINT_SUCCESS; + else if (stream->phase == SRS_PHASE_BUFFERING) + ret = XGR_CHECKPOINT_FAILOVER; + else + ret = XGR_CHECKPOINT_ERROR; + + stream->checkpoint_callback(egc, stream, ret); + + stream->in_checkpoint = false; + stream->phase = SRS_PHASE_NORMAL; +} + static void stream_done(libxl__egc *egc, libxl__stream_read_state *stream) { libxl__sr_record_buf *rec, *trec; assert(stream->running); + assert(!stream->in_checkpoint); stream->running = false; if (stream->incoming_record) -- 1.7.10.4 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |