[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH RFC v2 17/23] libxl/libxl_stream_read.c: track callback chains with an explicit phase
From: Joshua Otto <jtotto@xxxxxxxxxxxx> As the previous patch did for libxl_stream_write, track the callback chains of libxl_stream_read with an explicit phase instead of separate boolean flags. libxl_stream_read already has a notion of phase for its record-buffering behaviour, so the callback chain phase is folded into that existing phase enum rather than added as a new field. Again, this is done to support the addition of a new callback chain for postcopy live migration. No functional change. Signed-off-by: Joshua Otto <jtotto@xxxxxxxxxxxx> --- tools/libxl/libxl_internal.h | 7 ++-- tools/libxl/libxl_stream_read.c | 83 +++++++++++++++++++++-------------------- 2 files changed, 45 insertions(+), 45 deletions(-) diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h index cef2f39..30d5492 100644 --- a/tools/libxl/libxl_internal.h +++ b/tools/libxl/libxl_internal.h @@ -3133,9 +3133,7 @@ struct libxl__stream_read_state { /* Private */ int rc; bool running; - bool in_checkpoint; bool sync_teardown; /* Only used to coordinate shutdown on error path. */ - bool in_checkpoint_state; libxl__save_helper_state shs; libxl__conversion_helper_state chs; @@ -3145,8 +3143,9 @@ struct libxl__stream_read_state { LIBXL_STAILQ_HEAD(, libxl__sr_record_buf) record_queue; /* NOGC */ enum { SRS_PHASE_NORMAL, - SRS_PHASE_BUFFERING, - SRS_PHASE_UNBUFFERING, + SRS_PHASE_CHECKPOINT_BUFFERING, + SRS_PHASE_CHECKPOINT_UNBUFFERING, + SRS_PHASE_CHECKPOINT_STATE } phase; bool recursion_guard; diff --git a/tools/libxl/libxl_stream_read.c b/tools/libxl/libxl_stream_read.c index 89c2f21..4cb553e 100644 --- a/tools/libxl/libxl_stream_read.c +++ b/tools/libxl/libxl_stream_read.c @@ -29,14 +29,15 @@ * processed, and all records will be processed in queue order. 
* * Internal states: - * running phase in_ record incoming - * checkpoint _queue _record + * running phase record incoming + * _queue _record * - * Undefined undef undef undef undef undef - * Idle false undef false 0 0 - * Active true NORMAL false 0/1 0/partial - * Active true BUFFERING true any 0/partial - * Active true UNBUFFERING true any 0 + * Undefined undef undef undef undef + * Idle false undef 0 0 + * Active true NORMAL 0/1 0/partial + * Active true CHECKPOINT_BUFFERING any 0/partial + * Active true CHECKPOINT_UNBUFFERING any 0 + * Active true CHECKPOINT_STATE 0/1 0/partial * * While reading data from the stream, 'dc' is active and a callback * is expected. Most actions in process_record() start a callback of @@ -48,12 +49,12 @@ * Records are read one at time and immediately processed. (The * record queue will not contain more than a single record.) * - * PHASE_BUFFERING: + * PHASE_CHECKPOINT_BUFFERING: * This phase is used in checkpointed streams, when libxc signals * the presence of a checkpoint in the stream. Records are read and * buffered until a CHECKPOINT_END record has been read. * - * PHASE_UNBUFFERING: + * PHASE_CHECKPOINT_UNBUFFERING: * Once a CHECKPOINT_END record has been read, all buffered records * are processed. * @@ -172,6 +173,12 @@ static void checkpoint_state_done(libxl__egc *egc, /*----- Helpers -----*/ +static inline bool stream_in_checkpoint(libxl__stream_read_state *stream) +{ + return stream->phase == SRS_PHASE_CHECKPOINT_BUFFERING || + stream->phase == SRS_PHASE_CHECKPOINT_UNBUFFERING; +} + /* Helper to set up reading some data from the stream. 
*/ static int setup_read(libxl__stream_read_state *stream, const char *what, void *ptr, size_t nr_bytes, @@ -210,7 +217,6 @@ void libxl__stream_read_init(libxl__stream_read_state *stream) stream->rc = 0; stream->running = false; - stream->in_checkpoint = false; stream->sync_teardown = false; FILLZERO(stream->dc); FILLZERO(stream->hdr); @@ -297,10 +303,9 @@ void libxl__stream_read_start_checkpoint(libxl__egc *egc, libxl__stream_read_state *stream) { assert(stream->running); - assert(!stream->in_checkpoint); + assert(stream->phase == SRS_PHASE_NORMAL); - stream->in_checkpoint = true; - stream->phase = SRS_PHASE_BUFFERING; + stream->phase = SRS_PHASE_CHECKPOINT_BUFFERING; /* * Libxc has handed control of the fd to us. Start reading some @@ -392,6 +397,7 @@ static void stream_continue(libxl__egc *egc, switch (stream->phase) { case SRS_PHASE_NORMAL: + case SRS_PHASE_CHECKPOINT_STATE: /* * Normal phase (regular migration or restore from file): * @@ -416,9 +422,9 @@ static void stream_continue(libxl__egc *egc, } break; - case SRS_PHASE_BUFFERING: { + case SRS_PHASE_CHECKPOINT_BUFFERING: { /* - * Buffering phase (checkpointed streams only): + * Buffering phase: * * logically: * do { read_record(); } while ( not CHECKPOINT_END ); @@ -431,8 +437,6 @@ static void stream_continue(libxl__egc *egc, libxl__sr_record_buf *rec = LIBXL_STAILQ_LAST( &stream->record_queue, libxl__sr_record_buf, entry); - assert(stream->in_checkpoint); - if (!rec || (rec->hdr.type != REC_TYPE_CHECKPOINT_END)) { setup_read_record(egc, stream); break; @@ -442,19 +446,18 @@ static void stream_continue(libxl__egc *egc, * There are now some number of buffered records, with a * CHECKPOINT_END at the end. Start processing them all. 
*/ - stream->phase = SRS_PHASE_UNBUFFERING; + stream->phase = SRS_PHASE_CHECKPOINT_UNBUFFERING; } /* FALLTHROUGH */ - case SRS_PHASE_UNBUFFERING: + case SRS_PHASE_CHECKPOINT_UNBUFFERING: /* - * Unbuffering phase (checkpointed streams only): + * Unbuffering phase: * * logically: * do { process_record(); } while ( not CHECKPOINT_END ); * * Process all records collected during the buffering phase. */ - assert(stream->in_checkpoint); while (process_record(egc, stream)) ; /* @@ -625,7 +628,7 @@ static bool process_record(libxl__egc *egc, break; case REC_TYPE_CHECKPOINT_END: - if (!stream->in_checkpoint) { + if (!stream_in_checkpoint(stream)) { LOG(ERROR, "Unexpected CHECKPOINT_END record in stream"); rc = ERROR_FAIL; goto err; @@ -634,7 +637,7 @@ static bool process_record(libxl__egc *egc, break; case REC_TYPE_CHECKPOINT_STATE: - if (!stream->in_checkpoint_state) { + if (stream->phase != SRS_PHASE_CHECKPOINT_STATE) { LOG(ERROR, "Unexpected CHECKPOINT_STATE record in stream"); rc = ERROR_FAIL; goto err; @@ -743,7 +746,12 @@ static void stream_complete(libxl__egc *egc, { assert(stream->running); - if (stream->in_checkpoint) { + switch (stream->phase) { + case SRS_PHASE_NORMAL: + stream_done(egc, stream, rc); + break; + case SRS_PHASE_CHECKPOINT_BUFFERING: + case SRS_PHASE_CHECKPOINT_UNBUFFERING: assert(rc); /* @@ -752,10 +760,8 @@ static void stream_complete(libxl__egc *egc, * libxl__xc_domain_restore_done() */ checkpoint_done(egc, stream, rc); - return; - } - - if (stream->in_checkpoint_state) { + break; + case SRS_PHASE_CHECKPOINT_STATE: assert(rc); /* @@ -767,10 +773,8 @@ static void stream_complete(libxl__egc *egc, * libxl__stream_read_abort() */ checkpoint_state_done(egc, stream, rc); - return; + break; } - - stream_done(egc, stream, rc); } static void checkpoint_done(libxl__egc *egc, @@ -778,18 +782,17 @@ static void checkpoint_done(libxl__egc *egc, { int ret; - assert(stream->in_checkpoint); + assert(stream_in_checkpoint(stream)); if (rc == 0) ret = 
XGR_CHECKPOINT_SUCCESS; - else if (stream->phase == SRS_PHASE_BUFFERING) + else if (stream->phase == SRS_PHASE_CHECKPOINT_BUFFERING) ret = XGR_CHECKPOINT_FAILOVER; else ret = XGR_CHECKPOINT_ERROR; stream->checkpoint_callback(egc, stream, ret); - stream->in_checkpoint = false; stream->phase = SRS_PHASE_NORMAL; } @@ -799,8 +802,7 @@ static void stream_done(libxl__egc *egc, libxl__sr_record_buf *rec, *trec; assert(stream->running); - assert(!stream->in_checkpoint); - assert(!stream->in_checkpoint_state); + assert(stream->phase == SRS_PHASE_NORMAL); stream->running = false; if (stream->incoming_record) @@ -955,9 +957,8 @@ void libxl__stream_read_checkpoint_state(libxl__egc *egc, libxl__stream_read_state *stream) { assert(stream->running); - assert(!stream->in_checkpoint); - assert(!stream->in_checkpoint_state); - stream->in_checkpoint_state = true; + assert(stream->phase == SRS_PHASE_NORMAL); + stream->phase = SRS_PHASE_CHECKPOINT_STATE; setup_read_record(egc, stream); } @@ -965,8 +966,8 @@ void libxl__stream_read_checkpoint_state(libxl__egc *egc, static void checkpoint_state_done(libxl__egc *egc, libxl__stream_read_state *stream, int rc) { - assert(stream->in_checkpoint_state); - stream->in_checkpoint_state = false; + assert(stream->phase == SRS_PHASE_CHECKPOINT_STATE); + stream->phase = SRS_PHASE_NORMAL; stream->checkpoint_callback(egc, stream, rc); } -- 2.7.4 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxxxxxxxxx https://lists.xenproject.org/mailman/listinfo/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |