[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH v3 26/28] tools/libxl: Handle checkpoint records in a libxl migration v2 stream
This is the final bit of untangling for Remus. When libxc issues a checkpoint callback, start reading and buffering all libxl records from the stream. Once a CHECKPOINT_END record is encountered, start processing all records. Signed-off-by: Andrew Cooper <andrew.cooper3@xxxxxxxxxx> CC: Ian Campbell <Ian.Campbell@xxxxxxxxxx> CC: Ian Jackson <Ian.Jackson@xxxxxxxxxxxxx> CC: Wei Liu <wei.liu2@xxxxxxxxxx> --- v3: Simplify, use named constants for API --- tools/libxl/libxl_create.c | 25 ++++++++ tools/libxl/libxl_internal.h | 8 +++ tools/libxl/libxl_stream_read.c | 130 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+) diff --git a/tools/libxl/libxl_create.c b/tools/libxl/libxl_create.c index f2b5ffb..e42de47 100644 --- a/tools/libxl/libxl_create.c +++ b/tools/libxl/libxl_create.c @@ -672,6 +672,27 @@ static int store_libxl_entry(libxl__gc *gc, uint32_t domid, libxl_device_model_version_to_string(b_info->device_model_version)); } +/*----- remus asynchronous checkpoint callback -----*/ + +static void remus_checkpoint_stream_done( + libxl__egc *egc, libxl__stream_read_state *srs, int rc); + +static void libxl__remus_domain_checkpoint_callback(void *data) +{ + libxl__save_helper_state *shs = data; + libxl__domain_create_state *dcs = shs->caller_state; + libxl__egc *egc = shs->egc; + STATE_AO_GC(dcs->ao); + + libxl__stream_read_start_checkpoint(egc, &dcs->srs); +} + +static void remus_checkpoint_stream_done( + libxl__egc *egc, libxl__stream_read_state *stream, int rc) +{ + libxl__xc_domain_saverestore_async_callback_done(egc, &stream->shs, rc); +} + /*----- main domain creation -----*/ /* We have a linear control flow; only one event callback is @@ -939,6 +960,8 @@ static void domcreate_bootloader_done(libxl__egc *egc, libxl_domain_config *const d_config = dcs->guest_config; const int restore_fd = dcs->restore_fd; libxl__domain_build_state *const state = &dcs->build_state; + libxl__srm_restore_autogen_callbacks *const callbacks = + 
&dcs->srs.shs.callbacks.restore.a; if (rc) { domcreate_rebuild_done(egc, dcs, rc); @@ -966,6 +989,7 @@ static void domcreate_bootloader_done(libxl__egc *egc, } /* Restore */ + callbacks->checkpoint = libxl__remus_domain_checkpoint_callback; rc = libxl__build_pre(gc, domid, d_config, state); if (rc) @@ -978,6 +1002,7 @@ static void domcreate_bootloader_done(libxl__egc *egc, dcs->srs.fd = restore_fd; dcs->srs.legacy = (dcs->restore_params.stream_version == 1); dcs->srs.completion_callback = domcreate_stream_done; + dcs->srs.checkpoint_callback = remus_checkpoint_stream_done; libxl__stream_read_start(egc, &dcs->srs); return; diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h index 7540ab3..176c360 100644 --- a/tools/libxl/libxl_internal.h +++ b/tools/libxl/libxl_internal.h @@ -3311,9 +3311,13 @@ struct libxl__stream_read_state { void (*completion_callback)(libxl__egc *egc, libxl__stream_read_state *srs, int rc); + void (*checkpoint_callback)(libxl__egc *egc, + libxl__stream_read_state *srs, + int rc); /* Private */ int rc; bool running; + bool in_checkpoint; libxl__save_helper_state shs; libxl__conversion_helper_state chs; @@ -3323,6 +3327,8 @@ struct libxl__stream_read_state { LIBXL_STAILQ_HEAD(, libxl__sr_record_buf) record_queue; enum { SRS_PHASE_NORMAL, + SRS_PHASE_BUFFERING, + SRS_PHASE_UNBUFFERING, } phase; bool recursion_guard; @@ -3337,6 +3343,8 @@ struct libxl__stream_read_state { _hidden void libxl__stream_read_init(libxl__stream_read_state *stream); _hidden void libxl__stream_read_start(libxl__egc *egc, libxl__stream_read_state *stream); +_hidden void libxl__stream_read_start_checkpoint(libxl__egc *egc, + libxl__stream_read_state *stream); _hidden void libxl__stream_read_abort(libxl__egc *egc, libxl__stream_read_state *stream, int rc); static inline bool diff --git a/tools/libxl/libxl_stream_read.c b/tools/libxl/libxl_stream_read.c index 229108b..803ac8a 100644 --- a/tools/libxl/libxl_stream_read.c +++ b/tools/libxl/libxl_stream_read.c 
@@ -28,11 +28,30 @@ * All complete records are held in the record_queue before being * processed, and all records will be processed in queue order. * + * Internal states: + * running in_checkpoint phase + * Undefined - - - + * Idle false false - + * Active true false normal + * Checkpoint(buf) true true buffering + * Checkpoint(unbuf) true true unbuffering + * Active true false normal + * Finished false false - + * * PHASE_NORMAL: * This phase is used for regular migration or resume from file. * Records are read one at time and immediately processed. (The * record queue will not contain more than a single record.) * + * PHASE_BUFFERING: + * This phase is used in checkpointed streams, when libxc signals + * the presence of a checkpoint in the stream. Records are read and + * buffered until a CHECKPOINT_END record has been read. + * + * PHASE_UNBUFFERING: + * Once a CHECKPOINT_END record has been read, all buffered records + * are processed. + * * Note: * Record buffers are not allocated from a GC; they are allocated * and tracked manually. This is to avoid OOM with Remus where the @@ -44,6 +63,9 @@ * - Initialises state. Must be called once before _start() * - libxl__stream_read_start() * - Starts reading records from the stream, and acting on them. + * - libxl__stream_read_start_checkpoint() + * - Starts buffering records at a checkpoint. Must be called on + * a running stream. * * There are several chains of event: * @@ -82,6 +104,8 @@ /* Success/error/cleanup handling. 
*/ static void stream_complete(libxl__egc *egc, libxl__stream_read_state *stream, int rc); +static void checkpoint_done(libxl__egc *egc, + libxl__stream_read_state *stream, int rc); static void stream_done(libxl__egc *egc, libxl__stream_read_state *stream); static void conversion_done(libxl__egc *egc, @@ -202,6 +226,22 @@ void libxl__stream_read_start(libxl__egc *egc, stream_complete(egc, stream, rc); } +void libxl__stream_read_start_checkpoint(libxl__egc *egc, + libxl__stream_read_state *stream) +{ + assert(stream->running); + assert(!stream->in_checkpoint); + + stream->in_checkpoint = true; + stream->phase = SRS_PHASE_BUFFERING; + + /* + * Libxc has handed control of the fd to us. Start reading some + * libxl records out of it. + */ + stream_continue(egc, stream); +} + void libxl__stream_read_abort(libxl__egc *egc, libxl__stream_read_state *stream, int rc) { @@ -305,6 +345,54 @@ static void stream_continue(libxl__egc *egc, } break; + case SRS_PHASE_BUFFERING: { + /* + * Buffering phase (checkpointed streams only): + * + * logically: + * do { read_record(); } while ( not CHECKPOINT_END ); + * + * Read and buffer all records from the stream until a + * CHECKPOINT_END record is encountered. We need to peek at + * the tail to spot the CHECKPOINT_END record, and switch to + * the unbuffering phase. + */ + libxl__sr_record_buf *rec = LIBXL_STAILQ_LAST( + &stream->record_queue, libxl__sr_record_buf, entry); + + assert(stream->in_checkpoint); + + if (!rec || (rec->hdr.type != REC_TYPE_CHECKPOINT_END)) { + setup_read_record(egc, stream); + break; + } + + /* + * There are now some number of buffered records, with a + * CHECKPOINT_END at the end. Start processing them all. + */ + stream->phase = SRS_PHASE_UNBUFFERING; + } + /* FALLTHROUGH */ + case SRS_PHASE_UNBUFFERING: + /* + * Unbuffering phase (checkpointed streams only): + * + * logically: + * do { process_record(); } while ( not CHECKPOINT_END ); + * + * Process all records collected during the buffering phase. 
+ */ + assert(stream->in_checkpoint); + + while (process_record(egc, stream)) + ; /* + * Nothing! process_record() helpfully tells us if no specific + * further actions have been set up, in which case we want to go + * ahead and process the next record. + */ + break; + default: abort(); } @@ -441,6 +529,15 @@ static bool process_record(libxl__egc *egc, write_emulator_blob(egc, stream, rec); break; + case REC_TYPE_CHECKPOINT_END: + if (!stream->in_checkpoint) { + LOG(ERROR, "Unexpected CHECKPOINT_END record in stream"); + rc = ERROR_FAIL; + goto err; + } + checkpoint_done(egc, stream, 0); + break; + default: LOG(ERROR, "Unrecognised record 0x%08x", rec->hdr.type); rc = ERROR_FAIL; @@ -541,17 +638,50 @@ static void stream_complete(libxl__egc *egc, { assert(stream->running); + if (stream->in_checkpoint) { + assert(rc); + + /* + * If an error is encountered while in a checkpoint, pass it + * back to libxc. The failure will come back around to us via + * libxl__xc_domain_restore_done() + */ + checkpoint_done(egc, stream, rc); + return; + } + if (!stream->rc) stream->rc = rc; stream_done(egc, stream); } +static void checkpoint_done(libxl__egc *egc, + libxl__stream_read_state *stream, int rc) +{ + int ret; + + assert(stream->in_checkpoint); + + if (rc == 0) + ret = XGR_CHECKPOINT_SUCCESS; + else if (stream->phase == SRS_PHASE_BUFFERING) + ret = XGR_CHECKPOINT_FAILOVER; + else + ret = XGR_CHECKPOINT_ERROR; + + stream->checkpoint_callback(egc, stream, ret); + + stream->in_checkpoint = false; + stream->phase = SRS_PHASE_NORMAL; +} + static void stream_done(libxl__egc *egc, libxl__stream_read_state *stream) { libxl__sr_record_buf *rec, *trec; assert(stream->running); + assert(!stream->in_checkpoint); stream->running = false; if (stream->incoming_record) -- 1.7.10.4 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |