[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC PATCH 5/7] xc_domain_save: implement save_callbacks for colo
Add a new save callback: 1. post_sendstate(): the SVM runs only after XC_SAVE_ID_LAST_CHECKPOINT has been sent to the slave, but currently XC_SAVE_ID_LAST_CHECKPOINT is sent only during live migration. Add this callback so that it can be sent from there. Update some existing callbacks for COLO: 1. suspend(): in COLO mode both the PVM and the SVM are running, so both must be suspended. The master communicates with the slave as follows: a. write "continue" to notify the slave to suspend the SVM; b. suspend the PVM and the SVM; c. the slave writes "suspend" to tell the master that the SVM is suspended. 2. postcopy(): in COLO mode both the PVM and the SVM are running and both have just been suspended, so both must be resumed. The master communicates with the slave as follows: a. write "resume" to notify the slave to resume the SVM; b. resume the PVM and the SVM; c. the slave writes "resume" to tell the master that the SVM is resumed. 3. checkpoint(): in COLO mode a new checkpoint is taken only when the output packets from the PVM and the SVM differ, so this callback blocks and returns only when an output packet differs. 
Signed-off-by: Ye Wei <wei.ye1987@xxxxxxxxx> Signed-off-by: Jiang Yunhong <yunhong.jiang@xxxxxxxxx> Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx> --- tools/libxc/xc_domain_save.c | 9 + tools/libxc/xenguest.h | 3 + tools/python/xen/lowlevel/checkpoint/checkpoint.c | 290 +++++++++++++++++++++- tools/python/xen/lowlevel/checkpoint/checkpoint.h | 2 + 4 files changed, 301 insertions(+), 3 deletions(-) diff --git a/tools/libxc/xc_domain_save.c b/tools/libxc/xc_domain_save.c index cc4004a..870fea5 100644 --- a/tools/libxc/xc_domain_save.c +++ b/tools/libxc/xc_domain_save.c @@ -1645,6 +1645,15 @@ int xc_domain_save(xc_interface *xch, int io_fd, uint32_t dom, uint32_t max_iter } } + if ( callbacks->post_sendstate ) + { + if ( callbacks->post_sendstate(callbacks->data) < 0) + { + PERROR("Error: post_sendstate()"); + goto out; + } + } + /* Zero terminate */ i = 0; if ( wrexact(io_fd, &i, sizeof(int)) ) diff --git a/tools/libxc/xenguest.h b/tools/libxc/xenguest.h index 709a284..04d2aaf 100644 --- a/tools/libxc/xenguest.h +++ b/tools/libxc/xenguest.h @@ -43,6 +43,9 @@ struct save_callbacks { /* Enable qemu-dm logging dirty pages to xen */ int (*switch_qemu_logdirty)(int domid, unsigned enable, void *data); /* HVM only */ + /* optional: called just before the zero terminator is sent; a negative return aborts the save */ + int (*post_sendstate)(void *data); + /* to be provided as the last argument to each callback function */ void* data; }; diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.c b/tools/python/xen/lowlevel/checkpoint/checkpoint.c index 7545d7d..f880f1b 100644 --- a/tools/python/xen/lowlevel/checkpoint/checkpoint.c +++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c @@ -1,14 +1,22 @@ /* python bridge to checkpointing API */ #include <Python.h> +#include <sys/wait.h> #include <xs.h> #include <xenctrl.h> +#include <xc_private.h> +#include <xg_save_restore.h> #include "checkpoint.h" #define PKG "xen.lowlevel.checkpoint" +#define COMP_IOC_MAGIC 'k' +#define COMP_IOCTWAIT _IO(COMP_IOC_MAGIC, 0) +#define 
COMP_IOCTFLUSH _IO(COMP_IOC_MAGIC, 1) +#define COMP_IOCTRESUME _IO(COMP_IOC_MAGIC, 2) + static PyObject* CheckpointError; typedef struct { @@ -24,11 +32,15 @@ typedef struct { PyObject* checkpoint_cb; PyThreadState* threadstate; + int colo; + int first_time; + int dev_fd; } CheckpointObject; static int suspend_trampoline(void* data); static int postcopy_trampoline(void* data); static int checkpoint_trampoline(void* data); +static int post_sendstate_trampoline(void *data); static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args, PyObject* kwargs) @@ -105,6 +117,7 @@ static PyObject* pycheckpoint_start(PyObject* obj, PyObject* args) { int fd; struct save_callbacks callbacks; int rc; + int flags = 0; /* NOTE(review): not parsed from args yet, so self->colo stays 0 and COLO mode cannot be enabled -- confirm how callers are meant to pass CHECKPOINT_FLAGS_COLO */ if (!PyArg_ParseTuple(args, "O|OOOI", &iofile, &suspend_cb, &postcopy_cb, &checkpoint_cb, &interval)) @@ -151,9 +164,16 @@ static PyObject* pycheckpoint_start(PyObject* obj, PyObject* args) { } else self->checkpoint_cb = NULL; + if (flags & CHECKPOINT_FLAGS_COLO) + self->colo = 1; + else + self->colo = 0; + self->first_time = 1; + callbacks.suspend = suspend_trampoline; callbacks.postcopy = postcopy_trampoline; callbacks.checkpoint = checkpoint_trampoline; + callbacks.post_sendstate = post_sendstate_trampoline; callbacks.data = self; self->threadstate = PyEval_SaveThread(); @@ -258,6 +278,192 @@ PyMODINIT_FUNC initcheckpoint(void) { block_timer(); } +/* colo functions */ + +/* master slaver comment + * "continue" ===> + * <=== "suspend" guest is suspended + */ +static int notify_slaver_suspend(CheckpointObject *self) +{ + int fd = self->cps.fd; + + return write_exact(fd, "continue", 8); +} + +static int wait_slaver_suspend(CheckpointObject *self) +{ + int fd = self->cps.fd; + xc_interface *xch = self->cps.xch; + char buf[8]; + + if (self->first_time) { + /* nothing to read on the first cycle; first_time is cleared by pre_checkpoint() */ + return 0; 
+ } + + if ( read_exact(fd, buf, 7) < 0) { + PERROR("read: suspend"); + return -1; + } + + buf[7] = '\0'; + if (strcmp(buf, "suspend")) { + ERROR("read \"%s\", expect 
\"suspend\"", buf); + return -1; + } + + return 0; +} + +static int notify_slaver_start_checkpoint(CheckpointObject *self) +{ + int fd = self->cps.fd; + xc_interface *xch = self->cps.xch; + + if ( write_exact(fd, "start\0\0\0", 8) < 0) { /* pad: "start" alone is only 6 bytes, so writing 8 read past the literal */ + PERROR("write start"); + return -1; + } + + return 0; +} + +/* + * master slaver + * <==== "finish" + * flush packets + * "resume" ====> + * resume vm resume vm + * <==== "resume" + */ +static int notify_slaver_resume(CheckpointObject *self) +{ + int fd = self->cps.fd; + xc_interface *xch = self->cps.xch; + char buf[7]; + + /* wait slaver to finish update memory, device state... */ + if ( read_exact(fd, buf, 6) < 0) { + PERROR("read: finish"); + return -1; + } + + buf[6] = '\0'; + if (strcmp(buf, "finish")) { + ERROR("read \"%s\", expect \"finish\"", buf); + return -1; + } + + if (!self->first_time) + /* flush queued packets now */ + ioctl(self->dev_fd, COMP_IOCTFLUSH); + + /* notify slaver to resume vm*/ + if (write_exact(fd, "resume", 6)) { + PERROR("write: resume"); + return -1; + } + + return 0; +} + +static int install_fw_network(CheckpointObject *self) +{ + pid_t pid; + xc_interface *xch = self->cps.xch; + int status; + int rc; + + pid = vfork(); + if (pid < 0) { + PERROR("vfork fails"); + return -1; + } + + if (pid > 0) { + rc = waitpid(pid, &status, 0); + if (rc != pid || !WIFEXITED(status) || WEXITSTATUS(status) != 0) { + ERROR("getting child status fails"); + return -1; + } + + return 0; + } + + execl("/etc/xen/scripts/HA_fw_runtime.sh", "HA_fw_runtime.sh", "install", (char *)NULL); + /* vfork() child: must never return into the parent's frame; only _exit() is safe here */ + _exit(127); +} + +static int wait_slaver_resume(CheckpointObject *self) +{ + int fd = self->cps.fd; + xc_interface *xch = self->cps.xch; + char buf[7]; + + if (read_exact(fd, buf, 6) < 0) { + PERROR("read resume"); + return -1; + } + + buf[6] = '\0'; + if (strcmp(buf, "resume")) { + ERROR("read \"%s\", expect 
\"resume\"", buf); + return -1; + } + + return 0; +} + +static int colo_postresume(CheckpointObject *self) +{ + int rc; + int dev_fd = self->dev_fd; + + rc = 
wait_slaver_resume(self); + if (rc < 0) + return rc; + + if (self->first_time) { + rc = install_fw_network(self); + if (rc < 0) + return rc; + } else { + ioctl(dev_fd, COMP_IOCTRESUME); + } + + return 0; +} + +static int pre_checkpoint(CheckpointObject *self) +{ + xc_interface *xch = self->cps.xch; + + if (!self->first_time) + return 0; + + self->dev_fd = open("/dev/HA_compare", O_RDWR); + if (self->dev_fd < 0) { + PERROR("opening /dev/HA_compare fails"); + return -1; + } + self->first_time = 0; /* later cycles skip this open and use the already-open dev_fd */ + return 0; +} + +static void wait_new_checkpoint(CheckpointObject *self) +{ + int dev_fd = self->dev_fd; + int err; + + while (1) { + err = ioctl(dev_fd, COMP_IOCTWAIT); + if (err == 0 || err == -1) + break; + } +} + /* private functions */ /* bounce C suspend call into python equivalent. @@ -268,6 +474,13 @@ static int suspend_trampoline(void* data) PyObject* result; + if (self->colo) { + if (notify_slaver_suspend(self) < 0) { + fprintf(stderr, "notifying slaver suspend fails\n"); + return 0; + } + } + /* call default suspend function, then python hook if available */ if (self->armed) { if (checkpoint_wait(&self->cps) < 0) { @@ -286,8 +499,16 @@ static int suspend_trampoline(void* data) } } + /* suspend_cb() should be called after both sides are suspended */ + if (self->colo) { + if (wait_slaver_suspend(self) < 0) { + fprintf(stderr, "waiting slaver suspend fails\n"); + return 0; + } + } + if (!self->suspend_cb) - return 1; + goto start_checkpoint; PyEval_RestoreThread(self->threadstate); result = PyObject_CallFunction(self->suspend_cb, NULL); @@ -298,12 +519,24 @@ static int suspend_trampoline(void* data) if (result == Py_None || PyObject_IsTrue(result)) { Py_DECREF(result); - return 1; + goto start_checkpoint; } Py_DECREF(result); return 0; + +start_checkpoint: + if (self->colo) { + if (notify_slaver_start_checkpoint(self) < 0) { + fprintf(stderr, "notifying slaver to start checkpoint fails\n"); + return 0; + } + + /* do not clear 
first_time here: notify_slaver_resume(), colo_postresume() and pre_checkpoint() still need it on the first cycle */ + } + + return 1; } static int 
postcopy_trampoline(void* data) @@ -313,6 +546,13 @@ static int postcopy_trampoline(void* data) PyObject* result; int rc = 0; + if (self->colo) { + if (notify_slaver_resume(self) < 0) { + fprintf(stderr, "notifying slaver resume fails\n"); + return 0; + } + } + if (!self->postcopy_cb) goto resume; @@ -331,6 +571,13 @@ static int postcopy_trampoline(void* data) return 0; } + if (self->colo) { + if (colo_postresume(self) < 0) { + fprintf(stderr, "postresume fails\n"); + return 0; + } + } + return rc; } @@ -345,8 +592,18 @@ static int checkpoint_trampoline(void* data) return -1; } + if (self->colo) { + if (pre_checkpoint(self) < 0) { + fprintf(stderr, "pre_checkpoint() fails\n"); + return -1; + } + /* COLO keeps checkpointing even without a python hook; plain mode still stops below */ + if (!self->checkpoint_cb) + goto wait_checkpoint; + } + if (!self->checkpoint_cb) return 0; PyEval_RestoreThread(self->threadstate); result = PyObject_CallFunction(self->checkpoint_cb, NULL); @@ -357,10 +614,37 @@ static int checkpoint_trampoline(void* data) if (result == Py_None || PyObject_IsTrue(result)) { Py_DECREF(result); - return 1; + goto wait_checkpoint; } Py_DECREF(result); return 0; + +wait_checkpoint: + if (self->colo) { + wait_new_checkpoint(self); + } + + return 1; +} + +static int post_sendstate_trampoline(void* data) +{ + CheckpointObject *self = data; + int fd = self->cps.fd; + int i = XC_SAVE_ID_LAST_CHECKPOINT; + + if (!self->colo) + return 0; + + /* In colo mode, guest is running on slaver side, so we should + * send XC_SAVE_ID_LAST_CHECKPOINT to slaver. 
+ */ + if (write_exact(fd, &i, sizeof(int)) < 0) { + fprintf(stderr, "writing XC_SAVE_ID_LAST_CHECKPOINT fails\n"); + return -1; + } + + return 0; } diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.h b/tools/python/xen/lowlevel/checkpoint/checkpoint.h index 36455fb..5dd6440 100644 --- a/tools/python/xen/lowlevel/checkpoint/checkpoint.h +++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h @@ -40,6 +40,8 @@ typedef struct { timer_t timer; } checkpoint_state; +#define CHECKPOINT_FLAGS_COLO 2 + char* checkpoint_error(checkpoint_state* s); void checkpoint_init(checkpoint_state* s); -- 1.8.0 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |