|
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC PATCH 5/7] xc_domain_save: implement save_callbacks for colo
Add a new save callback:
1. post_sendstate(): The SVM runs only after XC_SAVE_ID_LAST_CHECKPOINT has
been sent to the slave, but currently XC_SAVE_ID_LAST_CHECKPOINT is only sent
during live migration. This callback gives us a place to send it.
Update the following callbacks for COLO:
1. suspend(): In COLO mode both the PVM and the SVM are running, so we must
suspend both of them.
The master and the slave communicate as follows:
a. the master writes "continue" to tell the slave to suspend the SVM
b. the PVM and the SVM are suspended
c. the slave writes "suspend" to tell the master that the SVM is suspended
2. postcopy(): In COLO mode both the PVM and the SVM are running, and both
have been suspended, so we must resume both of them.
The master and the slave communicate as follows:
a. the master writes "resume" to tell the slave to resume the SVM
b. the PVM and the SVM are resumed
c. the slave writes "resume" to tell the master that the SVM is resumed
3. checkpoint(): In COLO mode a new checkpoint is taken only when the output
packets from the PVM and the SVM differ. This callback blocks and returns once
a differing output packet is detected.
Signed-off-by: Ye Wei <wei.ye1987@xxxxxxxxx>
Signed-off-by: Jiang Yunhong <yunhong.jiang@xxxxxxxxx>
Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
---
tools/libxc/xc_domain_save.c | 9 +
tools/libxc/xenguest.h | 3 +
tools/python/xen/lowlevel/checkpoint/checkpoint.c | 289 +++++++++++++++++++++-
tools/python/xen/lowlevel/checkpoint/checkpoint.h | 2 +
4 files changed, 299 insertions(+), 4 deletions(-)
diff --git a/tools/libxc/xc_domain_save.c b/tools/libxc/xc_domain_save.c
index cc4004a..870fea5 100644
--- a/tools/libxc/xc_domain_save.c
+++ b/tools/libxc/xc_domain_save.c
@@ -1645,6 +1645,15 @@ int xc_domain_save(xc_interface *xch, int io_fd,
uint32_t dom, uint32_t max_iter
}
}
+ if ( callbacks->post_sendstate )
+ {
+ if ( callbacks->post_sendstate(callbacks->data) < 0)
+ {
+ PERROR("Error: post_sendstate()\n");
+ goto out;
+ }
+ }
+
/* Zero terminate */
i = 0;
if ( wrexact(io_fd, &i, sizeof(int)) )
diff --git a/tools/libxc/xenguest.h b/tools/libxc/xenguest.h
index 709a284..04d2aaf 100644
--- a/tools/libxc/xenguest.h
+++ b/tools/libxc/xenguest.h
@@ -43,6 +43,9 @@ struct save_callbacks {
/* Enable qemu-dm logging dirty pages to xen */
int (*switch_qemu_logdirty)(int domid, unsigned enable, void *data); /*
HVM only */
+ /* called before Zero terminate is sent */
+ int (*post_sendstate)(void *data);
+
/* to be provided as the last argument to each callback function */
void* data;
};
diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.c
b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
index 7545d7d..f880f1b 100644
--- a/tools/python/xen/lowlevel/checkpoint/checkpoint.c
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
@@ -1,14 +1,22 @@
/* python bridge to checkpointing API */
#include <Python.h>
+#include <sys/wait.h>
#include <xs.h>
#include <xenctrl.h>
+#include <xc_private.h>
+#include <xg_save_restore.h>
#include "checkpoint.h"
#define PKG "xen.lowlevel.checkpoint"
+/*
+ * ioctl commands issued on the /dev/HA_compare descriptor (self->dev_fd):
+ *   COMP_IOCTWAIT   - blocks in wait_new_checkpoint() until a new
+ *                     checkpoint is needed
+ *   COMP_IOCTFLUSH  - issued in notify_slaver_resume() before the slaver
+ *                     is told to resume
+ *   COMP_IOCTRESUME - issued in colo_postresume() after the slaver has
+ *                     confirmed it resumed
+ * NOTE(review): these numbers are presumably shared with the HA_compare
+ * kernel module; they are not taken from any header here, so keep them
+ * in sync by hand.
+ */
+#define COMP_IOC_MAGIC 'k'
+#define COMP_IOCTWAIT _IO(COMP_IOC_MAGIC, 0)
+#define COMP_IOCTFLUSH _IO(COMP_IOC_MAGIC, 1)
+#define COMP_IOCTRESUME _IO(COMP_IOC_MAGIC, 2)
+
static PyObject* CheckpointError;
typedef struct {
@@ -24,11 +32,15 @@ typedef struct {
PyObject* checkpoint_cb;
PyThreadState* threadstate;
+ int colo;
+ int first_time;
+ int dev_fd;
} CheckpointObject;
static int suspend_trampoline(void* data);
static int postcopy_trampoline(void* data);
static int checkpoint_trampoline(void* data);
+static int post_sendstate_trampoline(void *data);
static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args,
PyObject* kwargs)
@@ -105,6 +117,7 @@ static PyObject* pycheckpoint_start(PyObject* obj,
PyObject* args) {
int fd;
struct save_callbacks callbacks;
int rc;
+ int flags = 0;
if (!PyArg_ParseTuple(args, "O|OOOI", &iofile, &suspend_cb, &postcopy_cb,
&checkpoint_cb, &interval))
@@ -151,9 +164,16 @@ static PyObject* pycheckpoint_start(PyObject* obj,
PyObject* args) {
} else
self->checkpoint_cb = NULL;
+ if (flags & CHECKPOINT_FLAGS_COLO)
+ self->colo = 1;
+ else
+ self->colo = 0;
+ self->first_time = 1;
+
callbacks.suspend = suspend_trampoline;
callbacks.postcopy = postcopy_trampoline;
callbacks.checkpoint = checkpoint_trampoline;
+ callbacks.post_sendstate = post_sendstate_trampoline;
callbacks.data = self;
self->threadstate = PyEval_SaveThread();
@@ -258,6 +278,192 @@ PyMODINIT_FUNC initcheckpoint(void) {
block_timer();
}
+/* colo functions */
+
+/*
+ * Suspend handshake (master side):
+ *
+ *   master                      slaver
+ *   "continue"  ===>
+ *               <===  "suspend"   (guest is suspended)
+ *
+ * Send the 8-byte "continue" request that asks the slaver to suspend the
+ * SVM. Returns write_exact()'s result; callers treat a negative value as
+ * failure.
+ */
+static int notify_slaver_suspend(CheckpointObject *self)
+{
+    static const char request[] = "continue";
+
+    return write_exact(self->cps.fd, request, 8);
+}
+
+/*
+ * Wait for the slaver to report that the SVM is suspended.
+ *
+ * The first call only clears self->first_time and returns without reading
+ * anything. NOTE(review): presumably the slaver sends no "suspend" reply
+ * before the first checkpoint; confirm against the slaver side.
+ *
+ * Returns 0 when the 7-byte reply "suspend" was received (or on the first
+ * call), -1 on a read error or an unexpected reply.
+ */
+static int wait_slaver_suspend(CheckpointObject *self)
+{
+    int fd = self->cps.fd;
+    xc_interface *xch = self->cps.xch;
+    char buf[8];
+
+    if (self->first_time) {
+        self->first_time = 0;
+        return 0;
+    }
+
+    if (read_exact(fd, buf, 7) < 0) {
+        PERROR("read: suspend");
+        return -1;
+    }
+
+    buf[7] = '\0';
+    if (strcmp(buf, "suspend")) {
+        /* A protocol mismatch is not a failed syscall: errno is stale here,
+         * so report it with ERROR rather than PERROR. */
+        ERROR("read \"%s\", expect \"suspend\"", buf);
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Tell the slaver that a new checkpoint is about to be sent.
+ *
+ * The message is 8 bytes on the wire: "start" followed by NUL padding.
+ * Returns 0 on success, -1 if the write fails.
+ */
+static int notify_slaver_start_checkpoint(CheckpointObject *self)
+{
+    /* Zero-padded to the 8-byte message length. Writing 8 bytes directly
+     * from the 6-byte literal "start" would read past its end.
+     * NOTE(review): confirm the slaver reads exactly 8 bytes here. */
+    static const char msg[8] = "start";
+    int fd = self->cps.fd;
+    xc_interface *xch = self->cps.xch;
+
+    if (write_exact(fd, msg, sizeof(msg)) < 0) {
+        PERROR("write start");
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Resume handshake (master side):
+ *
+ *   master                        slaver
+ *                   <====  "finish"
+ *   flush packets
+ *   "resume"        ====>
+ *   resume vm                     resume vm
+ *                   <====  "resume"
+ *
+ * This function performs the first three steps: wait for "finish", flush
+ * queued output packets (skipped while self->first_time is set), then send
+ * "resume". The slaver's final "resume" reply is read by
+ * wait_slaver_resume().
+ *
+ * Returns 0 on success, -1 on a read/write error or an unexpected reply.
+ * A failed flush ioctl is logged but does not abort the handshake.
+ */
+static int notify_slaver_resume(CheckpointObject *self)
+{
+    int fd = self->cps.fd;
+    xc_interface *xch = self->cps.xch;
+    char buf[7];
+
+    /* wait for the slaver to finish updating memory, device state... */
+    if (read_exact(fd, buf, 6) < 0) {
+        PERROR("read: finish");
+        return -1;
+    }
+
+    buf[6] = '\0';
+    if (strcmp(buf, "finish")) {
+        ERROR("read \"%s\", expect \"finish\"", buf);
+        return -1;
+    }
+
+    /* Flush queued packets now. This is best-effort: report a failure, but
+     * still send "resume" so the slaver is not left waiting. */
+    if (!self->first_time && ioctl(self->dev_fd, COMP_IOCTFLUSH) < 0)
+        PERROR("flushing queued packets fails");
+
+    /* notify slaver to resume vm */
+    if (write_exact(fd, "resume", 6)) {
+        PERROR("write: resume");
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Run "/etc/xen/scripts/HA_fw_runtime.sh install" and wait for it to finish.
+ *
+ * Returns 0 if the script exited normally with status 0. Returns -1 if the
+ * child could not be created or waited for, was killed by a signal, or
+ * exited non-zero. Exit status 127 means the script could not be exec'd.
+ */
+static int install_fw_network(CheckpointObject *self)
+{
+    xc_interface *xch = self->cps.xch;
+    pid_t pid;
+    pid_t rc;
+    int status;
+
+    pid = vfork();
+    if (pid < 0) {
+        PERROR("vfork fails");
+        return -1;
+    }
+
+    if (pid == 0) {
+        /* vfork() child: it borrows the parent's memory and stack, so it
+         * may only exec or _exit(). Returning or logging here is undefined
+         * behaviour. */
+        execl("/etc/xen/scripts/HA_fw_runtime.sh", "HA_fw_runtime.sh",
+              "install", (char *)NULL);
+        _exit(127);
+    }
+
+    /* Reap this specific child. On success wait()/waitpid() return the
+     * child's pid, not 0. */
+    do {
+        rc = waitpid(pid, &status, 0);
+    } while (rc < 0 && errno == EINTR);
+
+    if (rc != pid) {
+        PERROR("getting child status fails");
+        return -1;
+    }
+
+    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+        ERROR("HA_fw_runtime.sh install fails (status %d)", status);
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Read the slaver's final reply of the resume handshake.
+ *
+ * Returns 0 when the 6-byte reply is "resume", -1 on a read error or any
+ * other reply.
+ */
+static int wait_slaver_resume(CheckpointObject *self)
+{
+    xc_interface *xch = self->cps.xch;
+    char reply[7];
+
+    if (read_exact(self->cps.fd, reply, 6) < 0) {
+        PERROR("read resume");
+        return -1;
+    }
+    reply[6] = '\0';
+
+    if (strcmp(reply, "resume") == 0)
+        return 0;
+
+    ERROR("read \"%s\", expect \"resume\"", reply);
+    return -1;
+}
+
+/*
+ * Final step of postcopy_trampoline() in COLO mode.
+ *
+ * Waits for the slaver's "resume" reply. Then, while self->first_time is
+ * set, runs install_fw_network(); otherwise it issues COMP_IOCTRESUME on
+ * the compare device.
+ *
+ * Returns 0 on success, or the negative value from wait_slaver_resume() or
+ * install_fw_network(). A failed COMP_IOCTRESUME ioctl is logged but not
+ * treated as fatal.
+ */
+static int colo_postresume(CheckpointObject *self)
+{
+    xc_interface *xch = self->cps.xch;
+    int rc;
+
+    rc = wait_slaver_resume(self);
+    if (rc < 0)
+        return rc;
+
+    if (self->first_time) {
+        rc = install_fw_network(self);
+        if (rc < 0)
+            return rc;
+    } else if (ioctl(self->dev_fd, COMP_IOCTRESUME) < 0) {
+        /* best-effort: report the failure but keep checkpointing */
+        PERROR("COMP_IOCTRESUME fails");
+    }
+
+    return 0;
+}
+
+/*
+ * Open /dev/HA_compare read-write into self->dev_fd, but only while
+ * self->first_time is set; otherwise this is a no-op.
+ *
+ * Returns 0 on success (or when nothing needs doing), -1 if open() fails.
+ *
+ * NOTE(review): suspend_trampoline() clears first_time, both through
+ * wait_slaver_suspend() and on its start_checkpoint path. If it runs before
+ * the first checkpoint_trampoline() call, the device is never opened here,
+ * and dev_fd reaches the ioctl() calls unopened. Confirm the callback order.
+ */
+static int pre_checkpoint(CheckpointObject *self)
+{
+    xc_interface *xch = self->cps.xch;
+    int devfd;
+
+    if (self->first_time) {
+        devfd = open("/dev/HA_compare", O_RDWR);
+        self->dev_fd = devfd;
+        if (devfd < 0) {
+            PERROR("opening /dev/HA_compare fails");
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Block in the COMP_IOCTWAIT ioctl on the compare device until it returns
+ * 0 or -1. Any other return value re-issues the ioctl.
+ *
+ * The outcome is not reported to the caller: both 0 and -1 (including an
+ * interrupted call) simply end the wait.
+ */
+static void wait_new_checkpoint(CheckpointObject *self)
+{
+    const int fd = self->dev_fd;
+    int ret;
+
+    do {
+        ret = ioctl(fd, COMP_IOCTWAIT);
+    } while (ret != 0 && ret != -1);
+}
+
/* private functions */
/* bounce C suspend call into python equivalent.
@@ -268,6 +474,13 @@ static int suspend_trampoline(void* data)
PyObject* result;
+ if (self->colo) {
+ if (notify_slaver_suspend(self) < 0) {
+ fprintf(stderr, "nofitying slaver suspend fails\n");
+ return 0;
+ }
+ }
+
/* call default suspend function, then python hook if available */
if (self->armed) {
if (checkpoint_wait(&self->cps) < 0) {
@@ -286,8 +499,16 @@ static int suspend_trampoline(void* data)
}
}
+ /* suspend_cb() should be called after both sides are suspended */
+ if (self->colo) {
+ if (wait_slaver_suspend(self) < 0) {
+ fprintf(stderr, "waiting slaver suspend fails\n");
+ return 0;
+ }
+ }
+
if (!self->suspend_cb)
- return 1;
+ goto start_checkpoint;
PyEval_RestoreThread(self->threadstate);
result = PyObject_CallFunction(self->suspend_cb, NULL);
@@ -298,12 +519,24 @@ static int suspend_trampoline(void* data)
if (result == Py_None || PyObject_IsTrue(result)) {
Py_DECREF(result);
- return 1;
+ goto start_checkpoint;
}
Py_DECREF(result);
return 0;
+
+start_checkpoint:
+ if (self->colo) {
+ if (notify_slaver_start_checkpoint(self) < 0) {
+ fprintf(stderr, "nofitying slaver to start checkpoint fails\n");
+ return 0;
+ }
+
+ self->first_time = 0;
+ }
+
+ return 1;
}
static int postcopy_trampoline(void* data)
@@ -313,6 +546,13 @@ static int postcopy_trampoline(void* data)
PyObject* result;
int rc = 0;
+ if (self->colo) {
+ if (notify_slaver_resume(self) < 0) {
+ fprintf(stderr, "nofitying slaver resume fails\n");
+ return 0;
+ }
+ }
+
if (!self->postcopy_cb)
goto resume;
@@ -331,6 +571,13 @@ static int postcopy_trampoline(void* data)
return 0;
}
+ if (self->colo) {
+ if (colo_postresume(self) < 0) {
+ fprintf(stderr, "postresume fails\n");
+ return 0;
+ }
+ }
+
return rc;
}
@@ -345,8 +592,15 @@ static int checkpoint_trampoline(void* data)
return -1;
}
+ if (self->colo) {
+ if (pre_checkpoint(self) < 0) {
+ fprintf(stderr, "pre_checkpoint() fails\n");
+ return -1;
+ }
+ }
+
if (!self->checkpoint_cb)
- return 0;
+ goto wait_checkpoint;
PyEval_RestoreThread(self->threadstate);
result = PyObject_CallFunction(self->checkpoint_cb, NULL);
@@ -357,10 +611,37 @@ static int checkpoint_trampoline(void* data)
if (result == Py_None || PyObject_IsTrue(result)) {
Py_DECREF(result);
- return 1;
+ goto wait_checkpoint;
}
Py_DECREF(result);
return 0;
+
+wait_checkpoint:
+ if (self->colo) {
+ wait_new_checkpoint(self);
+ }
+
+ return 1;
+}
+
+/*
+ * save_callbacks.post_sendstate hook. xc_domain_save() calls it just before
+ * writing the zero terminator.
+ *
+ * In COLO mode the guest keeps running on the slaver side, so write
+ * XC_SAVE_ID_LAST_CHECKPOINT to the slaver. Outside COLO mode nothing is
+ * written.
+ *
+ * Returns 0 on success (or when not in COLO mode), -1 if the write fails.
+ */
+static int post_sendstate_trampoline(void* data)
+{
+    CheckpointObject *self = data;
+    int marker = XC_SAVE_ID_LAST_CHECKPOINT;
+
+    if (self->colo) {
+        if (write_exact(self->cps.fd, &marker, sizeof(int)) < 0) {
+            fprintf(stderr, "writing XC_SAVE_ID_LAST_CHECKPOINT fails\n");
+            return -1;
+        }
+    }
+
+    return 0;
+}
diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.h
b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
index 36455fb..5dd6440 100644
--- a/tools/python/xen/lowlevel/checkpoint/checkpoint.h
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
@@ -40,6 +40,8 @@ typedef struct {
timer_t timer;
} checkpoint_state;
+#define CHECKPOINT_FLAGS_COLO 2
+
char* checkpoint_error(checkpoint_state* s);
void checkpoint_init(checkpoint_state* s);
--
1.8.0
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel
|
![]() |
Lists.xenproject.org is hosted with RackSpace, monitoring our |