|
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC Patch v2 15/16] xc_domain_save: implement save_callbacks for colo
Add a new save callback:
1. post_sendstate(): the SVM runs only after XC_SAVE_ID_LAST_CHECKPOINT has
been sent to the slave, but currently XC_SAVE_ID_LAST_CHECKPOINT is only sent
during live migration. This callback, invoked before the zero terminator is
written, gives us a place to send it.
Update some callbacks for colo:
1. suspend(): In colo mode, both PVM and SVM are running, so we must suspend
both of them.
Communicate with the slave like this:
a. write "continue" to tell the slave to suspend the SVM
b. suspend PVM and SVM
c. the slave writes "suspend" to tell the master that the SVM is suspended
2. postcopy(): In colo mode, both PVM and SVM are running and have just been
suspended, so we must resume both of them.
Communicate with the slave like this:
a. write "resume" to tell the slave to resume the SVM
b. resume PVM and SVM
c. the slave writes "resume" to tell the master that the SVM is resumed
3. checkpoint(): In colo mode, a new checkpoint is taken only when the output
packets from PVM and SVM differ. This callback blocks and returns only once
a differing output packet has been detected.
Signed-off-by: Ye Wei <wei.ye1987@xxxxxxxxx>
Signed-off-by: Jiang Yunhong <yunhong.jiang@xxxxxxxxx>
Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
---
tools/libxc/xc_domain_save.c | 17 ++
tools/libxc/xenguest.h | 3 +
tools/python/xen/lowlevel/checkpoint/checkpoint.c | 302 ++++++++++++++++++++-
tools/python/xen/lowlevel/checkpoint/checkpoint.h | 1 +
4 files changed, 319 insertions(+), 4 deletions(-)
diff --git a/tools/libxc/xc_domain_save.c b/tools/libxc/xc_domain_save.c
index b477188..8f84c9b 100644
--- a/tools/libxc/xc_domain_save.c
+++ b/tools/libxc/xc_domain_save.c
@@ -1785,6 +1785,23 @@ int xc_domain_save(xc_interface *xch, int io_fd,
uint32_t dom, uint32_t max_iter
}
}
+ /* Flush last write and discard cache for file. */
+ if ( outbuf_flush(xch, ob, io_fd) < 0 ) {
+ PERROR("Error when flushing output buffer");
+ rc = 1;
+ }
+
+ discard_file_cache(xch, io_fd, 1 /* flush */);
+
+ if ( callbacks->post_sendstate )
+ {
+ if ( callbacks->post_sendstate(callbacks->data) < 0)
+ {
+ PERROR("Error: post_sendstate()\n");
+ goto out;
+ }
+ }
+
/* Zero terminate */
i = 0;
if ( wrexact(io_fd, &i, sizeof(int)) )
diff --git a/tools/libxc/xenguest.h b/tools/libxc/xenguest.h
index 4bb444a..9d7d03c 100644
--- a/tools/libxc/xenguest.h
+++ b/tools/libxc/xenguest.h
@@ -72,6 +72,9 @@ struct save_callbacks {
*/
int (*toolstack_save)(uint32_t domid, uint8_t **buf, uint32_t *len, void
*data);
+ /* called before Zero terminate is sent */
+ int (*post_sendstate)(void *data);
+
/* to be provided as the last argument to each callback function */
void* data;
};
diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.c
b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
index ec14b27..28bdb23 100644
--- a/tools/python/xen/lowlevel/checkpoint/checkpoint.c
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
@@ -1,14 +1,22 @@
/* python bridge to checkpointing API */
#include <Python.h>
+#include <sys/wait.h>
#include <xenstore.h>
#include <xenctrl.h>
+#include <xc_private.h>
+#include <xg_save_restore.h>
#include "checkpoint.h"
#define PKG "xen.lowlevel.checkpoint"
+/*
+ * ioctl commands for the /dev/HA_compare device used in colo mode:
+ *   COMP_IOCTWAIT   - blocks until a new checkpoint is needed (returns 0)
+ *   COMP_IOCTFLUSH  - releases the packets queued by the device
+ *   COMP_IOCTRESUME - issued after the slaver reports the SVM resumed
+ * NOTE(review): these presumably must match the kernel module's own
+ * definitions - confirm against the driver source.
+ */
+#define COMP_IOC_MAGIC 'k'
+#define COMP_IOCTWAIT _IO(COMP_IOC_MAGIC, 0)
+#define COMP_IOCTFLUSH _IO(COMP_IOC_MAGIC, 1)
+#define COMP_IOCTRESUME _IO(COMP_IOC_MAGIC, 2)
+
static PyObject* CheckpointError;
typedef struct {
@@ -25,11 +33,15 @@ typedef struct {
PyObject* setup_cb;
PyThreadState* threadstate;
+ int colo;
+ int first_time;
+ int dev_fd;
} CheckpointObject;
static int suspend_trampoline(void* data);
static int postcopy_trampoline(void* data);
static int checkpoint_trampoline(void* data);
+static int post_sendstate_trampoline(void *data);
static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args,
PyObject* kwargs)
@@ -169,10 +181,17 @@ static PyObject* pycheckpoint_start(PyObject* obj,
PyObject* args) {
} else
self->setup_cb = NULL;
+ if (flags & CHECKPOINT_FLAGS_COLO)
+ self->colo = 1;
+ else
+ self->colo = 0;
+ self->first_time = 1;
+
memset(&callbacks, 0, sizeof(callbacks));
callbacks.suspend = suspend_trampoline;
callbacks.postcopy = postcopy_trampoline;
callbacks.checkpoint = checkpoint_trampoline;
+ callbacks.post_sendstate = post_sendstate_trampoline;
callbacks.data = self;
self->threadstate = PyEval_SaveThread();
@@ -279,6 +298,196 @@ PyMODINIT_FUNC initcheckpoint(void) {
block_timer();
}
+/* colo functions */
+
+/*
+ * Suspend handshake:
+ *
+ * master                       slaver           comment
+ * "continue"       ===>
+ *                  <===        "suspend"        guest is suspended
+ *
+ * notify_slaver_suspend() sends the first message ("continue", 8 bytes) to
+ * ask the slaver to suspend the SVM.  It does nothing while first_time == 1
+ * (the initial live-migration pass).
+ *
+ * Returns the result of write_exact().
+ */
+static int notify_slaver_suspend(CheckpointObject *self)
+{
+    if (self->first_time == 1)
+        return 0;
+
+    return write_exact(self->cps.fd, "continue", 8);
+}
+
+/*
+ * Wait for the slaver to report that the SVM is suspended.
+ *
+ * Reads the token "suspend" (without terminator) from the checkpoint
+ * stream.  Skipped while first_time == 1 (the initial live-migration pass).
+ *
+ * Returns 0 on success, -1 on a read error or an unexpected token.
+ */
+static int wait_slaver_suspend(CheckpointObject *self)
+{
+    static const char expected[] = "suspend";
+    int fd = self->cps.fd;
+    xc_interface *xch = self->cps.xch;
+    char buf[sizeof(expected)];
+
+    if (self->first_time == 1)
+        return 0;
+
+    if (read_exact(fd, buf, sizeof(expected) - 1) < 0) {
+        PERROR("read: suspend");
+        return -1;
+    }
+
+    buf[sizeof(expected) - 1] = '\0';
+    if (strcmp(buf, expected)) {
+        /* Protocol mismatch, not a failed syscall: errno is meaningless
+         * here, so use ERROR rather than PERROR. */
+        ERROR("read \"%s\", expect \"suspend\"", buf);
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Tell the slaver to start a checkpoint by writing the token "start".
+ * Skipped while first_time == 1 (the initial live-migration pass).
+ *
+ * Returns 0 on success, -1 if the write fails.
+ */
+static int notify_slaver_start_checkpoint(CheckpointObject *self)
+{
+    xc_interface *xch = self->cps.xch;
+    int rc = 0;
+
+    if (self->first_time != 1 && write_exact(self->cps.fd, "start", 5) < 0) {
+        PERROR("write start");
+        rc = -1;
+    }
+
+    return rc;
+}
+
+/*
+ * master                       slaver
+ *                  <====       "finish"
+ * flush packets
+ * "resume"         ====>
+ * resume vm                    resume vm
+ *                  <====       "resume"
+ *
+ * notify_slaver_resume() covers the first three steps:
+ *   1. wait for "finish" (the slaver has applied memory/device state);
+ *   2. ask the compare device to flush its queued packets - skipped while
+ *      first_time is set (the device is presumably not opened yet; see
+ *      pre_checkpoint);
+ *   3. send "resume".
+ *
+ * A failed flush ioctl is reported but is not fatal, so the slaver still
+ * receives "resume" and neither VM is left suspended.
+ *
+ * Returns 0 on success, -1 on an I/O error or an unexpected token.
+ */
+static int notify_slaver_resume(CheckpointObject *self)
+{
+    static const char finish[] = "finish";
+    static const char resume[] = "resume";
+    int fd = self->cps.fd;
+    xc_interface *xch = self->cps.xch;
+    char buf[sizeof(finish)];
+
+    /* wait slaver to finish update memory, device state... */
+    if (read_exact(fd, buf, sizeof(finish) - 1) < 0) {
+        PERROR("read: finish");
+        return -1;
+    }
+
+    buf[sizeof(finish) - 1] = '\0';
+    if (strcmp(buf, finish)) {
+        ERROR("read \"%s\", expect \"finish\"", buf);
+        return -1;
+    }
+
+    /* flush queued packets now; previously a failure was silently dropped */
+    if (!self->first_time && ioctl(self->dev_fd, COMP_IOCTFLUSH) < 0)
+        PERROR("ioctl: COMP_IOCTFLUSH");
+
+    /* notify slaver to resume vm */
+    if (write_exact(fd, resume, sizeof(resume) - 1) < 0) {
+        PERROR("write: resume");
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Run the Python setup callback (self->setup_cb).  colo_postresume calls
+ * this on the first resume only.  NOTE(review): judging by the name it
+ * installs the colo network plumbing - confirm with the Python caller.
+ *
+ * A missing callback (setup_cb == NULL) means "nothing to install" and
+ * succeeds, as with the other optional callbacks.  Calling NULL would only
+ * raise a SystemError.
+ *
+ * The callback succeeds when it returns None or a true value.  The result
+ * is inspected and released while the GIL is still held: PyObject_IsTrue
+ * may run Python code, and Py_DECREF may deallocate.
+ *
+ * Returns 0 on success, -1 if the callback raised, returned false, or its
+ * truth value could not be determined.  A Python exception, if any, is left
+ * pending, as in the other trampolines.
+ */
+static int install_fw_network(CheckpointObject *self)
+{
+    int rc = -1;
+    int truth;
+    PyObject* result;
+
+    if (!self->setup_cb)
+        return 0;
+
+    PyEval_RestoreThread(self->threadstate);
+
+    result = PyObject_CallFunction(self->setup_cb, NULL);
+    if (result) {
+        truth = (result == Py_None) ? 1 : PyObject_IsTrue(result);
+        Py_DECREF(result);
+        /* truth < 0 means PyObject_IsTrue itself failed */
+        if (truth > 0)
+            rc = 0;
+    }
+
+    self->threadstate = PyEval_SaveThread();
+
+    return rc;
+}
+
+/*
+ * Wait for the slaver to confirm, with the token "resume", that the SVM
+ * has been resumed.
+ *
+ * Returns 0 on success, -1 on a read error or an unexpected token.
+ */
+static int wait_slaver_resume(CheckpointObject *self)
+{
+    xc_interface *xch = self->cps.xch;
+    char reply[7];
+
+    if (read_exact(self->cps.fd, reply, 6) < 0) {
+        PERROR("read resume");
+        return -1;
+    }
+    reply[6] = '\0';
+
+    if (strcmp(reply, "resume") != 0) {
+        ERROR("read \"%s\", expect \"resume\"", reply);
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Finish the resume handshake after the PVM has been resumed.
+ *
+ * Waits for the slaver's "resume" reply.  Then:
+ *   - while first_time is set, runs the one-time setup callback
+ *     (install_fw_network);
+ *   - otherwise, issues COMP_IOCTRESUME to the compare device.  A failed
+ *     ioctl is reported on stderr but stays non-fatal; it used to be
+ *     silently ignored.
+ *
+ * Returns 0 on success, or a negative value if waiting for the slaver or
+ * the setup callback fails.
+ */
+static int colo_postresume(CheckpointObject *self)
+{
+    int rc;
+
+    rc = wait_slaver_resume(self);
+    if (rc < 0)
+        return rc;
+
+    if (self->first_time) {
+        rc = install_fw_network(self);
+        if (rc < 0) {
+            fprintf(stderr, "install network fails\n");
+            return rc;
+        }
+    } else if (ioctl(self->dev_fd, COMP_IOCTRESUME) < 0) {
+        fprintf(stderr, "ioctl(COMP_IOCTRESUME) fails, errno: %d\n", errno);
+    }
+
+    return 0;
+}
+
+/*
+ * Open the compare device /dev/HA_compare and store the descriptor in
+ * self->dev_fd.  Only done while first_time is set; after that the stored
+ * descriptor is reused.
+ *
+ * Returns 0 on success, -1 if open() fails (self->dev_fd then holds -1).
+ */
+static int pre_checkpoint(CheckpointObject *self)
+{
+    xc_interface *xch = self->cps.xch;
+    int fd;
+
+    if (self->first_time == 0)
+        return 0;
+
+    fd = open("/dev/HA_compare", O_RDWR);
+    self->dev_fd = fd;
+    if (fd >= 0)
+        return 0;
+
+    PERROR("opening /dev/HA_compare fails");
+    return -1;
+}
+
+/*
+ * Block until the compare device asks for a new checkpoint, i.e. until
+ * COMP_IOCTWAIT returns 0.
+ *
+ * Interrupted or timed-out waits (EINTR, ERESTART, ETIME) and non-zero
+ * non-error returns are retried silently.  Any other error is reported and
+ * ends the wait.  Previously the loop spun forever on a persistent error
+ * (e.g. EBADF), printing on every iteration.
+ *
+ * Returns 0 when a checkpoint is requested, -1 on an unrecoverable error.
+ * Callers that ignore the result still compile and proceed to checkpoint.
+ */
+static int wait_new_checkpoint(CheckpointObject *self)
+{
+    int dev_fd = self->dev_fd;
+    int err;
+
+    for (;;) {
+        err = ioctl(dev_fd, COMP_IOCTWAIT);
+        if (err == 0)
+            return 0;
+
+        if (err == -1 && errno != EINTR && errno != ERESTART &&
+            errno != ETIME) {
+            fprintf(stderr, "ioctl() returns -1, errno: %d\n", errno);
+            return -1;
+        }
+    }
+}
+
/* private functions */
/* bounce C suspend call into python equivalent.
@@ -289,6 +498,13 @@ static int suspend_trampoline(void* data)
PyObject* result;
+ if (self->colo) {
+ if (notify_slaver_suspend(self) < 0) {
+ fprintf(stderr, "nofitying slaver suspend fails\n");
+ return 0;
+ }
+ }
+
/* call default suspend function, then python hook if available */
if (self->armed) {
if (checkpoint_wait(&self->cps) < 0) {
@@ -307,8 +523,16 @@ static int suspend_trampoline(void* data)
}
}
+ /* suspend_cb() should be called after both sides are suspended */
+ if (self->colo) {
+ if (wait_slaver_suspend(self) < 0) {
+ fprintf(stderr, "waiting slaver suspend fails\n");
+ return 0;
+ }
+ }
+
if (!self->suspend_cb)
- return 1;
+ goto start_checkpoint;
PyEval_RestoreThread(self->threadstate);
result = PyObject_CallFunction(self->suspend_cb, NULL);
@@ -319,12 +543,32 @@ static int suspend_trampoline(void* data)
if (result == Py_None || PyObject_IsTrue(result)) {
Py_DECREF(result);
- return 1;
+ goto start_checkpoint;
}
Py_DECREF(result);
return 0;
+
+start_checkpoint:
+ if (self->colo) {
+ if (notify_slaver_start_checkpoint(self) < 0) {
+ fprintf(stderr, "nofitying slaver to start checkpoint fails\n");
+ return 0;
+ }
+
+ /* PVM is suspended first when doing live migration,
+ * and then it is suspended for a new checkpoint.
+ */
+ if (self->first_time == 1)
+ /* live migration */
+ self->first_time = 2;
+ else if (self->first_time == 2)
+ /* the first checkpoint */
+ self->first_time = 0;
+ }
+
+ return 1;
}
static int postcopy_trampoline(void* data)
@@ -334,6 +578,13 @@ static int postcopy_trampoline(void* data)
PyObject* result;
int rc = 0;
+ if (self->colo) {
+ if (notify_slaver_resume(self) < 0) {
+ fprintf(stderr, "nofitying slaver resume fails\n");
+ return 0;
+ }
+ }
+
if (!self->postcopy_cb)
goto resume;
@@ -352,6 +603,13 @@ static int postcopy_trampoline(void* data)
return 0;
}
+ if (self->colo) {
+ if (colo_postresume(self) < 0) {
+ fprintf(stderr, "postresume fails\n");
+ return 0;
+ }
+ }
+
return rc;
}
@@ -366,8 +624,15 @@ static int checkpoint_trampoline(void* data)
return -1;
}
+ if (self->colo) {
+ if (pre_checkpoint(self) < 0) {
+ fprintf(stderr, "pre_checkpoint() fails\n");
+ return -1;
+ }
+ }
+
if (!self->checkpoint_cb)
- return 0;
+ goto wait_checkpoint;
PyEval_RestoreThread(self->threadstate);
result = PyObject_CallFunction(self->checkpoint_cb, NULL);
@@ -378,10 +643,39 @@ static int checkpoint_trampoline(void* data)
if (result == Py_None || PyObject_IsTrue(result)) {
Py_DECREF(result);
- return 1;
+ goto wait_checkpoint;
}
Py_DECREF(result);
return 0;
+
+wait_checkpoint:
+ if (self->colo) {
+ wait_new_checkpoint(self);
+ }
+
+ fprintf(stderr, "\n\nnew checkpoint..........\n");
+
+ return 1;
+}
+
+/*
+ * post_sendstate callback, run before the zero terminator is written.
+ *
+ * In colo mode, writes XC_SAVE_ID_LAST_CHECKPOINT (as an int) to the
+ * checkpoint stream so that the slaver runs the guest.  Does nothing
+ * outside colo mode.
+ *
+ * Returns 0 on success, -1 if the write fails.
+ */
+static int post_sendstate_trampoline(void* data)
+{
+    CheckpointObject *self = data;
+    int marker = XC_SAVE_ID_LAST_CHECKPOINT;
+
+    if (self->colo) {
+        /* In colo mode, guest is running on slaver side, so we should
+         * send XC_SAVE_ID_LAST_CHECKPOINT to slaver.
+         */
+        if (write_exact(self->cps.fd, &marker, sizeof(marker)) < 0) {
+            fprintf(stderr, "writing XC_SAVE_ID_LAST_CHECKPOINT fails\n");
+            return -1;
+        }
+    }
+
+    return 0;
+}
diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.h
b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
index 187d9d7..96fc949 100644
--- a/tools/python/xen/lowlevel/checkpoint/checkpoint.h
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
@@ -41,6 +41,7 @@ typedef struct {
} checkpoint_state;
#define CHECKPOINT_FLAGS_COMPRESSION 1
+#define CHECKPOINT_FLAGS_COLO 2
char* checkpoint_error(checkpoint_state* s);
void checkpoint_init(checkpoint_state* s);
--
1.7.4
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel
|
![]() |
Lists.xenproject.org is hosted with RackSpace, monitoring our |