[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC Patch v2 15/16] xc_domain_save: implement save_callbacks for colo
Add a new save callback: 1. post_sendstate(): SVM will run only when XC_SAVE_ID_LAST_CHECKPOINT is sent to slaver. But we only send XC_SAVE_ID_LAST_CHECKPOINT when we do live migration now. Add this callback, and we can send it in this callback. Update some callbacks for colo: 1. suspend(): In colo mode, both PVM and SVM are running. So we should suspend both PVM and SVM. Communicate with slaver like this: a. write "continue" to notify slaver to suspend SVM b. suspend PVM and SVM c. slaver writes "suspend" to tell master that SVM is suspended 2. postcopy(): In colo mode, both PVM and SVM are running, and we have suspended both PVM and SVM. So we should resume PVM and SVM. Communicate with slaver like this: a. write "resume" to notify slaver to resume SVM b. resume PVM and SVM c. slaver writes "resume" to tell master that SVM is resumed 3. checkpoint(): In colo mode, we do a new checkpoint only when the output packets from PVM and SVM differ. We will block in this callback and return when an output packet is different. Signed-off-by: Ye Wei <wei.ye1987@xxxxxxxxx> Signed-off-by: Jiang Yunhong <yunhong.jiang@xxxxxxxxx> Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx> --- tools/libxc/xc_domain_save.c | 17 ++ tools/libxc/xenguest.h | 3 + tools/python/xen/lowlevel/checkpoint/checkpoint.c | 302 ++++++++++++++++++++- tools/python/xen/lowlevel/checkpoint/checkpoint.h | 1 + 4 files changed, 319 insertions(+), 4 deletions(-) diff --git a/tools/libxc/xc_domain_save.c b/tools/libxc/xc_domain_save.c index b477188..8f84c9b 100644 --- a/tools/libxc/xc_domain_save.c +++ b/tools/libxc/xc_domain_save.c @@ -1785,6 +1785,23 @@ int xc_domain_save(xc_interface *xch, int io_fd, uint32_t dom, uint32_t max_iter } } + /* Flush last write and discard cache for file. 
*/ + if ( outbuf_flush(xch, ob, io_fd) < 0 ) { + PERROR("Error when flushing output buffer"); + rc = 1; + } + + discard_file_cache(xch, io_fd, 1 /* flush */); + + if ( callbacks->post_sendstate ) + { + if ( callbacks->post_sendstate(callbacks->data) < 0) + { + PERROR("Error: post_sendstate()\n"); + goto out; + } + } + /* Zero terminate */ i = 0; if ( wrexact(io_fd, &i, sizeof(int)) ) diff --git a/tools/libxc/xenguest.h b/tools/libxc/xenguest.h index 4bb444a..9d7d03c 100644 --- a/tools/libxc/xenguest.h +++ b/tools/libxc/xenguest.h @@ -72,6 +72,9 @@ struct save_callbacks { */ int (*toolstack_save)(uint32_t domid, uint8_t **buf, uint32_t *len, void *data); + /* called before Zero terminate is sent */ + int (*post_sendstate)(void *data); + /* to be provided as the last argument to each callback function */ void* data; }; diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.c b/tools/python/xen/lowlevel/checkpoint/checkpoint.c index ec14b27..28bdb23 100644 --- a/tools/python/xen/lowlevel/checkpoint/checkpoint.c +++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c @@ -1,14 +1,22 @@ /* python bridge to checkpointing API */ #include <Python.h> +#include <sys/wait.h> #include <xenstore.h> #include <xenctrl.h> +#include <xc_private.h> +#include <xg_save_restore.h> #include "checkpoint.h" #define PKG "xen.lowlevel.checkpoint" +#define COMP_IOC_MAGIC 'k' +#define COMP_IOCTWAIT _IO(COMP_IOC_MAGIC, 0) +#define COMP_IOCTFLUSH _IO(COMP_IOC_MAGIC, 1) +#define COMP_IOCTRESUME _IO(COMP_IOC_MAGIC, 2) + static PyObject* CheckpointError; typedef struct { @@ -25,11 +33,15 @@ typedef struct { PyObject* setup_cb; PyThreadState* threadstate; + int colo; + int first_time; + int dev_fd; } CheckpointObject; static int suspend_trampoline(void* data); static int postcopy_trampoline(void* data); static int checkpoint_trampoline(void* data); +static int post_sendstate_trampoline(void *data); static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args, PyObject* kwargs) @@ -169,10 
+181,17 @@ static PyObject* pycheckpoint_start(PyObject* obj, PyObject* args) { } else self->setup_cb = NULL; + if (flags & CHECKPOINT_FLAGS_COLO) + self->colo = 1; + else + self->colo = 0; + self->first_time = 1; + memset(&callbacks, 0, sizeof(callbacks)); callbacks.suspend = suspend_trampoline; callbacks.postcopy = postcopy_trampoline; callbacks.checkpoint = checkpoint_trampoline; + callbacks.post_sendstate = post_sendstate_trampoline; callbacks.data = self; self->threadstate = PyEval_SaveThread(); @@ -279,6 +298,196 @@ PyMODINIT_FUNC initcheckpoint(void) { block_timer(); } +/* colo functions */ + +/* master slaver comment + * "continue" ===> + * <=== "suspend" guest is suspended + */ +static int notify_slaver_suspend(CheckpointObject *self) +{ + int fd = self->cps.fd; + + if (self->first_time == 1) + return 0; + + return write_exact(fd, "continue", 8); +} + +static int wait_slaver_suspend(CheckpointObject *self) +{ + int fd = self->cps.fd; + xc_interface *xch = self->cps.xch; + char buf[8]; + + if (self->first_time == 1) + return 0; + + if ( read_exact(fd, buf, 7) < 0) { + PERROR("read: suspend"); + return -1; + } + + buf[7] = '\0'; + if (strcmp(buf, "suspend")) { + PERROR("read \"%s\", expect \"suspend\"", buf); + return -1; + } + + return 0; +} + +static int notify_slaver_start_checkpoint(CheckpointObject *self) +{ + int fd = self->cps.fd; + xc_interface *xch = self->cps.xch; + + if (self->first_time == 1) + return 0; + + if ( write_exact(fd, "start", 5) < 0) { + PERROR("write start"); + return -1; + } + + return 0; +} + +/* + * master slaver + * <==== "finish" + * flush packets + * "resume" ====> + * resume vm resume vm + * <==== "resume" + */ +static int notify_slaver_resume(CheckpointObject *self) +{ + int fd = self->cps.fd; + xc_interface *xch = self->cps.xch; + char buf[7]; + + /* wait slaver to finish update memory, device state... 
*/ + if ( read_exact(fd, buf, 6) < 0) { + PERROR("read: finish"); + return -1; + } + + buf[6] = '\0'; + if (strcmp(buf, "finish")) { + ERROR("read \"%s\", expect \"finish\"", buf); + return -1; + } + + if (!self->first_time) + /* flush queued packets now */ + ioctl(self->dev_fd, COMP_IOCTFLUSH); + + /* notify slaver to resume vm*/ + if (write_exact(fd, "resume", 6) < 0) { + PERROR("write: resume"); + return -1; + } + + return 0; +} + +static int install_fw_network(CheckpointObject *self) +{ + int rc; + PyObject* result; + + PyEval_RestoreThread(self->threadstate); + result = PyObject_CallFunction(self->setup_cb, NULL); + self->threadstate = PyEval_SaveThread(); + + if (!result) + return -1; + + if (result == Py_None || PyObject_IsTrue(result)) + rc = 0; + else + rc = -1; + + Py_DECREF(result); + + return rc; +} + +static int wait_slaver_resume(CheckpointObject *self) +{ + int fd = self->cps.fd; + xc_interface *xch = self->cps.xch; + char buf[7]; + + if (read_exact(fd, buf, 6) < 0) { + PERROR("read resume"); + return -1; + } + + buf[6] = '\0'; + if (strcmp(buf, "resume")) { + ERROR("read \"%s\", expect \"resume\"", buf); + return -1; + } + + return 0; +} + +static int colo_postresume(CheckpointObject *self) +{ + int rc; + int dev_fd = self->dev_fd; + + rc = wait_slaver_resume(self); + if (rc < 0) + return rc; + + if (self->first_time) { + rc = install_fw_network(self); + if (rc < 0) { + fprintf(stderr, "install network fails\n"); + return rc; + } + } else { + ioctl(dev_fd, COMP_IOCTRESUME); + } + + return 0; +} + +static int pre_checkpoint(CheckpointObject *self) +{ + xc_interface *xch = self->cps.xch; + + if (!self->first_time) + return 0; + + self->dev_fd = open("/dev/HA_compare", O_RDWR); + if (self->dev_fd < 0) { + PERROR("opening /dev/HA_compare fails"); + return -1; + } + + return 0; +} + +static void wait_new_checkpoint(CheckpointObject *self) +{ + int dev_fd = self->dev_fd; + int err; + + while (1) { + err = ioctl(dev_fd, COMP_IOCTWAIT); + if (err == 0) + 
break; + + if (err == -1 && errno != ERESTART && errno != ETIME) { + fprintf(stderr, "ioctl() returns -1, errno: %d\n", errno); + } + } +} + /* private functions */ /* bounce C suspend call into python equivalent. @@ -289,6 +498,13 @@ static int suspend_trampoline(void* data) PyObject* result; + if (self->colo) { + if (notify_slaver_suspend(self) < 0) { + fprintf(stderr, "nofitying slaver suspend fails\n"); + return 0; + } + } + /* call default suspend function, then python hook if available */ if (self->armed) { if (checkpoint_wait(&self->cps) < 0) { @@ -307,8 +523,16 @@ static int suspend_trampoline(void* data) } } + /* suspend_cb() should be called after both sides are suspended */ + if (self->colo) { + if (wait_slaver_suspend(self) < 0) { + fprintf(stderr, "waiting slaver suspend fails\n"); + return 0; + } + } + if (!self->suspend_cb) - return 1; + goto start_checkpoint; PyEval_RestoreThread(self->threadstate); result = PyObject_CallFunction(self->suspend_cb, NULL); @@ -319,12 +543,32 @@ static int suspend_trampoline(void* data) if (result == Py_None || PyObject_IsTrue(result)) { Py_DECREF(result); - return 1; + goto start_checkpoint; } Py_DECREF(result); return 0; + +start_checkpoint: + if (self->colo) { + if (notify_slaver_start_checkpoint(self) < 0) { + fprintf(stderr, "nofitying slaver to start checkpoint fails\n"); + return 0; + } + + /* PVM is suspended first when doing live migration, + * and then it is suspended for a new checkpoint. 
+ */ + if (self->first_time == 1) + /* live migration */ + self->first_time = 2; + else if (self->first_time == 2) + /* the first checkpoint */ + self->first_time = 0; + } + + return 1; } static int postcopy_trampoline(void* data) @@ -334,6 +578,13 @@ static int postcopy_trampoline(void* data) PyObject* result; int rc = 0; + if (self->colo) { + if (notify_slaver_resume(self) < 0) { + fprintf(stderr, "nofitying slaver resume fails\n"); + return 0; + } + } + if (!self->postcopy_cb) goto resume; @@ -352,6 +603,13 @@ static int postcopy_trampoline(void* data) return 0; } + if (self->colo) { + if (colo_postresume(self) < 0) { + fprintf(stderr, "postresume fails\n"); + return 0; + } + } + return rc; } @@ -366,8 +624,15 @@ static int checkpoint_trampoline(void* data) return -1; } + if (self->colo) { + if (pre_checkpoint(self) < 0) { + fprintf(stderr, "pre_checkpoint() fails\n"); + return -1; + } + } + if (!self->checkpoint_cb) - return 0; + goto wait_checkpoint; PyEval_RestoreThread(self->threadstate); result = PyObject_CallFunction(self->checkpoint_cb, NULL); @@ -378,10 +643,39 @@ static int checkpoint_trampoline(void* data) if (result == Py_None || PyObject_IsTrue(result)) { Py_DECREF(result); - return 1; + goto wait_checkpoint; } Py_DECREF(result); return 0; + +wait_checkpoint: + if (self->colo) { + wait_new_checkpoint(self); + } + + fprintf(stderr, "\n\nnew checkpoint..........\n"); + + return 1; +} + +static int post_sendstate_trampoline(void* data) +{ + CheckpointObject *self = data; + int fd = self->cps.fd; + int i = XC_SAVE_ID_LAST_CHECKPOINT; + + if (!self->colo) + return 0; + + /* In colo mode, guest is running on slaver side, so we should + * send XC_SAVE_ID_LAST_CHECKPOINT to slaver. 
+ */ + if (write_exact(fd, &i, sizeof(int)) < 0) { + fprintf(stderr, "writing XC_SAVE_ID_LAST_CHECKPOINT fails\n"); + return -1; + } + + return 0; } diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.h b/tools/python/xen/lowlevel/checkpoint/checkpoint.h index 187d9d7..96fc949 100644 --- a/tools/python/xen/lowlevel/checkpoint/checkpoint.h +++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h @@ -41,6 +41,7 @@ typedef struct { } checkpoint_state; #define CHECKPOINT_FLAGS_COMPRESSION 1 +#define CHECKPOINT_FLAGS_COLO 2 char* checkpoint_error(checkpoint_state* s); void checkpoint_init(checkpoint_state* s); -- 1.7.4 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |