# HG changeset patch
# User Brendan Cully <brendan@xxxxxxxxx>
# Date 1258073720 28800
# Node ID 213fb814acf431d2a382e8f9c09b4cea106c0958
# Parent accded2f185f4178f875b170a5c01544648a68d2
Remus: add python control extensions
Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>
diff --git a/tools/python/setup.py b/tools/python/setup.py
--- a/tools/python/setup.py
+++ b/tools/python/setup.py
@@ -67,10 +67,28 @@
libraries = libraries,
sources = [ "ptsname/ptsname.c" ])
+checkpoint = Extension("checkpoint",
+               extra_compile_args = extra_compile_args,
+               include_dirs       = include_dirs,
+               library_dirs       = library_dirs,
+               libraries          = libraries + [ "rt" ],
+               sources            = [ "xen/lowlevel/checkpoint/checkpoint.c",
+                                      "xen/lowlevel/checkpoint/libcheckpoint.c"])
+
+netlink = Extension("netlink",
+               extra_compile_args = extra_compile_args,
+               include_dirs       = include_dirs,
+               library_dirs       = library_dirs,
+               libraries          = libraries,
+               sources            = [ "xen/lowlevel/netlink/netlink.c",
+                                      "xen/lowlevel/netlink/libnetlink.c"])
+
modules = [ xc, xs, ptsname, acm, flask ]
-if os.uname()[0] == 'SunOS':
- modules.append(scf)
- modules.append(process)
+plat = os.uname()[0]
+if plat == 'SunOS':
+    modules.extend([ scf, process ])
+if plat == 'Linux':  # the Remus checkpoint/netlink extensions are Linux-only
+    modules.extend([ checkpoint, netlink ])
setup(name = 'xen',
version = '3.0',
@@ -89,6 +107,7 @@
'xen.web',
'xen.sv',
'xen.xsview',
+ 'xen.remus',
'xen.xend.tests',
'xen.xend.server.tests',
diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.c b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
@@ -0,0 +1,363 @@
+/* python bridge to checkpointing API */
+
+#include <Python.h>
+
+#include <xs.h>
+#include <xenctrl.h>
+
+#include "checkpoint.h"
+
+/* Python package path of this module; prefixes the type and exception names */
+#define PKG "xen.lowlevel.checkpoint"
+
+/* exception class raised when a libcheckpoint call fails (created in
+ * initcheckpoint) */
+static PyObject* CheckpointError;
+
+/* Python-visible checkpointer: libcheckpoint session state plus the optional
+ * Python hooks invoked from the xc_domain_save callbacks */
+typedef struct {
+ PyObject_HEAD
+ checkpoint_state cps;
+
+ /* milliseconds between checkpoints */
+ unsigned int interval;
+ /* nonzero once the periodic suspend timer has been started by the first
+  * suspend_trampoline call */
+ int armed;
+
+ /* optional Python callables; NULL when not supplied (or given as None) */
+ PyObject* suspend_cb;
+ PyObject* postcopy_cb;
+ PyObject* checkpoint_cb;
+
+ /* thread state saved while the GIL is released around checkpoint_start;
+  * the trampolines restore it to call back into Python */
+ PyThreadState* threadstate;
+} CheckpointObject;
+
+/* C callbacks handed to checkpoint_start through struct save_callbacks */
+static int suspend_trampoline(void* data);
+static int postcopy_trampoline(void* data);
+static int checkpoint_trampoline(void* data);
+
+/* tp_new: allocate a checkpointer and put every field into a known state.
+ * Returns a new reference, or NULL if allocation fails. */
+static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args,
+                                PyObject* kwargs)
+{
+  CheckpointObject* self = (CheckpointObject*)type->tp_alloc(type, 0);
+
+  if (!self)
+    return NULL;
+
+  checkpoint_init(&self->cps);
+  self->interval = 0;
+  self->armed = 0;
+  /* hooks are only installed by start(); initialise them explicitly rather
+   * than relying on tp_alloc having zeroed the object */
+  self->suspend_cb = NULL;
+  self->postcopy_cb = NULL;
+  self->checkpoint_cb = NULL;
+  self->threadstate = NULL;
+
+  return (PyObject*)self;
+}
+
+/* tp_init: all setup happens in tp_new and open(), so constructor arguments
+ * are accepted and ignored. Always succeeds. */
+static int Checkpoint_init(PyObject* obj, PyObject* args, PyObject* kwargs)
+{
+  (void)obj;
+  (void)args;
+  (void)kwargs;
+
+  return 0;
+}
+
+/* tp_dealloc: close the libxc/xenstore session, drop any Python hooks still
+ * held from start(), then free the object */
+static void Checkpoint_dealloc(CheckpointObject* self)
+{
+  checkpoint_close(&self->cps);
+
+  /* start() keeps references to its hooks; release them so they are not
+   * leaked when the object is collected without an explicit close() */
+  Py_CLEAR(self->suspend_cb);
+  Py_CLEAR(self->postcopy_cb);
+  Py_CLEAR(self->checkpoint_cb);
+
+  self->ob_type->tp_free((PyObject*)self);
+}
+
+/* open(domid): attach this checkpointer to guest domid.
+ * Returns None; raises xen.lowlevel.checkpoint.error if the session cannot
+ * be opened. */
+static PyObject* pycheckpoint_open(PyObject* obj, PyObject* args)
+{
+  CheckpointObject* self = (CheckpointObject*)obj;
+  checkpoint_state* cps = &self->cps;
+  unsigned int domid;
+
+  if (!PyArg_ParseTuple(args, "I", &domid))
+    return NULL;
+
+  if (checkpoint_open(cps, domid) < 0) {
+    const char* msg = checkpoint_error(cps);
+
+    /* not every failure path in libcheckpoint records a message, and
+     * PyErr_SetString must never be handed NULL */
+    PyErr_SetString(CheckpointError,
+                    msg ? msg : "could not open checkpoint session");
+    return NULL;
+  }
+
+  Py_RETURN_NONE;
+}
+
+/* close(): end the libcheckpoint session and drop every Python hook that
+ * start() installed. Always returns None. */
+static PyObject* pycheckpoint_close(PyObject* obj, PyObject* args)
+{
+  CheckpointObject* self = (CheckpointObject*)obj;
+  PyObject** hooks[3];
+  int i;
+
+  checkpoint_close(&self->cps);
+
+  hooks[0] = &self->suspend_cb;
+  hooks[1] = &self->postcopy_cb;
+  hooks[2] = &self->checkpoint_cb;
+
+  for (i = 0; i < 3; i++) {
+    Py_XDECREF(*hooks[i]);
+    *hooks[i] = NULL;
+  }
+
+  Py_RETURN_NONE;
+}
+
+/* store cb (may be NULL) in *slot, taking a new reference to it; the previous
+ * occupant is released only after the slot already points at cb */
+static void set_callback(PyObject** slot, PyObject* cb)
+{
+  PyObject* old = *slot;
+
+  Py_XINCREF(cb);
+  *slot = cb;
+  Py_XDECREF(old);
+}
+
+/* normalise an optional hook argument: None becomes NULL.
+ * Returns 0 if *cb is NULL or callable, otherwise -1 with a TypeError
+ * carrying errmsg set. */
+static int check_callback(PyObject** cb, const char* errmsg)
+{
+  if (*cb == Py_None)
+    *cb = NULL;
+
+  if (*cb && !PyCallable_Check(*cb)) {
+    PyErr_SetString(PyExc_TypeError, errmsg);
+    return -1;
+  }
+
+  return 0;
+}
+
+/* start(iofile[, suspend_cb[, postcopy_cb[, checkpoint_cb[, interval]]]])
+ * Checkpoints the opened domain to iofile with the GIL released. The
+ * optional hooks are called from the corresponding save callbacks; a
+ * non-zero interval (ms) enables the periodic suspend timer.
+ * Returns None; raises TypeError for a bad file or hook, and
+ * xen.lowlevel.checkpoint.error if the checkpoint fails. */
+static PyObject* pycheckpoint_start(PyObject* obj, PyObject* args) {
+  CheckpointObject* self = (CheckpointObject*)obj;
+
+  PyObject* iofile;
+  PyObject* suspend_cb = NULL;
+  PyObject* postcopy_cb = NULL;
+  PyObject* checkpoint_cb = NULL;
+  unsigned int interval = 0;
+
+  int fd;
+  struct save_callbacks callbacks;
+  int rc;
+
+  if (!PyArg_ParseTuple(args, "O|OOOI", &iofile, &suspend_cb, &postcopy_cb,
+                        &checkpoint_cb, &interval))
+    return NULL;
+
+  self->interval = interval;
+
+  /* the argument tuple keeps iofile alive, no extra reference is needed */
+  fd = PyObject_AsFileDescriptor(iofile);
+  if (fd < 0) {
+    PyErr_SetString(PyExc_TypeError, "invalid file handle");
+    return NULL;
+  }
+
+  /* validate every hook before touching self, so a bad argument neither
+   * leaks references nor disturbs hooks installed by an earlier start() */
+  if (check_callback(&suspend_cb, "suspend callback not callable") < 0
+      || check_callback(&postcopy_cb, "postcopy callback not callable") < 0
+      || check_callback(&checkpoint_cb,
+                        "checkpoint callback not callable") < 0)
+    return NULL;
+
+  set_callback(&self->suspend_cb, suspend_cb);
+  set_callback(&self->postcopy_cb, postcopy_cb);
+  set_callback(&self->checkpoint_cb, checkpoint_cb);
+
+  callbacks.suspend = suspend_trampoline;
+  callbacks.postcopy = postcopy_trampoline;
+  callbacks.checkpoint = checkpoint_trampoline;
+  callbacks.data = self;
+
+  /* release the GIL for the whole save; the trampolines re-acquire it
+   * through self->threadstate when they call into Python */
+  self->threadstate = PyEval_SaveThread();
+  rc = checkpoint_start(&self->cps, fd, &callbacks);
+  PyEval_RestoreThread(self->threadstate);
+
+  if (rc < 0) {
+    const char* msg = checkpoint_error(&self->cps);
+
+    /* drop the hooks before raising, so no Python code triggered by the
+     * releases runs with the exception already set */
+    set_callback(&self->suspend_cb, NULL);
+    set_callback(&self->postcopy_cb, NULL);
+    set_callback(&self->checkpoint_cb, NULL);
+
+    PyErr_SetString(CheckpointError, msg ? msg : "checkpoint failed");
+    return NULL;
+  }
+
+  Py_RETURN_NONE;
+}
+
+/* Methods of the checkpointer type. Every implementation has the
+ * two-argument PyCFunction signature and takes positional arguments only,
+ * so none of them may be flagged METH_KEYWORDS. */
+static PyMethodDef Checkpoint_methods[] = {
+  { "open", pycheckpoint_open, METH_VARARGS,
+    "open connection to xen" },
+  { "close", pycheckpoint_close, METH_NOARGS,
+    "close connection to xen" },
+  { "start", pycheckpoint_start, METH_VARARGS,
+    "begin a checkpoint" },
+  { NULL, NULL, 0, NULL }
+};
+
+/* Type object for xen.lowlevel.checkpoint.checkpointer (Python 2 layout).
+ * Instances are created by Checkpoint_new and torn down by
+ * Checkpoint_dealloc. */
+static PyTypeObject CheckpointType = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /* ob_size */
+ PKG ".checkpointer", /* tp_name */
+ sizeof(CheckpointObject), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor)Checkpoint_dealloc, /* tp_dealloc */
+ NULL, /* tp_print */
+ NULL, /* tp_getattr */
+ NULL, /* tp_setattr */
+ NULL, /* tp_compare */
+ NULL, /* tp_repr */
+ NULL, /* tp_as_number */
+ NULL, /* tp_as_sequence */
+ NULL, /* tp_as_mapping */
+ NULL, /* tp_hash */
+ NULL, /* tp_call */
+ NULL, /* tp_str */
+ NULL, /* tp_getattro */
+ NULL, /* tp_setattro */
+ NULL, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT, /* tp_flags: not GC-tracked, so cycles through the
+                        stored Python hooks are not collected */
+ "Checkpoint object", /* tp_doc */
+ NULL, /* tp_traverse */
+ NULL, /* tp_clear */
+ NULL, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ NULL, /* tp_iter */
+ NULL, /* tp_iternext */
+ Checkpoint_methods, /* tp_methods */
+ NULL, /* tp_members */
+ NULL, /* tp_getset */
+ NULL, /* tp_base */
+ NULL, /* tp_dict */
+ NULL, /* tp_descr_get */
+ NULL, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ (initproc)Checkpoint_init, /* tp_init */
+ NULL, /* tp_alloc: presumably inherited when PyType_Ready runs in
+          initcheckpoint; Checkpoint_new calls type->tp_alloc */
+ Checkpoint_new, /* tp_new */
+};
+
+/* module-level functions: none, everything lives on the checkpointer type */
+static PyMethodDef methods[] = {
+  { NULL }
+};
+
+static char doc[] = "checkpoint API";
+
+/* Module initialisation (Python 2): registers the checkpointer type and the
+ * error exception, then blocks the suspend timer's signal (SIGRTMIN) in the
+ * importing thread via block_timer. On failure returns early, leaving the
+ * Python error set so the import fails. */
+PyMODINIT_FUNC initcheckpoint(void) {
+  PyObject *m;
+
+  if (PyType_Ready(&CheckpointType) < 0)
+    return;
+
+  m = Py_InitModule3(PKG, methods, doc);
+  if (!m)
+    return;
+
+  /* PyModule_AddObject steals a reference; the type object is static */
+  Py_INCREF(&CheckpointType);
+  PyModule_AddObject(m, "checkpointer", (PyObject*)&CheckpointType);
+
+  CheckpointError = PyErr_NewException(PKG ".error", NULL, NULL);
+  if (!CheckpointError)
+    return;
+  /* keep our own reference for the static pointer; the module gets the
+   * one stolen by PyModule_AddObject */
+  Py_INCREF(CheckpointError);
+  PyModule_AddObject(m, "error", CheckpointError);
+
+  block_timer();
+}
+
+/* private functions */
+
+/* bounce C suspend call into python equivalent.
+ * Suspends the guest directly or, once the interval timer is armed, waits
+ * for the suspend thread to have done so. Then runs the optional Python
+ * suspend hook with the GIL re-acquired.
+ * returns 1 on success or 0 on failure */
+static int suspend_trampoline(void* data)
+{
+  CheckpointObject* self = (CheckpointObject*)data;
+
+  PyObject* result;
+  int ok;
+
+  /* call default suspend function, then python hook if available */
+  if (self->armed) {
+    /* the timer-driven suspend thread performs the suspend */
+    if (checkpoint_wait(&self->cps) < 0) {
+      fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+      return 0;
+    }
+  } else {
+    if (self->interval) {
+      self->armed = 1;
+      checkpoint_settimer(&self->cps, self->interval);
+    }
+
+    if (!checkpoint_suspend(&self->cps)) {
+      fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+      return 0;
+    }
+  }
+
+  if (!self->suspend_cb)
+    return 1;
+
+  /* hold the GIL for the call AND for everything done with its result:
+   * PyObject_IsTrue may run Python code and Py_DECREF may free objects */
+  PyEval_RestoreThread(self->threadstate);
+  result = PyObject_CallFunction(self->suspend_cb, NULL);
+  /* None counts as success; PyObject_IsTrue returns -1 on error */
+  ok = result && (result == Py_None || PyObject_IsTrue(result) > 0);
+  Py_XDECREF(result);
+  self->threadstate = PyEval_SaveThread();
+
+  return ok;
+}
+
+/* bounce the C postcopy call into the optional Python hook (with the GIL
+ * re-acquired), then resume the guest whether or not the hook succeeded.
+ * Returns 1 if the hook returned None or a true value, 0 if there is no
+ * hook, the hook raised or returned a false value, or the resume failed. */
+static int postcopy_trampoline(void* data)
+{
+  CheckpointObject* self = (CheckpointObject*)data;
+
+  PyObject* result;
+  int rc = 0;
+
+  if (self->postcopy_cb) {
+    PyEval_RestoreThread(self->threadstate);
+    result = PyObject_CallFunction(self->postcopy_cb, NULL);
+    /* PyObject_IsTrue returns -1 on error, which must not count as success */
+    if (result && (result == Py_None || PyObject_IsTrue(result) > 0))
+      rc = 1;
+    Py_XDECREF(result);
+    self->threadstate = PyEval_SaveThread();
+  }
+
+  if (checkpoint_resume(&self->cps) < 0) {
+    fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+    return 0;
+  }
+
+  return rc;
+}
+
+/* Called from the save loop after a checkpoint has been written. Runs
+ * checkpoint_postflush (device-model state for HVM), then the optional
+ * Python checkpoint hook with the GIL re-acquired.
+ * Returns -1 if the flush failed. Otherwise returns 1 when the hook
+ * returned None or a true value, and 0 when there is no hook or it raised
+ * or returned a false value. xc_domain_save interprets this value;
+ * presumably >0 requests another checkpoint -- see struct save_callbacks. */
+static int checkpoint_trampoline(void* data)
+{
+  CheckpointObject* self = (CheckpointObject*)data;
+
+  PyObject* result;
+  int ok;
+
+  if (checkpoint_postflush(&self->cps) < 0) {
+    fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+    return -1;
+  }
+
+  if (!self->checkpoint_cb)
+    return 0;
+
+  /* the result must be inspected and released while the GIL is held */
+  PyEval_RestoreThread(self->threadstate);
+  result = PyObject_CallFunction(self->checkpoint_cb, NULL);
+  /* PyObject_IsTrue returns -1 on error, which must not count as success */
+  ok = result && (result == Py_None || PyObject_IsTrue(result) > 0);
+  Py_XDECREF(result);
+  self->threadstate = PyEval_SaveThread();
+
+  return ok;
+}
diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.h b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
@@ -0,0 +1,59 @@
+/* API for checkpointing */
+
+#ifndef _CHECKPOINT_H_
+#define _CHECKPOINT_H_ 1
+
+#include <pthread.h>
+#include <semaphore.h>
+#include <time.h>
+
+#include <xenguest.h>
+#include <xs.h>
+
+/* guest type, detected by checkpoint_open from the domain info and the HVM
+ * callback IRQ parameter */
+typedef enum {
+ dt_unknown,
+ dt_pv,
+ dt_hvm,
+ dt_pvhvm /* HVM with PV drivers */
+} checkpoint_domtype;
+
+/* per-session state; initialise with checkpoint_init before any other use */
+typedef struct {
+ int xch; /* xc handle, -1 when closed */
+ int xce; /* event channel handle, -1 when closed */
+ struct xs_handle* xsh; /* xenstore handle, NULL when closed */
+ int watching_shutdown; /* @releaseDomain watch: 0 none, 1 registered
+                           (initial fire not yet consumed), 2 active */
+
+ unsigned int domid; /* guest domain; 0 means "not opened" */
+ checkpoint_domtype domtype;
+ int fd; /* stream passed to checkpoint_start, -1 if none */
+
+ int suspend_evtchn; /* local port of the PV suspend event channel */
+
+ char* errstr; /* last error message, see checkpoint_error */
+
+ /* suspend deadline thread support.
+  * NOTE(review): volatile does not order memory accesses between threads;
+  * correctness appears to rely on the semaphores below -- confirm */
+ volatile int suspended; /* set by the suspend thread, cleared on resume */
+ volatile int done; /* asks the suspend thread to exit */
+ pthread_t suspend_thr; /* 0 when no suspend thread exists */
+ sem_t suspended_sem; /* posted by the suspend thread after each attempt */
+ sem_t resumed_sem; /* posted by checkpoint_resume */
+ timer_t timer; /* deadline timer delivering SIGRTMIN, 0 when none */
+} checkpoint_state;
+
+/* message for the most recent error, or NULL if none; do not free */
+char* checkpoint_error(checkpoint_state* s);
+
+/* lifecycle: init once; open attaches to a domain (0 on success, -1 on
+ * failure); close releases everything open allocated */
+void checkpoint_init(checkpoint_state* s);
+int checkpoint_open(checkpoint_state* s, unsigned int domid);
+void checkpoint_close(checkpoint_state* s);
+/* run xc_domain_save to fd with the given callbacks; -1 if the state is not
+ * opened or qemu logdirty cannot be enabled, otherwise the save result */
+int checkpoint_start(checkpoint_state* s, int fd,
+ struct save_callbacks* callbacks);
+/* suspend the guest: returns 1 on success, 0 on failure */
+int checkpoint_suspend(checkpoint_state* s);
+/* resume the guest: 0 on success, -1 on failure */
+int checkpoint_resume(checkpoint_state* s);
+/* call after xc_domain_save has flushed its buffer (sends qemu state for
+ * HVM guests): 0 on success, -1 on failure */
+int checkpoint_postflush(checkpoint_state* s);
+
+/* arm the periodic suspend deadline, creating the timer and suspend thread
+ * on first use: 0 on success, -1 on failure */
+int checkpoint_settimer(checkpoint_state* s, int millis);
+/* wait for the suspend thread to suspend the guest: 0 on success, -1 */
+int checkpoint_wait(checkpoint_state* s);
+/* block / unblock the timer signal (SIGRTMIN) in the calling thread */
+void block_timer(void);
+void unblock_timer(void);
+
+#endif
diff --git a/tools/python/xen/lowlevel/checkpoint/libcheckpoint.c b/tools/python/xen/lowlevel/checkpoint/libcheckpoint.c
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/checkpoint/libcheckpoint.c
@@ -0,0 +1,782 @@
+/* API for checkpointing */
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+#include <xenctrl.h>
+#include <xenguest.h>
+#include <xs.h>
+
+#include "checkpoint.h"
+
+/* Shared buffer for formatted error messages. s->errstr points here after
+ * such errors, so the next formatted error overwrites the text.
+ * NOTE(review): checkpoint_suspend also runs on the suspend thread, so the
+ * main and suspend threads may both write this buffer -- confirm that is
+ * acceptable. */
+static char errbuf[256];
+
+/* xen event channel and xenstore watch helpers */
+static int setup_suspend_evtchn(checkpoint_state* s);
+static void release_suspend_evtchn(checkpoint_state *s);
+static int setup_shutdown_watch(checkpoint_state* s);
+static int check_shutdown_watch(checkpoint_state* s);
+static void release_shutdown_watch(checkpoint_state* s);
+static int poll_evtchn(checkpoint_state* s);
+
+/* HVM device-model (qemu) helpers */
+static int switch_qemu_logdirty(checkpoint_state* s, int enable);
+static int suspend_hvm(checkpoint_state* s);
+static int suspend_qemu(checkpoint_state* s);
+static int resume_qemu(checkpoint_state* s);
+static int send_qemu(checkpoint_state* s);
+
+/* suspend deadline timer and thread */
+static int create_suspend_timer(checkpoint_state* s);
+static int delete_suspend_timer(checkpoint_state* s);
+static int create_suspend_thread(checkpoint_state* s);
+static void stop_suspend_thread(checkpoint_state* s);
+
+/* Report the message recorded by the most recent failing checkpoint call,
+ * or NULL if none has been recorded. The text is a string literal or lives
+ * in a shared static buffer -- never free it. */
+char* checkpoint_error(checkpoint_state* s)
+{
+  char* message = s->errstr;
+
+  return message;
+}
+
+/* Put a checkpoint_state into the closed state: no handles, no domain, no
+ * timer and no suspend thread. Must be called before any other function. */
+void checkpoint_init(checkpoint_state* s)
+{
+  /* handles and descriptors */
+  s->xsh = NULL;
+  s->xch = -1;
+  s->xce = -1;
+  s->fd = -1;
+  s->suspend_evtchn = -1;
+
+  /* session description */
+  s->domid = 0;
+  s->domtype = dt_unknown;
+  s->watching_shutdown = 0;
+  s->errstr = NULL;
+
+  /* suspend deadline machinery */
+  s->timer = 0;
+  s->suspend_thr = 0;
+  s->done = 0;
+  s->suspended = 0;
+}
+
+/* open a checkpoint session to guest domid.
+ * Opens the libxc, xenstore and event channel handles, determines the guest
+ * type, registers the @releaseDomain watch and, for PV guests, binds the
+ * suspend event channel. PV-on-HVM guests are rejected.
+ * Returns 0 on success. On failure returns -1 with every handle closed again
+ * and s->errstr set where the failing step provides a message. */
+int checkpoint_open(checkpoint_state* s, unsigned int domid)
+{
+  xc_dominfo_t dominfo;
+  unsigned long pvirq;
+
+  s->domid = domid;
+
+  s->xch = xc_interface_open();
+  if (s->xch < 0) {
+    s->errstr = "could not open control interface (are you root?)";
+    return -1;
+  }
+
+  s->xsh = xs_daemon_open();
+  if (!s->xsh) {
+    checkpoint_close(s);
+    s->errstr = "could not open xenstore handle";
+    return -1;
+  }
+
+  s->xce = xc_evtchn_open();
+  if (s->xce < 0) {
+    checkpoint_close(s);
+    s->errstr = "could not open event channel handle";
+    return -1;
+  }
+
+  /* xc_domain_getinfo enumerates domains starting at the given id, so it
+   * can return a different (higher) domain if domid does not exist */
+  if (xc_domain_getinfo(s->xch, s->domid, 1, &dominfo) != 1 ||
+      dominfo.domid != s->domid) {
+    checkpoint_close(s);
+    s->errstr = "could not get domain info";
+    return -1;
+  }
+
+  if (dominfo.hvm) {
+    if (xc_get_hvm_param(s->xch, s->domid, HVM_PARAM_CALLBACK_IRQ, &pvirq)) {
+      checkpoint_close(s);
+      s->errstr = "could not get HVM callback IRQ";
+      return -1;
+    }
+    /* a callback IRQ means the HVM guest runs PV drivers */
+    s->domtype = pvirq ? dt_pvhvm : dt_hvm;
+  } else
+    s->domtype = dt_pv;
+
+  if (setup_shutdown_watch(s) < 0) {
+    checkpoint_close(s);
+    return -1;
+  }
+
+  if (s->domtype == dt_pv) {
+    if (setup_suspend_evtchn(s) < 0) {
+      checkpoint_close(s);
+      return -1;
+    }
+  } else if (s->domtype == dt_pvhvm) {
+    checkpoint_close(s);
+    s->errstr = "PV-on-HVM is unsupported";
+    return -1;
+  }
+
+  return 0;
+}
+
+/* Tear down a checkpoint session: stop the suspend timer and thread, remove
+ * the shutdown watch and suspend event channel, and close the libxc, event
+ * channel and xenstore handles. Every step is guarded, so this may be
+ * called on a partially opened or already closed state. */
+void checkpoint_close(checkpoint_state* s)
+{
+  if (s->timer)
+    delete_suspend_timer(s);
+  if (s->suspend_thr)
+    stop_suspend_thread(s);
+
+  /* these use the xenstore / event channel handles, so go first */
+  release_shutdown_watch(s);
+  release_suspend_evtchn(s);
+
+  if (s->xsh) {
+    xs_daemon_close(s->xsh);
+    s->xsh = NULL;
+  }
+  if (s->xce >= 0) {
+    xc_evtchn_close(s->xce);
+    s->xce = -1;
+  }
+  if (s->xch >= 0) {
+    xc_interface_close(s->xch);
+    s->xch = -1;
+  }
+
+  s->suspend_evtchn = -1;
+  s->fd = -1;
+  s->domid = 0;
+}
+
+/* Logdirty hook handed to xc_domain_save that deliberately does nothing:
+ * checkpoint_start toggles qemu's logdirty mode itself around the save,
+ * because that needs the checkpoint_state this hook does not receive. */
+static void noop_switch_logdirty(int domid, unsigned enable)
+{
+  (void)domid;
+  (void)enable;
+}
+
+/* Run xc_domain_save for the opened domain, writing to fd and driving the
+ * given suspend/postcopy/checkpoint callbacks. For HVM guests, qemu's
+ * logdirty mode is enabled for the duration of the save.
+ * Returns -1 if the state is not opened or logdirty cannot be enabled,
+ * otherwise the xc_domain_save result. Whenever a failure is returned,
+ * s->errstr is non-NULL. */
+int checkpoint_start(checkpoint_state* s, int fd,
+                     struct save_callbacks* callbacks)
+{
+  int hvm, rc;
+  int flags = XCFLAGS_LIVE;
+
+  if (!s->domid) {
+    s->errstr = "checkpoint state not opened";
+    return -1;
+  }
+
+  s->fd = fd;
+  /* forget stale messages so a failure below never reports an old error */
+  s->errstr = NULL;
+
+  hvm = s->domtype > dt_pv;
+  if (hvm) {
+    flags |= XCFLAGS_HVM;
+    if ((rc = switch_qemu_logdirty(s, 1)))
+      return rc;
+  }
+
+  rc = xc_domain_save(s->xch, fd, s->domid, 0, 0, flags, callbacks, hvm,
+                      noop_switch_logdirty);
+  /* NOTE(review): some libxc versions report failure here as a positive
+   * value rather than < 0 -- confirm what callers should test */
+  if (rc && !s->errstr)
+    s->errstr = "xc_domain_save failed";
+
+  if (hvm) {
+    /* keep the save error rather than a later logdirty-disable message */
+    char* save_err = s->errstr;
+
+    switch_qemu_logdirty(s, 0);
+    if (rc)
+      s->errstr = save_err;
+  }
+
+  return rc;
+}
+
+/* Suspend the domain. Returns 0 on failure (with s->errstr set), 1 on
+ * success. HVM guests go through suspend_hvm. PV guests are notified on the
+ * suspend event channel, and we then poll until the guest signals back on
+ * that same channel. */
+int checkpoint_suspend(checkpoint_state* s)
+{
+  struct timeval tv;
+  int rc;
+
+  gettimeofday(&tv, NULL);
+  fprintf(stderr, "PROF: suspending at %lu.%06lu\n", (unsigned long)tv.tv_sec,
+          (unsigned long)tv.tv_usec);
+
+  if (s->domtype == dt_hvm) {
+    return suspend_hvm(s) < 0 ? 0 : 1;
+  }
+
+  rc = xc_evtchn_notify(s->xce, s->suspend_evtchn);
+  if (rc < 0) {
+    snprintf(errbuf, sizeof(errbuf),
+             "failed to notify suspend event channel: %d", rc);
+    s->errstr = errbuf;
+    return 0;
+  }
+
+  /* poll_evtchn yields a pending port, 0 when a xenstore watch event was
+   * consumed without ending the wait, or -1 on error */
+  do {
+    rc = poll_evtchn(s);
+  } while (rc >= 0 && rc != s->suspend_evtchn);
+  if (rc <= 0) {
+    snprintf(errbuf, sizeof(errbuf),
+             "failed to receive suspend notification: %d", rc);
+    s->errstr = errbuf;
+    return 0;
+  }
+
+  if (xc_evtchn_unmask(s->xce, s->suspend_evtchn) < 0) {
+    /* rc holds the port number at this point, so report errno instead */
+    snprintf(errbuf, sizeof(errbuf),
+             "failed to unmask suspend notification channel: %d", errno);
+    s->errstr = errbuf;
+    return 0;
+  }
+
+  return 1;
+}
+
+/* Wait for the suspend thread to report a suspend attempt. Blocks on
+ * suspended_sem (retrying when interrupted), then checks that the domain
+ * really is suspended. Returns 0 on success, -1 with s->errstr set. */
+int checkpoint_wait(checkpoint_state* s)
+{
+  int rc;
+
+  if (!s->suspend_thr) {
+    s->errstr = "checkpoint timer is not active\n";
+    return -1;
+  }
+
+  for (;;) {
+    rc = sem_wait(&s->suspended_sem);
+    if (rc == 0)
+      break;
+    if (errno == EINTR)
+      continue;
+
+    snprintf(errbuf, sizeof(errbuf),
+             "error waiting for suspend semaphore: %d %d\n", rc, errno);
+    s->errstr = errbuf;
+    return -1;
+  }
+
+  if (s->suspended)
+    return 0;
+
+  snprintf(errbuf, sizeof(errbuf), "domain not suspended?\n");
+  s->errstr = errbuf;
+  return -1;
+}
+
+/* Let guest execution resume. Steps:
+ *  - resume the domain and restart qemu for HVM guests;
+ *  - re-enable xenstore watches for the domain;
+ *  - clear the suspended flag;
+ *  - if a suspend thread exists, post resumed_sem so it waits for the next
+ *    deadline.
+ * Returns 0 on success, -1 on failure. */
+int checkpoint_resume(checkpoint_state* s)
+{
+  struct timeval tv;
+
+  if (xc_domain_resume(s->xch, s->domid, 1)) {
+    snprintf(errbuf, sizeof(errbuf), "error resuming domain: %d", errno);
+    s->errstr = errbuf;
+    return -1;
+  }
+
+  gettimeofday(&tv, NULL);
+  fprintf(stderr, "PROF: resumed at %lu.%06lu\n", (unsigned long)tv.tv_sec,
+          (unsigned long)tv.tv_usec);
+
+  if (s->domtype > dt_pv && resume_qemu(s) < 0)
+    return -1;
+
+  /* restore watchability in xenstore. xs_resume_domain follows the libxs
+   * convention of returning false on failure, so "< 0" never fired */
+  if (!xs_resume_domain(s->xsh, s->domid))
+    fprintf(stderr, "error resuming domain in xenstore\n");
+
+  s->suspended = 0;
+
+  if (s->suspend_thr && sem_post(&s->resumed_sem))
+    fprintf(stderr, "error posting resume semaphore\n");
+
+  return 0;
+}
+
+/* Called after xc_domain_save has flushed its buffer. PV guests need no
+ * further work; HVM guests append the qemu device-model state to the
+ * stream. Returns 0 on success, -1 on failure. */
+int checkpoint_postflush(checkpoint_state *s)
+{
+  if (s->domtype <= dt_pv)
+    return 0;
+
+  return send_qemu(s) < 0 ? -1 : 0;
+}
+
+/* Force a suspend within millis ms if the copy hasn't completed yet.
+ * Creates the suspend timer and thread on first use, then arms the timer to
+ * fire every millis ms. Returns 0 on success, -1 with s->errstr set. */
+int checkpoint_settimer(checkpoint_state* s, int millis)
+{
+  struct itimerspec t;
+
+  if (!s->suspend_thr) {
+    if (create_suspend_timer(s) < 0)
+      return -1;
+
+    if (create_suspend_thread(s) < 0) {
+      delete_suspend_timer(s);
+      return -1;
+    }
+  }
+
+  t.it_value.tv_sec = millis / 1000;
+  t.it_value.tv_nsec = (millis % 1000) * 1000000L;
+  /* periodic: re-arm with the same delay */
+  t.it_interval = t.it_value;
+
+  /* timer_settime returns -1 and reports the cause in errno */
+  if (timer_settime(s->timer, 0, &t, NULL) < 0) {
+    snprintf(errbuf, sizeof(errbuf), "Error arming timer: %s",
+             strerror(errno));
+    s->errstr = errbuf;
+    fprintf(stderr, "%s\n", errbuf);
+    return -1;
+  }
+
+  return 0;
+}
+
+/* Delete the suspend deadline timer if one exists and forget it.
+ * Returns the timer_delete result (0 on success), or 0 if there was no
+ * timer. NOTE(review): "no timer" is encoded as a zero timer_t; confirm the
+ * platform never hands out a zero id for a live timer. */
+int delete_suspend_timer(checkpoint_state* s)
+{
+  int rc;
+
+  if (!s->timer)
+    return 0;
+
+  rc = timer_delete(s->timer);
+  if (rc)
+    fprintf(stderr, "Error deleting timer: %s\n", strerror(errno));
+  s->timer = NULL;
+
+  return rc;
+}
+
+/* Set up the event channel used to signal a guest to suspend itself.
+ * Looks up the guest's suspend port with xs_suspend_evtchn_port and binds
+ * it, storing the local port in s->suspend_evtchn.
+ * Returns 0 on success, -1 with s->errstr set on failure. */
+static int setup_suspend_evtchn(checkpoint_state* s)
+{
+  int remote_port = xs_suspend_evtchn_port(s->domid);
+
+  if (remote_port < 0) {
+    s->errstr = "failed to read suspend event channel";
+    return -1;
+  }
+
+  s->suspend_evtchn = xc_suspend_evtchn_init(s->xch, s->xce, s->domid,
+                                             remote_port);
+  if (s->suspend_evtchn >= 0) {
+    fprintf(stderr, "bound to suspend event channel %u:%d as %d\n", s->domid,
+            remote_port, s->suspend_evtchn);
+    return 0;
+  }
+
+  snprintf(errbuf, sizeof(errbuf), "failed to bind suspend event channel");
+  s->errstr = errbuf;
+  return -1;
+}
+
+/* Release the suspend event channel bound to the guest, leaving
+ * s->suspend_evtchn at -1, the "unbound" value that checkpoint_init and
+ * checkpoint_close also use. */
+static void release_suspend_evtchn(checkpoint_state *s)
+{
+  /* TODO: teach xen to clean up if port is unbound */
+  if (s->xce >= 0 && s->suspend_evtchn > 0) {
+    xc_suspend_evtchn_release(s->xce, s->suspend_evtchn);
+    s->suspend_evtchn = -1;
+  }
+}
+
+/* Register a xenstore watch on @releaseDomain, using our domid as the token,
+ * and consume the event that fires on registration.
+ * Returns 0 on success, -1 with s->errstr set on failure. */
+static int setup_shutdown_watch(checkpoint_state* s)
+{
+  char buf[16];
+
+  /* write domain ID to watch so we can ignore other domain shutdowns */
+  snprintf(buf, sizeof(buf), "%u", s->domid);
+  if (!xs_watch(s->xsh, "@releaseDomain", buf)) {
+    /* checkpoint_open hands s->errstr to its caller, so it must be set */
+    s->errstr = "could not bind to shutdown watch";
+    fprintf(stderr, "Could not bind to shutdown watch\n");
+    return -1;
+  }
+  /* watch fires once on registration */
+  s->watching_shutdown = 1;
+  check_shutdown_watch(s);
+
+  return 0;
+}
+
+/* Consume one event from the xenstore watch queue via xs_read_watch
+ * (presumably blocking until one is available).
+ * The first event after setup_shutdown_watch is the registration fire and
+ * is ignored. Returns -1 if the event's token equals our domid (treated as
+ * "domain shut down"), 0 otherwise.
+ * NOTE(review): the token is the string registered in setup_shutdown_watch,
+ * i.e. always our own domid, so any @releaseDomain event matches --
+ * confirm this is the intended shutdown test. */
+static int check_shutdown_watch(checkpoint_state* s) {
+  unsigned int count;
+  char **vec;
+  char buf[16];
+  int rc = 0;
+
+  vec = xs_read_watch(s->xsh, &count);
+  if (s->watching_shutdown == 1) {
+    /* registration fire */
+    s->watching_shutdown = 2;
+    free(vec);
+    return 0;
+  }
+  if (!vec) {
+    fprintf(stderr, "empty watch fired\n");
+    return 0;
+  }
+
+  /* same format as setup_shutdown_watch used for the token */
+  snprintf(buf, sizeof(buf), "%u", s->domid);
+  if (!strcmp(vec[XS_WATCH_TOKEN], buf)) {
+    fprintf(stderr, "domain %u shut down\n", s->domid);
+    rc = -1;
+  }
+
+  /* the vector returned by xs_read_watch is owned by the caller */
+  free(vec);
+
+  return rc;
+}
+
+/* Remove the @releaseDomain watch registered by setup_shutdown_watch, if
+ * any, and mark it as no longer active. */
+static void release_shutdown_watch(checkpoint_state* s) {
+  char buf[16];
+
+  if (!s->xsh || !s->watching_shutdown)
+    return;
+
+  snprintf(buf, sizeof(buf), "%u", s->domid);
+  if (!xs_unwatch(s->xsh, "@releaseDomain", buf))
+    fprintf(stderr, "Could not release shutdown watch\n");
+
+  /* either way there is nothing left to unwatch for this session */
+  s->watching_shutdown = 0;
+}
+
+/* Wait up to 500 ms for activity on the event channel or the xenstore
+ * handle. Returns:
+ *  - the pending event channel port (from xc_evtchn_pending);
+ *  - otherwise, the check_shutdown_watch result when xenstore became
+ *    readable;
+ *  - -1 on select error, on timeout, or when neither descriptor is
+ *    readable. */
+static int poll_evtchn(checkpoint_state* s)
+{
+  int evfd = xc_evtchn_fd(s->xce);
+  int xsfd = xs_fileno(s->xsh);
+  int nfds = (evfd > xsfd ? evfd : xsfd) + 1;
+  fd_set readable, exceptional;
+  struct timeval timeout;
+  int ready;
+
+  FD_ZERO(&readable);
+  FD_SET(evfd, &readable);
+  FD_SET(xsfd, &readable);
+  FD_ZERO(&exceptional);
+  FD_SET(evfd, &exceptional);
+  FD_SET(xsfd, &exceptional);
+
+  /* give it 500 ms to respond */
+  timeout.tv_sec = 0;
+  timeout.tv_usec = 500000;
+
+  ready = select(nfds, &readable, NULL, &exceptional, &timeout);
+  if (ready < 0) {
+    fprintf(stderr, "error polling event channel: %s\n", strerror(errno));
+    return -1;
+  }
+  if (ready == 0) {
+    fprintf(stderr, "timeout waiting for event channel\n");
+    return -1;
+  }
+
+  if (FD_ISSET(evfd, &readable))
+    return xc_evtchn_pending(s->xce);
+  if (FD_ISSET(xsfd, &readable))
+    return check_shutdown_watch(s);
+
+  return -1;
+}
+
+/* Adapted from the eponymous function in xc_save.
+ * Asks qemu to enable or disable logdirty mode by writing to
+ * device-model/<domid>/logdirty/cmd, then waits for qemu to echo the
+ * command back in .../logdirty/ret.
+ * Returns 0 on success, -1 with s->errstr set on failure. */
+static int switch_qemu_logdirty(checkpoint_state *s, int enable)
+{
+  char path[128];
+  char *tail, *cmd, *response;
+  char **vec;
+  unsigned int len;
+
+  snprintf(path, sizeof(path), "/local/domain/0/device-model/%u/logdirty/",
+           s->domid);
+  tail = path + strlen(path);
+
+  strcpy(tail, "ret");
+  if (!xs_watch(s->xsh, path, "qemu-logdirty-ret")) {
+    s->errstr = "error watching qemu logdirty return";
+    return -1;
+  }
+  /* null fire. XXX unify with shutdown watch! */
+  vec = xs_read_watch(s->xsh, &len);
+  free(vec);
+
+  strcpy(tail, "cmd");
+  cmd = enable ? "enable" : "disable";
+  if (!xs_write(s->xsh, XBT_NULL, path, cmd, strlen(cmd))) {
+    /* don't leave the return-path watch registered */
+    strcpy(tail, "ret");
+    xs_unwatch(s->xsh, path, "qemu-logdirty-ret");
+    s->errstr = "error signalling qemu logdirty";
+    return -1;
+  }
+
+  /* wait for qemu's reply to trigger the watch */
+  vec = xs_read_watch(s->xsh, &len);
+  free(vec);
+
+  strcpy(tail, "ret");
+  xs_unwatch(s->xsh, path, "qemu-logdirty-ret");
+
+  /* xs_read returns NULL on failure; len must not be trusted then */
+  response = xs_read(s->xsh, XBT_NULL, path, &len);
+  if (!response || !len || strcmp(response, cmd)) {
+    free(response);
+    s->errstr = "qemu logdirty command failed";
+    return -1;
+  }
+  free(response);
+  fprintf(stderr, "qemu logdirty mode: %s\n", cmd);
+
+  return 0;
+}
+
+/* Suspend an HVM guest. Issues the SHUTDOWN_suspend hypercall, then
+ * requires check_shutdown_watch to report the shutdown watch firing, and
+ * finally pauses qemu. Returns 0 on success, -1 on failure. */
+static int suspend_hvm(checkpoint_state *s)
+{
+  int rc;
+
+  fprintf(stderr, "issuing HVM suspend hypercall\n");
+  rc = xc_domain_shutdown(s->xch, s->domid, SHUTDOWN_suspend);
+  if (rc < 0) {
+    s->errstr = "shutdown hypercall failed";
+    return -1;
+  }
+  fprintf(stderr, "suspend hypercall returned %d\n", rc);
+
+  /* check_shutdown_watch returns -1 when our watch fired; any other result
+   * means the suspend was not observed */
+  if (check_shutdown_watch(s) < 0)
+    return suspend_qemu(s);
+
+  return -1;
+}
+
+/* Ask qemu to save its state ("save" on device-model/<domid>/command), then
+ * poll device-model/<domid>/state every millisecond until it reads
+ * "paused". Returns 0 once paused, -1 if the command or a state read fails.
+ * Note: there is no upper bound on how long this waits. */
+static int suspend_qemu(checkpoint_state *s)
+{
+  char path[128];
+  char* state;
+  unsigned int len;
+  int paused;
+
+  fprintf(stderr, "pausing QEMU\n");
+
+  snprintf(path, sizeof(path), "/local/domain/0/device-model/%d/command",
+           s->domid);
+  if (!xs_write(s->xsh, XBT_NULL, path, "save", 4)) {
+    fprintf(stderr, "error signalling QEMU to save\n");
+    return -1;
+  }
+
+  snprintf(path, sizeof(path), "/local/domain/0/device-model/%d/state",
+           s->domid);
+  for (;;) {
+    state = xs_read(s->xsh, XBT_NULL, path, &len);
+    if (!state) {
+      s->errstr = "error reading QEMU state";
+      return -1;
+    }
+
+    paused = !strcmp(state, "paused");
+    free(state);
+    if (paused)
+      return 0;
+
+    usleep(1000);
+  }
+}
+
+/* Tell qemu to continue after a checkpoint by writing "continue" to
+ * device-model/<domid>/command. Returns 0 on success, -1 if the xenstore
+ * write fails. */
+static int resume_qemu(checkpoint_state *s)
+{
+  static const char command[] = "continue";
+  char path[128];
+
+  fprintf(stderr, "resuming QEMU\n");
+
+  snprintf(path, sizeof(path), "/local/domain/0/device-model/%d/command",
+           s->domid);
+  if (xs_write(s->xsh, XBT_NULL, path, command, sizeof(command) - 1))
+    return 0;
+
+  fprintf(stderr, "error signalling QEMU to resume\n");
+  return -1;
+}
+
+/* Append the qemu device-model state to the checkpoint stream s->fd. The
+ * record is:
+ *  - the 21-byte tag "RemusDeviceModelState";
+ *  - a native-endian uint32_t length;
+ *  - the contents of /var/lib/xen/qemu-save.<domid>.
+ * Returns 0 on success, -1 (with s->errstr set, except when no stream is
+ * open) on failure. */
+static int send_qemu(checkpoint_state *s)
+{
+  char buf[8192];
+  char path[128];
+  struct stat sb;
+  uint32_t qlen = 0;
+  int qfd;
+  int rc;
+
+  if (s->fd < 0)
+    return -1;
+
+  snprintf(path, sizeof(path), "/var/lib/xen/qemu-save.%d", s->domid);
+
+  qfd = open(path, O_RDONLY);
+  if (qfd < 0) {
+    snprintf(errbuf, sizeof(errbuf), "error opening QEMU state file: %s",
+             strerror(errno));
+    s->errstr = errbuf;
+    return -1;
+  }
+
+  /* size the file we actually opened, so the announced length matches what
+   * we read even if the path is replaced in between */
+  if (fstat(qfd, &sb) < 0) {
+    snprintf(errbuf, sizeof(errbuf),
+             "error getting QEMU state file status: %s", strerror(errno));
+    s->errstr = errbuf;
+    close(qfd);
+    return -1;
+  }
+  qlen = sb.st_size;
+
+  fprintf(stderr, "Sending %u bytes of QEMU state\n", qlen);
+  if (write(s->fd, "RemusDeviceModelState", 21) != 21) {
+    s->errstr = "error writing QEMU header";
+    close(qfd);
+    return -1;
+  }
+  if (write(s->fd, &qlen, sizeof(qlen)) != sizeof(qlen)) {
+    s->errstr = "error writing QEMU size";
+    close(qfd);
+    return -1;
+  }
+
+  /* the receiver expects exactly qlen more bytes */
+  while (qlen > 0) {
+    rc = read(qfd, buf, qlen > sizeof(buf) ? sizeof(buf) : qlen);
+    if (rc < 0) {
+      snprintf(errbuf, sizeof(errbuf), "error reading QEMU state: %s",
+               strerror(errno));
+      goto err;
+    }
+    if (rc == 0) {
+      snprintf(errbuf, sizeof(errbuf), "QEMU state file truncated");
+      goto err;
+    }
+    if (write(s->fd, buf, rc) != rc) {
+      snprintf(errbuf, sizeof(errbuf), "error writing QEMU state: %s",
+               strerror(errno));
+      goto err;
+    }
+    qlen -= rc;
+  }
+
+  close(qfd);
+  return 0;
+
+ err:
+  s->errstr = errbuf;
+  close(qfd);
+  return -1;
+}
+
+/* Body of the suspend deadline thread, started by create_suspend_thread.
+ * Each round it:
+ *  - waits for checkpoint_resume to post resumed_sem;
+ *  - waits with sigwait for the timer's SIGRTMIN (blocked in this thread,
+ *    since create_suspend_thread calls block_timer before pthread_create);
+ *  - suspends the domain unless it is already suspended;
+ *  - posts suspended_sem for checkpoint_wait.
+ * It exits when s->done is seen after a wakeup, or if sigwait fails. */
+static void *suspend_thread(void *arg)
+{
+ checkpoint_state* s = (checkpoint_state*)arg;
+ sigset_t tss;
+ int rc;
+ int sig;
+
+ fprintf(stderr, "Suspend thread started\n");
+
+ /* the only signal this thread waits for is the suspend timer's */
+ sigemptyset(&tss);
+ sigaddset(&tss, SIGRTMIN);
+
+ while (1) {
+ /* wait for checkpoint thread to signal resume */
+ if ((rc = sem_wait(&s->resumed_sem)))
+ fprintf(stderr, "Error waiting on resume semaphore\n");
+
+ /* block until the deadline timer fires */
+ if ((rc = sigwait(&tss, &sig))) {
+ /* NOTE(review): sigwait returns the error number instead of setting
+  * errno, so the errno printed here may be stale */
+ fprintf(stderr, "sigwait failed: %d %d\n", rc, errno);
+ break;
+ }
+ if (sig != SIGRTMIN)
+ fprintf(stderr, "received unexpected signal %d\n", sig);
+
+ /* set by stop_suspend_thread */
+ if (s->done)
+ break;
+
+ if (s->suspended) {
+ fprintf(stderr, "domain already suspended?\n");
+ } else {
+ rc = checkpoint_suspend(s);
+ if (rc)
+ s->suspended = 1;
+ else
+ fprintf(stderr, "checkpoint_suspend failed\n");
+ }
+
+ /* wake checkpoint_wait whether or not the suspend worked; it checks
+  * s->suspended itself */
+ if ((rc = sem_post(&s->suspended_sem)))
+ fprintf(stderr, "Error posting suspend semaphore\n");
+ }
+
+ fprintf(stderr, "Suspend thread exiting\n");
+
+ return NULL;
+}
+
+/* Create the POSIX timer that raises SIGRTMIN to drive early suspends.
+ * The timer is created disarmed; checkpoint_settimer arms it.
+ * Returns 0 on success, -1 with s->errstr set on failure. */
+static int create_suspend_timer(checkpoint_state* s)
+{
+  struct sigevent event;
+
+  /* only the fields below are meant to be set; the rest must not carry
+   * stack garbage into timer_create */
+  memset(&event, 0, sizeof(event));
+  event.sigev_notify = SIGEV_SIGNAL;
+  event.sigev_signo = SIGRTMIN;
+  event.sigev_value.sival_int = 0;
+
+  /* timer_create returns -1 and reports the cause in errno */
+  if (timer_create(CLOCK_REALTIME, &event, &s->timer) < 0) {
+    snprintf(errbuf, sizeof(errbuf), "Error creating timer: %s",
+             strerror(errno));
+    s->errstr = errbuf;
+    return -1;
+  }
+
+  return 0;
+}
+
+/* Apply `how` (SIG_BLOCK or SIG_UNBLOCK) for SIGRTMIN, the suspend timer's
+ * signal, to the calling thread's signal mask. */
+static void set_timer_sigmask(int how)
+{
+  sigset_t timer_sig;
+
+  sigemptyset(&timer_sig);
+  sigaddset(&timer_sig, SIGRTMIN);
+  pthread_sigmask(how, &timer_sig, NULL);
+}
+
+/* Block SIGRTMIN in the calling thread. Threads created afterwards inherit
+ * the mask, so timer signals stay pending for sigwait instead of being
+ * delivered asynchronously. */
+void block_timer(void)
+{
+  set_timer_sigmask(SIG_BLOCK);
+}
+
+/* Undo block_timer for the calling thread. */
+void unblock_timer(void)
+{
+  set_timer_sigmask(SIG_UNBLOCK);
+}
+
+/* Initialise the suspend/resume semaphores and start suspend_thread with
+ * SIGRTMIN blocked. Returns 0 on success. On failure returns -1 with
+ * s->errstr set, everything initialised here destroyed again, and
+ * s->suspend_thr left at 0. */
+static int create_suspend_thread(checkpoint_state* s)
+{
+  int err;
+
+  /* sem_init returns -1 and reports the cause in errno */
+  if (sem_init(&s->suspended_sem, 0, 0) < 0) {
+    snprintf(errbuf, sizeof(errbuf),
+             "Error initializing suspend semaphore: %s", strerror(errno));
+    s->errstr = errbuf;
+    return -1;
+  }
+
+  if (sem_init(&s->resumed_sem, 0, 0) < 0) {
+    snprintf(errbuf, sizeof(errbuf),
+             "Error initializing resume semaphore: %s", strerror(errno));
+    s->errstr = errbuf;
+    sem_destroy(&s->suspended_sem);
+    return -1;
+  }
+
+  /* signal mask should be inherited */
+  block_timer();
+
+  /* pthread_create returns the error number directly */
+  if ((err = pthread_create(&s->suspend_thr, NULL, suspend_thread, s))) {
+    snprintf(errbuf, sizeof(errbuf), "Error creating suspend thread: %s",
+             strerror(err));
+    s->errstr = errbuf;
+    /* the thread id is unspecified after a failure; restore "no thread" */
+    s->suspend_thr = 0;
+    sem_destroy(&s->resumed_sem);
+    sem_destroy(&s->suspended_sem);
+    return -1;
+  }
+
+  return 0;
+}
+
+/* Ask suspend_thread to exit, wait for it, and release its semaphores.
+ * The thread may be blocked on resumed_sem or in sigwait for SIGRTMIN, so
+ * wake both. checkpoint_close deletes the deadline timer before calling
+ * this, so no timer signal would otherwise ever arrive. */
+static void stop_suspend_thread(checkpoint_state* s)
+{
+  int err;
+
+  s->done = 1;
+
+  if (sem_post(&s->resumed_sem))
+    fprintf(stderr, "Error posting resume semaphore\n");
+  /* SIGRTMIN is blocked in that thread, so this stays pending until its
+   * sigwait consumes it */
+  if ((err = pthread_kill(s->suspend_thr, SIGRTMIN)))
+    fprintf(stderr, "Error signalling suspend thread: %s\n", strerror(err));
+
+  if ((err = pthread_join(s->suspend_thr, NULL)))
+    fprintf(stderr, "Error joining suspend thread: %s\n", strerror(err));
+  s->suspend_thr = 0;
+
+  /* nothing uses the semaphores any more; create_suspend_thread will
+   * initialise fresh ones if the timer is armed again */
+  sem_destroy(&s->resumed_sem);
+  sem_destroy(&s->suspended_sem);
+  s->done = 0;
+}
diff --git a/tools/python/xen/lowlevel/netlink/libnetlink.c b/tools/python/xen/lowlevel/netlink/libnetlink.c
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/netlink/libnetlink.c
@@ -0,0 +1,585 @@
+/*
+ * libnetlink.c RTnetlink service routines.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ *
+ * Authors: Alexey Kuznetsov, <kuznet@xxxxxxxxxxxxx>
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <syslog.h>
+#include <fcntl.h>
+#include <net/if_arp.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <string.h>
+#include <errno.h>
+#include <time.h>
+#include <sys/uio.h>
+
+#include "libnetlink.h"
+
+/*
+ * Close the socket held by rth. The descriptor is reset to -1 afterwards
+ * so a repeated call, or a call on a handle whose fd is already -1, does
+ * not close an unrelated descriptor.
+ */
+void rtnl_close(struct rtnl_handle *rth)
+{
+    if (rth->fd >= 0) {
+        close(rth->fd);
+        rth->fd = -1;
+    }
+}
+
+/*
+ * Open a netlink socket for `protocol`, set 32 KiB send/receive buffers,
+ * bind it to the multicast groups in `subscriptions` and record the local
+ * address reported by getsockname() in rth->local. rth->seq is seeded from
+ * time(NULL).
+ *
+ * Returns 0 on success. On failure prints a diagnostic to stderr, closes
+ * any socket it created, leaves rth->fd == -1 and returns -1.
+ */
+int rtnl_open_byproto(struct rtnl_handle *rth, unsigned subscriptions,
+                      int protocol)
+{
+    socklen_t addr_len;
+    int sndbuf = 32768;
+    int rcvbuf = 32768;
+
+    /* clear the whole handle (sizeof(rth) is only the pointer size) */
+    memset(rth, 0, sizeof(*rth));
+
+    rth->fd = socket(AF_NETLINK, SOCK_RAW, protocol);
+    if (rth->fd < 0) {
+        perror("Cannot open netlink socket");
+        return -1;
+    }
+
+    if (setsockopt(rth->fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)) < 0) {
+        perror("SO_SNDBUF");
+        goto err_close;
+    }
+
+    if (setsockopt(rth->fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) < 0) {
+        perror("SO_RCVBUF");
+        goto err_close;
+    }
+
+    memset(&rth->local, 0, sizeof(rth->local));
+    rth->local.nl_family = AF_NETLINK;
+    rth->local.nl_groups = subscriptions;
+
+    if (bind(rth->fd, (struct sockaddr*)&rth->local, sizeof(rth->local)) < 0) {
+        perror("Cannot bind netlink socket");
+        goto err_close;
+    }
+    addr_len = sizeof(rth->local);
+    if (getsockname(rth->fd, (struct sockaddr*)&rth->local, &addr_len) < 0) {
+        perror("Cannot getsockname");
+        goto err_close;
+    }
+    if (addr_len != sizeof(rth->local)) {
+        fprintf(stderr, "Wrong address length %d\n", (int)addr_len);
+        goto err_close;
+    }
+    if (rth->local.nl_family != AF_NETLINK) {
+        fprintf(stderr, "Wrong address family %d\n", rth->local.nl_family);
+        goto err_close;
+    }
+    rth->seq = time(NULL);
+    return 0;
+
+err_close:
+    /* don't leak the socket when setup fails part-way */
+    close(rth->fd);
+    rth->fd = -1;
+    return -1;
+}
+
+/* Open a NETLINK_ROUTE socket; see rtnl_open_byproto() for details. */
+int rtnl_open(struct rtnl_handle *rth, unsigned subscriptions)
+{
+    const int route_protocol = NETLINK_ROUTE;
+
+    return rtnl_open_byproto(rth, subscriptions, route_protocol);
+}
+
+/*
+ * Request a dump of every object of message type `type` (e.g. RTM_GETLINK)
+ * in address family `family`. The new sequence number is stored in
+ * rth->dump, which rtnl_dump_filter() uses to recognise the replies.
+ * Returns the result of sendto().
+ */
+int rtnl_wilddump_request(struct rtnl_handle *rth, int family, int type)
+{
+    struct {
+        struct nlmsghdr nlh;
+        struct rtgenmsg g;
+    } req;
+    struct sockaddr_nl dst;
+
+    /* clear the request including padding before it goes on the wire */
+    memset(&req, 0, sizeof(req));
+    rth->seq++;
+    rth->dump = rth->seq;
+
+    req.nlh.nlmsg_len = sizeof(req);
+    req.nlh.nlmsg_type = type;
+    req.nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST;
+    req.nlh.nlmsg_pid = 0;
+    req.nlh.nlmsg_seq = rth->dump;
+    req.g.rtgen_family = family;
+
+    /* destination: pid 0, no multicast groups */
+    memset(&dst, 0, sizeof(dst));
+    dst.nl_family = AF_NETLINK;
+
+    return sendto(rth->fd, (void*)&req, sizeof(req), 0,
+                  (struct sockaddr*)&dst, sizeof(dst));
+}
+
+/* Send len bytes of a caller-built netlink message to pid 0 with no
+ * multicast groups. Returns the result of sendto(). */
+int rtnl_send(struct rtnl_handle *rth, const char *buf, int len)
+{
+    /* all other sockaddr_nl members are zeroed by the initializer */
+    struct sockaddr_nl dst = { .nl_family = AF_NETLINK };
+
+    return sendto(rth->fd, buf, len, 0,
+                  (struct sockaddr*)&dst, sizeof(dst));
+}
+
+/*
+ * Send a dump request of message type `type` with the len bytes at req as
+ * payload. Header and payload go out as two iovecs, so req needs no room
+ * for the header. The sequence number is stored in rth->dump for
+ * rtnl_dump_filter(). Returns the result of sendmsg().
+ */
+int rtnl_dump_request(struct rtnl_handle *rth, int type, void *req, int len)
+{
+    struct nlmsghdr hdr;
+    struct sockaddr_nl dst;
+    struct iovec parts[2];
+    struct msghdr msg = {
+        .msg_name = &dst,
+        .msg_namelen = sizeof(dst),
+        .msg_iov = parts,
+        .msg_iovlen = 2,
+    };
+
+    memset(&dst, 0, sizeof(dst));
+    dst.nl_family = AF_NETLINK;
+
+    rth->dump = ++rth->seq;
+    hdr.nlmsg_len = NLMSG_LENGTH(len);
+    hdr.nlmsg_type = type;
+    hdr.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST;
+    hdr.nlmsg_pid = 0;
+    hdr.nlmsg_seq = rth->dump;
+
+    parts[0].iov_base = &hdr;
+    parts[0].iov_len = sizeof(hdr);
+    parts[1].iov_base = req;
+    parts[1].iov_len = len;
+
+    return sendmsg(rth->fd, &msg, 0);
+}
+
+/*
+ * Read the replies to the last dump request (sequence number rth->dump)
+ * and pass each one to filter(sender, msg, arg1). Messages that are not
+ * part of this dump go to junk(sender, msg, arg2) when junk is non-NULL.
+ *
+ * Returns 0 on NLMSG_DONE, the negative value returned by a failing
+ * callback, or -1 on a netlink error reply, EOF, a socket error or a
+ * leftover partial message. Errors are returned rather than calling
+ * exit(), because this code runs inside the Python interpreter.
+ */
+int rtnl_dump_filter(struct rtnl_handle *rth,
+                     rtnl_filter_t filter,
+                     void *arg1,
+                     rtnl_filter_t junk,
+                     void *arg2)
+{
+    struct sockaddr_nl nladdr;
+    struct iovec iov;
+    struct msghdr msg = {
+        .msg_name = &nladdr,
+        .msg_namelen = sizeof(nladdr),
+        .msg_iov = &iov,
+        .msg_iovlen = 1,
+    };
+    char buf[16384];
+
+    iov.iov_base = buf;
+    while (1) {
+        int status;
+        struct nlmsghdr *h;
+
+        iov.iov_len = sizeof(buf);
+        status = recvmsg(rth->fd, &msg, 0);
+
+        if (status < 0) {
+            if (errno == EINTR)
+                continue;
+            if (errno == ENOBUFS) {
+                /* receive queue overrun: some messages were dropped */
+                perror("OVERRUN");
+                continue;
+            }
+            /* any other error (e.g. EBADF) would repeat forever */
+            perror("rtnl_dump_filter: recvmsg");
+            return -1;
+        }
+
+        if (status == 0) {
+            fprintf(stderr, "EOF on netlink\n");
+            return -1;
+        }
+
+        h = (struct nlmsghdr*)buf;
+        while (NLMSG_OK(h, status)) {
+            int err;
+
+            /* not from the kernel, not for us, or not this dump */
+            if (nladdr.nl_pid != 0 ||
+                h->nlmsg_pid != rth->local.nl_pid ||
+                h->nlmsg_seq != rth->dump) {
+                if (junk) {
+                    err = junk(&nladdr, h, arg2);
+                    if (err < 0)
+                        return err;
+                }
+                goto skip_it;
+            }
+
+            if (h->nlmsg_type == NLMSG_DONE)
+                return 0;
+            if (h->nlmsg_type == NLMSG_ERROR) {
+                struct nlmsgerr *nlerr = (struct nlmsgerr*)NLMSG_DATA(h);
+                if (h->nlmsg_len < NLMSG_LENGTH(sizeof(struct nlmsgerr))) {
+                    fprintf(stderr, "ERROR truncated\n");
+                } else {
+                    errno = -nlerr->error;
+                    perror("RTNETLINK answers");
+                }
+                return -1;
+            }
+            err = filter(&nladdr, h, arg1);
+            if (err < 0)
+                return err;
+
+skip_it:
+            h = NLMSG_NEXT(h, status);
+        }
+        if (msg.msg_flags & MSG_TRUNC) {
+            fprintf(stderr, "Message truncated\n");
+            continue;
+        }
+        if (status) {
+            fprintf(stderr, "!!!Remnant of size %d\n", status);
+            return -1;
+        }
+    }
+}
+
+/*
+ * Send message n to netlink address (peer, groups) and wait for the reply
+ * carrying the same sequence number. n is modified: nlmsg_seq is set, and
+ * NLM_F_ACK is added to nlmsg_flags when answer is NULL.
+ *
+ * If answer is non-NULL, the first matching reply (or a zero-error ack)
+ * is copied into it; it must have room for up to 16384 bytes.
+ * Non-matching messages go to junk(sender, msg, jarg) when junk is set.
+ *
+ * Returns 0 on success; -1 on send/receive failure, a netlink error
+ * reply, EOF, or a truncated/malformed message; or the negative value
+ * returned by junk. Errors are returned rather than calling exit().
+ */
+int rtnl_talk(struct rtnl_handle *rtnl, struct nlmsghdr *n, pid_t peer,
+              unsigned groups, struct nlmsghdr *answer,
+              rtnl_filter_t junk,
+              void *jarg)
+{
+    int status;
+    unsigned seq;
+    struct nlmsghdr *h;
+    struct sockaddr_nl nladdr;
+    struct iovec iov = {
+        .iov_base = (void*) n,
+        .iov_len = n->nlmsg_len
+    };
+    struct msghdr msg = {
+        .msg_name = &nladdr,
+        .msg_namelen = sizeof(nladdr),
+        .msg_iov = &iov,
+        .msg_iovlen = 1,
+    };
+    char buf[16384];
+
+    memset(&nladdr, 0, sizeof(nladdr));
+    nladdr.nl_family = AF_NETLINK;
+    nladdr.nl_pid = peer;
+    nladdr.nl_groups = groups;
+
+    n->nlmsg_seq = seq = ++rtnl->seq;
+
+    /* without an answer buffer, ask for an explicit ack (error == 0) */
+    if (answer == NULL)
+        n->nlmsg_flags |= NLM_F_ACK;
+
+    status = sendmsg(rtnl->fd, &msg, 0);
+
+    if (status < 0) {
+        perror("Cannot talk to rtnetlink");
+        return -1;
+    }
+
+    memset(buf, 0, sizeof(buf));
+
+    /* reuse msg/iov to receive into buf */
+    iov.iov_base = buf;
+
+    while (1) {
+        iov.iov_len = sizeof(buf);
+        status = recvmsg(rtnl->fd, &msg, 0);
+
+        if (status < 0) {
+            if (errno == EINTR)
+                continue;
+            if (errno == ENOBUFS) {
+                perror("OVERRUN");
+                continue;
+            }
+            /* any other error (e.g. EBADF) would repeat forever */
+            perror("rtnl_talk: recvmsg");
+            return -1;
+        }
+        if (status == 0) {
+            fprintf(stderr, "EOF on netlink\n");
+            return -1;
+        }
+        if (msg.msg_namelen != sizeof(nladdr)) {
+            fprintf(stderr, "sender address length == %d\n",
+                    (int)msg.msg_namelen);
+            return -1;
+        }
+        /* compare as int: status may go negative after the last message */
+        for (h = (struct nlmsghdr*)buf; status >= (int)sizeof(*h); ) {
+            int err;
+            int len = h->nlmsg_len;
+            int l = len - sizeof(*h);
+
+            if (l < 0 || len > status) {
+                if (msg.msg_flags & MSG_TRUNC) {
+                    fprintf(stderr, "Truncated message\n");
+                    return -1;
+                }
+                fprintf(stderr, "!!!malformed message: len=%d\n", len);
+                return -1;
+            }
+
+            if (nladdr.nl_pid != peer ||
+                h->nlmsg_pid != rtnl->local.nl_pid ||
+                h->nlmsg_seq != seq) {
+                if (junk) {
+                    err = junk(&nladdr, h, jarg);
+                    if (err < 0)
+                        return err;
+                }
+                /* skip the unrelated message; continuing without advancing
+                 * h would examine the same header forever */
+                status -= NLMSG_ALIGN(len);
+                h = (struct nlmsghdr*)((char*)h + NLMSG_ALIGN(len));
+                continue;
+            }
+
+            if (h->nlmsg_type == NLMSG_ERROR) {
+                struct nlmsgerr *nlerr = (struct nlmsgerr*)NLMSG_DATA(h);
+                if ((size_t)l < sizeof(struct nlmsgerr)) {
+                    fprintf(stderr, "ERROR truncated\n");
+                } else {
+                    errno = -nlerr->error;
+                    if (errno == 0) {
+                        /* error == 0 is an acknowledgement */
+                        if (answer)
+                            memcpy(answer, h, h->nlmsg_len);
+                        return 0;
+                    }
+                    perror("RTNETLINK answers");
+                }
+                return -1;
+            }
+            if (answer) {
+                memcpy(answer, h, h->nlmsg_len);
+                return 0;
+            }
+
+            fprintf(stderr, "Unexpected reply!!!\n");
+
+            status -= NLMSG_ALIGN(len);
+            h = (struct nlmsghdr*)((char*)h + NLMSG_ALIGN(len));
+        }
+        if (msg.msg_flags & MSG_TRUNC) {
+            fprintf(stderr, "Message truncated\n");
+            continue;
+        }
+        if (status) {
+            fprintf(stderr, "!!!Remnant of size %d\n", status);
+            return -1;
+        }
+    }
+}
+
+/*
+ * Receive messages from rtnl forever, passing each to
+ * handler(sender, msg, jarg). Never returns 0: returns the negative value
+ * from handler, or -1 on EOF, a socket error, or a truncated/malformed
+ * message (reported instead of calling exit()).
+ */
+int rtnl_listen(struct rtnl_handle *rtnl,
+                rtnl_filter_t handler,
+                void *jarg)
+{
+    int status;
+    struct nlmsghdr *h;
+    struct sockaddr_nl nladdr;
+    struct iovec iov;
+    struct msghdr msg = {
+        .msg_name = &nladdr,
+        .msg_namelen = sizeof(nladdr),
+        .msg_iov = &iov,
+        .msg_iovlen = 1,
+    };
+    char buf[8192];
+
+    memset(&nladdr, 0, sizeof(nladdr));
+    nladdr.nl_family = AF_NETLINK;
+    nladdr.nl_pid = 0;
+    nladdr.nl_groups = 0;
+
+    iov.iov_base = buf;
+    while (1) {
+        iov.iov_len = sizeof(buf);
+        status = recvmsg(rtnl->fd, &msg, 0);
+
+        if (status < 0) {
+            if (errno == EINTR)
+                continue;
+            if (errno == ENOBUFS) {
+                /* receive queue overrun: some messages were dropped */
+                perror("OVERRUN");
+                continue;
+            }
+            /* any other error (e.g. EBADF) would repeat forever */
+            perror("rtnl_listen: recvmsg");
+            return -1;
+        }
+        if (status == 0) {
+            fprintf(stderr, "EOF on netlink\n");
+            return -1;
+        }
+        if (msg.msg_namelen != sizeof(nladdr)) {
+            fprintf(stderr, "Sender address length == %d\n",
+                    (int)msg.msg_namelen);
+            return -1;
+        }
+        /* compare as int: status may go negative after the last message */
+        for (h = (struct nlmsghdr*)buf; status >= (int)sizeof(*h); ) {
+            int err;
+            int len = h->nlmsg_len;
+            int l = len - sizeof(*h);
+
+            if (l < 0 || len > status) {
+                if (msg.msg_flags & MSG_TRUNC) {
+                    fprintf(stderr, "Truncated message\n");
+                    return -1;
+                }
+                fprintf(stderr, "!!!malformed message: len=%d\n", len);
+                return -1;
+            }
+
+            err = handler(&nladdr, h, jarg);
+            if (err < 0)
+                return err;
+
+            status -= NLMSG_ALIGN(len);
+            h = (struct nlmsghdr*)((char*)h + NLMSG_ALIGN(len));
+        }
+        if (msg.msg_flags & MSG_TRUNC) {
+            fprintf(stderr, "Message truncated\n");
+            continue;
+        }
+        if (status) {
+            fprintf(stderr, "!!!Remnant of size %d\n", status);
+            return -1;
+        }
+    }
+}
+
+/*
+ * Replay netlink messages stored back to back in the stream rtnl: read
+ * each struct nlmsghdr plus its aligned payload and pass it to
+ * handler(sender, msg, jarg) with a zeroed AF_NETLINK sender address.
+ *
+ * Returns 0 at a clean end of file, the negative value returned by
+ * handler, or -1 on a read error, a truncated message, or a message
+ * larger than the 8 KiB buffer.
+ */
+int rtnl_from_file(FILE *rtnl, rtnl_filter_t handler,
+                   void *jarg)
+{
+    size_t status;
+    struct sockaddr_nl nladdr;
+    char buf[8192];
+    struct nlmsghdr *h = (void*)buf;
+
+    memset(&nladdr, 0, sizeof(nladdr));
+    nladdr.nl_family = AF_NETLINK;
+    nladdr.nl_pid = 0;
+    nladdr.nl_groups = 0;
+
+    while (1) {
+        int err, len;
+        int l;
+
+        /* fread() never returns a negative count; a short count means EOF
+         * or an error, which ferror() distinguishes. */
+        status = fread(buf, 1, sizeof(*h), rtnl);
+        if (status != sizeof(*h)) {
+            if (ferror(rtnl)) {
+                perror("rtnl_from_file: fread");
+                return -1;
+            }
+            if (status == 0)
+                return 0;
+            /* partial header: don't parse leftover bytes as a message */
+            fprintf(stderr, "rtnl-from_file: truncated message\n");
+            return -1;
+        }
+
+        len = h->nlmsg_len;
+        l = len - sizeof(*h);
+
+        if (l < 0 || len > (int)sizeof(buf)) {
+            fprintf(stderr, "!!!malformed message: len=%d @%ld\n",
+                    len, ftell(rtnl));
+            return -1;
+        }
+
+        status = fread(NLMSG_DATA(h), 1, NLMSG_ALIGN(l), rtnl);
+        if (status < (size_t)l) {
+            if (ferror(rtnl))
+                perror("rtnl_from_file: fread");
+            else
+                fprintf(stderr, "rtnl-from_file: truncated message\n");
+            return -1;
+        }
+
+        err = handler(&nladdr, h, jarg);
+        if (err < 0)
+            return err;
+    }
+}
+
+/* Append a 32-bit attribute of the given type to message n, whose buffer
+ * holds maxlen bytes. Returns 0, or -1 (with a stderr message) if the
+ * attribute would not fit. */
+int addattr32(struct nlmsghdr *n, int maxlen, int type, __u32 data)
+{
+    const int attr_len = RTA_LENGTH(sizeof(data));
+    const unsigned int offset = NLMSG_ALIGN(n->nlmsg_len);
+    struct rtattr *attr;
+
+    if (offset + attr_len > maxlen) {
+        fprintf(stderr,"addattr32: Error! max allowed bound %d exceeded\n",maxlen);
+        return -1;
+    }
+
+    attr = NLMSG_TAIL(n);
+    attr->rta_type = type;
+    attr->rta_len = attr_len;
+    memcpy(RTA_DATA(attr), &data, sizeof(data));
+    n->nlmsg_len = offset + attr_len;
+    return 0;
+}
+
+/* Append an attribute of the given type carrying alen bytes from data to
+ * message n, whose buffer holds maxlen bytes. nlmsg_len grows by the
+ * aligned attribute size. Returns 0, or -1 if it would not fit. */
+int addattr_l(struct nlmsghdr *n, int maxlen, int type, const void *data,
+              int alen)
+{
+    const int attr_len = RTA_LENGTH(alen);
+    const unsigned int new_len = NLMSG_ALIGN(n->nlmsg_len) + RTA_ALIGN(attr_len);
+    struct rtattr *attr;
+
+    if (new_len > maxlen) {
+        fprintf(stderr, "addattr_l ERROR: message exceeded bound of %d\n",maxlen);
+        return -1;
+    }
+
+    attr = NLMSG_TAIL(n);
+    attr->rta_type = type;
+    attr->rta_len = attr_len;
+    memcpy(RTA_DATA(attr), data, alen);
+    n->nlmsg_len = new_len;
+    return 0;
+}
+
+/* Append len raw bytes (no rtattr header) to message n, whose buffer holds
+ * maxlen bytes, zero-filling up to the next NLMSG alignment boundary.
+ * Returns 0, or -1 if the data would not fit. */
+int addraw_l(struct nlmsghdr *n, int maxlen, const void *data, int len)
+{
+    char *tail;
+
+    if (NLMSG_ALIGN(n->nlmsg_len) + NLMSG_ALIGN(len) > maxlen) {
+        fprintf(stderr, "addraw_l ERROR: message exceeded bound of %d\n",maxlen);
+        return -1;
+    }
+
+    /* char * arithmetic; arithmetic on void * is a GNU extension */
+    tail = (char *)NLMSG_TAIL(n);
+    memcpy(tail, data, len);
+    memset(tail + len, 0, NLMSG_ALIGN(len) - len);
+    n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + NLMSG_ALIGN(len);
+    return 0;
+}
+
+/* Append a nested 32-bit attribute of the given type inside attribute
+ * rta, whose buffer holds maxlen bytes; rta->rta_len grows to cover it.
+ * Returns 0, or -1 if it would not fit. */
+int rta_addattr32(struct rtattr *rta, int maxlen, int type, __u32 data)
+{
+    const int sub_len = RTA_LENGTH(sizeof(data));
+    struct rtattr *sub;
+
+    if (RTA_ALIGN(rta->rta_len) + sub_len > maxlen) {
+        fprintf(stderr,"rta_addattr32: Error! max allowed bound %d exceeded\n",maxlen);
+        return -1;
+    }
+
+    sub = (struct rtattr*)((char*)rta + RTA_ALIGN(rta->rta_len));
+    sub->rta_type = type;
+    sub->rta_len = sub_len;
+    memcpy(RTA_DATA(sub), &data, sizeof(data));
+    /* RTA_ALIGN and NLMSG_ALIGN both round up to a 4-byte boundary */
+    rta->rta_len = RTA_ALIGN(rta->rta_len) + sub_len;
+    return 0;
+}
+
+/* Append a nested attribute of the given type with alen bytes of payload
+ * inside attribute rta, whose buffer holds maxlen bytes; rta->rta_len
+ * grows by the aligned size. Returns 0, or -1 if it would not fit. */
+int rta_addattr_l(struct rtattr *rta, int maxlen, int type,
+                  const void *data, int alen)
+{
+    const int sub_len = RTA_LENGTH(alen);
+    struct rtattr *sub;
+
+    if (RTA_ALIGN(rta->rta_len) + RTA_ALIGN(sub_len) > maxlen) {
+        fprintf(stderr,"rta_addattr_l: Error! max allowed bound %d exceeded\n",maxlen);
+        return -1;
+    }
+
+    sub = (struct rtattr*)((char*)rta + RTA_ALIGN(rta->rta_len));
+    sub->rta_type = type;
+    sub->rta_len = sub_len;
+    memcpy(RTA_DATA(sub), data, alen);
+    /* RTA_ALIGN and NLMSG_ALIGN both round up to a 4-byte boundary */
+    rta->rta_len = RTA_ALIGN(rta->rta_len) + RTA_ALIGN(sub_len);
+    return 0;
+}
+
+/*
+ * Index the attributes in the len-byte stream at rta by type: afterwards
+ * tb[t] points at the last attribute of type t (t <= max) or is NULL.
+ * tb must hold max + 1 entries. Leftover bytes that do not form a whole
+ * attribute are reported on stderr. Always returns 0.
+ */
+int parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len)
+{
+    memset(tb, 0, sizeof(struct rtattr *) * (max + 1));
+
+    for (; RTA_OK(rta, len); rta = RTA_NEXT(rta, len)) {
+        if (rta->rta_type <= max)
+            tb[rta->rta_type] = rta;
+    }
+
+    if (len)
+        fprintf(stderr, "!!!Deficit %d, rta_len=%d\n", len, rta->rta_len);
+    return 0;
+}
+
+/*
+ * Store the attributes in the len-byte stream at rta in arrival order:
+ * tb[0..n-1] for the first n (at most max) attributes whose type is
+ * <= max. tb must hold max entries. Returns n.
+ */
+int parse_rtattr_byindex(struct rtattr *tb[], int max, struct rtattr *rta, int len)
+{
+    int count = 0;
+
+    memset(tb, 0, sizeof(struct rtattr *) * max);
+    for (; RTA_OK(rta, len); rta = RTA_NEXT(rta, len)) {
+        if (rta->rta_type <= max && count < max)
+            tb[count++] = rta;
+    }
+
+    if (len)
+        fprintf(stderr, "!!!Deficit %d, rta_len=%d\n", len, rta->rta_len);
+    return count;
+}
diff --git a/tools/python/xen/lowlevel/netlink/libnetlink.h b/tools/python/xen/lowlevel/netlink/libnetlink.h
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/netlink/libnetlink.h
@@ -0,0 +1,58 @@
+#ifndef __LIBNETLINK_H__
+#define __LIBNETLINK_H__ 1
+
+/*
+ * Minimal rtnetlink helper library (see libnetlink.c).
+ *
+ * <stdio.h> and <sys/types.h> provide FILE (rtnl_from_file) and pid_t
+ * (rtnl_talk), so the header does not depend on what the includer pulled
+ * in first.
+ */
+#include <stdio.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <asm/types.h>
+#include <linux/netlink.h>
+#include <linux/rtnetlink.h>
+
+/* State for one netlink socket. */
+struct rtnl_handle
+{
+    int fd;                    /* socket descriptor */
+    struct sockaddr_nl local;  /* local address reported by getsockname() */
+    struct sockaddr_nl peer;   /* not referenced by libnetlink.c */
+    __u32 seq;                 /* last sequence number used */
+    __u32 dump;                /* sequence number of the last dump request */
+};
+
+/* Open a NETLINK_ROUTE (rtnl_open) or given-protocol socket subscribed to
+ * the multicast groups in `subscriptions`. Return 0, or -1 on failure. */
+extern int rtnl_open(struct rtnl_handle *rth, unsigned subscriptions);
+extern int rtnl_open_byproto(struct rtnl_handle *rth, unsigned subscriptions, int protocol);
+extern void rtnl_close(struct rtnl_handle *rth);
+/* Start a dump; read the replies with rtnl_dump_filter(). */
+extern int rtnl_wilddump_request(struct rtnl_handle *rth, int fam, int type);
+extern int rtnl_dump_request(struct rtnl_handle *rth, int type, void *req, int len);
+
+/* Per-message callback; a negative return stops the receive loop and is
+ * passed back to the caller. */
+typedef int (*rtnl_filter_t)(const struct sockaddr_nl *,
+                             struct nlmsghdr *n, void *);
+extern int rtnl_dump_filter(struct rtnl_handle *rth, rtnl_filter_t filter,
+                            void *arg1,
+                            rtnl_filter_t junk,
+                            void *arg2);
+extern int rtnl_talk(struct rtnl_handle *rtnl, struct nlmsghdr *n, pid_t peer,
+                     unsigned groups, struct nlmsghdr *answer,
+                     rtnl_filter_t junk,
+                     void *jarg);
+extern int rtnl_send(struct rtnl_handle *rth, const char *buf, int);
+
+
+/* Append attributes or raw bytes to a message (or nested attribute) whose
+ * buffer holds maxlen bytes. Return 0, or -1 if it would not fit. */
+extern int addattr32(struct nlmsghdr *n, int maxlen, int type, __u32 data);
+extern int addattr_l(struct nlmsghdr *n, int maxlen, int type, const void *data, int alen);
+extern int addraw_l(struct nlmsghdr *n, int maxlen, const void *data, int len);
+extern int rta_addattr32(struct rtattr *rta, int maxlen, int type, __u32 data);
+extern int rta_addattr_l(struct rtattr *rta, int maxlen, int type, const void *data, int alen);
+
+/* Index attributes by type (tb[0..max]) or by arrival order (tb[0..max-1]). */
+extern int parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len);
+extern int parse_rtattr_byindex(struct rtattr *tb[], int max, struct rtattr *rta, int len);
+
+#define parse_rtattr_nested(tb, max, rta) \
+    (parse_rtattr((tb), (max), RTA_DATA(rta), RTA_PAYLOAD(rta)))
+
+extern int rtnl_listen(struct rtnl_handle *, rtnl_filter_t handler,
+                       void *jarg);
+extern int rtnl_from_file(FILE *, rtnl_filter_t handler,
+                          void *jarg);
+
+/* Address just past the aligned end of message nmsg, where the next
+ * attribute goes. Uses char * arithmetic (void * arithmetic is a GNU
+ * extension). */
+#define NLMSG_TAIL(nmsg) \
+    ((struct rtattr *) (((char *) (nmsg)) + NLMSG_ALIGN((nmsg)->nlmsg_len)))
+
+#endif /* __LIBNETLINK_H__ */
+
diff --git a/tools/python/xen/lowlevel/netlink/netlink.c b/tools/python/xen/lowlevel/netlink/netlink.c
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/netlink/netlink.c
@@ -0,0 +1,211 @@
+/* python binding to libnetlink */
+
+#include <Python.h>
+#include "libnetlink.h"
+
+#define PKG "xen.lowlevel.netlink"
+
+/* Python object wrapping one rtnetlink socket handle. */
+typedef struct {
+ PyObject_HEAD
+ /* NOTE(review): presumably meant to record whether rth holds an open
+  * socket; confirm that init/dealloc keep it up to date. */
+ int opened;
+ /* libnetlink socket state (fd, local address, sequence numbers) */
+ struct rtnl_handle rth;
+} PyRtnlObject;
+
+/* tp_new: allocate an rtnl object; the socket is opened by PyRtnl_init.
+ * todo: subscriptions? */
+static PyObject* PyRtnl_new(PyTypeObject* type, PyObject* args,
+                            PyObject* kwargs)
+{
+    PyRtnlObject* self = (PyRtnlObject*)type->tp_alloc(type, 0);
+
+    /* tp_alloc zero-fills the object, which would leave rth.fd == 0
+     * (stdin). Mark the handle closed so tearing down an object whose
+     * __init__ never ran, or failed, cannot close descriptor 0. */
+    if (self != NULL) {
+        self->opened = 0;
+        self->rth.fd = -1;
+    }
+    return (PyObject*)self;
+}
+
+/* tp_init: open a NETLINK_ROUTE socket with no multicast subscriptions.
+ * Raises IOError if the socket cannot be opened. */
+static int PyRtnl_init(PyObject* obj, PyObject* args, PyObject* kwargs)
+{
+    PyRtnlObject* self = (PyRtnlObject*)obj;
+
+    /* __init__ may run again on a live object: release the old socket
+     * rather than leaking it */
+    if (self->opened) {
+        rtnl_close(&self->rth);
+        self->opened = 0;
+    }
+
+    if (rtnl_open(&self->rth, 0) < 0) {
+        /* opening can fail after the socket was created; close it and
+         * leave the handle marked closed */
+        if (self->rth.fd >= 0)
+            rtnl_close(&self->rth);
+        self->rth.fd = -1;
+        PyErr_SetString(PyExc_IOError, "could not open rtnl handle");
+        return -1;
+    }
+
+    self->opened = 1;
+    return 0;
+}
+
+/* tp_dealloc: close the socket if one is held, then free the object. */
+static void PyRtnl_dealloc(PyRtnlObject* obj)
+{
+    PyRtnlObject* self = (PyRtnlObject*)obj;
+
+    if (self->rth.fd >= 0)
+        rtnl_close(&self->rth);
+
+    /* the object's memory must be released through tp_free, otherwise
+     * every rtnl instance leaks */
+    self->ob_type->tp_free((PyObject*)self);
+}
+
+/*
+ * rtnl.talk(msg[, peer[, groups]]): send the packed netlink message msg
+ * (a string starting with a struct nlmsghdr) and wait for the ack.
+ * Raises ValueError if msg is shorter than its header or its nlmsg_len,
+ * and IOError if the exchange fails.
+ */
+static PyObject* pyrtnl_talk(PyObject* obj, PyObject* args)
+{
+    PyRtnlObject* self = (PyRtnlObject*)obj;
+    char* msg;
+    int len;
+    int peer = 0;
+    int groups = 0;
+    struct nlmsghdr* n;
+    int rc;
+
+    if (!PyArg_ParseTuple(args, "s#|ii", &msg, &len, &peer, &groups))
+        return NULL;
+
+    if (len < (int)sizeof(struct nlmsghdr)) {
+        PyErr_SetString(PyExc_ValueError, "malformed netlink message");
+        return NULL;
+    }
+
+    /* rtnl_talk writes nlmsg_seq (and may set NLM_F_ACK) in the message,
+     * so work on a private, aligned copy instead of the immutable Python
+     * string's buffer. */
+    n = PyMem_Malloc(len);
+    if (n == NULL)
+        return PyErr_NoMemory();
+    memcpy(n, msg, len);
+
+    /* rtnl_talk sends nlmsg_len bytes; it must not exceed the data given */
+    if (n->nlmsg_len > (unsigned)len) {
+        PyMem_Free(n);
+        PyErr_SetString(PyExc_ValueError, "malformed netlink message");
+        return NULL;
+    }
+
+    rc = rtnl_talk(&self->rth, n, peer, groups, NULL, NULL, NULL);
+    PyMem_Free(n);
+    if (rc < 0) {
+        PyErr_SetString(PyExc_IOError, "error sending message");
+        return NULL;
+    }
+
+    Py_RETURN_NONE;
+}
+
+/* rtnl.wilddump_request(family, type): start a dump of every object of
+ * message type `type` in address family `family`; read the replies with
+ * dump_filter(). Raises IOError if the request cannot be sent. */
+static PyObject* pyrtnl_wilddump_request(PyObject* obj, PyObject* args)
+{
+    PyRtnlObject* self = (PyRtnlObject*)obj;
+    int family;
+    int type;
+
+    if (PyArg_ParseTuple(args, "ii", &family, &type) == 0)
+        return NULL;
+
+    if (rtnl_wilddump_request(&self->rth, family, type) >= 0) {
+        Py_RETURN_NONE;
+    }
+
+    PyErr_SetString(PyExc_IOError, "could not send dump request");
+    return NULL;
+}
+
+/* rtnl.dump_request(type, req): start a dump of message type `type` with
+ * the packed request body req (e.g. a tcmsg) as payload. Raises IOError
+ * if the request cannot be sent. */
+static PyObject* pyrtnl_dump_request(PyObject* obj, PyObject* args)
+{
+    PyRtnlObject* self = (PyRtnlObject*)obj;
+    int type;
+    char* payload;
+    int payload_len;
+
+    if (PyArg_ParseTuple(args, "is#", &type, &payload, &payload_len) == 0)
+        return NULL;
+
+    if (rtnl_dump_request(&self->rth, type, payload, payload_len) >= 0) {
+        Py_RETURN_NONE;
+    }
+
+    PyErr_SetString(PyExc_IOError, "could not send dump request");
+    return NULL;
+}
+
+/*
+ * rtnl_filter_t adapter: call the Python callable `arg` as
+ * filter(sender, message), both strings holding the raw struct
+ * sockaddr_nl and the complete netlink message. The callable's result is
+ * ignored. Returns -1 (stopping the dump) if building the arguments or
+ * the call raises, otherwise 0.
+ */
+static int dump_filter_helper(const struct sockaddr_nl *who,
+                              struct nlmsghdr *n, void *arg)
+{
+    PyObject* filter = arg;
+    PyObject* args;
+    PyObject* result;
+
+    /* without PY_SSIZE_T_CLEAN, "s#" expects an int length */
+    args = Py_BuildValue("s#s#", (const char*)who, (int)sizeof(*who),
+                         (const char*)n, (int)n->nlmsg_len);
+    if (!args)
+        return -1;
+    result = PyObject_CallObject(filter, args);
+    Py_DECREF(args);
+    if (!result)
+        return -1;
+
+    /* result is ignored as long as an exception isn't raised */
+    Py_DECREF(result);
+    return 0;
+}
+
+/*
+ * rtnl.dump_filter(callback): read the replies to the last dump request,
+ * calling callback(addr, msg) for each. Exceptions raised by callback
+ * propagate; other failures (error reply, EOF, socket error) raise
+ * IOError. Raises TypeError if callback is not callable.
+ */
+static PyObject* pyrtnl_dump_filter(PyObject* obj, PyObject* args)
+{
+    PyRtnlObject* self = (PyRtnlObject*)obj;
+    PyObject *filter;
+    int rc;
+
+    if (!PyArg_ParseTuple(args, "O:dump_filter", &filter))
+        return NULL;
+
+    if (!PyCallable_Check(filter)) {
+        PyErr_SetString(PyExc_TypeError, "parameter must be callable");
+        return NULL;
+    }
+
+    /* hold a reference to the callback for the whole dump */
+    Py_INCREF(filter);
+    rc = rtnl_dump_filter(&self->rth, dump_filter_helper, filter, NULL, NULL);
+    Py_DECREF(filter);
+
+    if (rc < 0) {
+        /* rtnl_dump_filter can fail without a Python exception being set;
+         * returning NULL in that state would raise SystemError */
+        if (!PyErr_Occurred())
+            PyErr_SetString(PyExc_IOError, "error reading netlink dump");
+        return NULL;
+    }
+
+    Py_RETURN_NONE;
+}
+
+/* Methods of rtnl objects; all take positional arguments (METH_VARARGS). */
+static PyMethodDef PyRtnl_methods[] = {
+ { "talk", pyrtnl_talk, METH_VARARGS,
+ "send a message to rtnetlink and receive a response.\n" },
+ { "wilddump_request", pyrtnl_wilddump_request, METH_VARARGS,
+ "dump objects.\n" },
+ { "dump_request", pyrtnl_dump_request, METH_VARARGS,
+ "start a dump of a particular netlink type.\n" },
+ { "dump_filter", pyrtnl_dump_filter, METH_VARARGS,
+ "iterate over an rtnl dump.\n" },
+ { NULL } /* sentinel */
+};
+
+/*
+ * Type object for xen.lowlevel.netlink.rtnl, using the Python 2
+ * positional slot layout (the order of the entries below matters).
+ * tp_alloc is left NULL and is inherited from the base type by
+ * PyType_Ready().
+ */
+static PyTypeObject PyRtnlType = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /* ob_size */
+ PKG ".rtnl", /* tp_name */
+ sizeof(PyRtnlObject), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor)PyRtnl_dealloc, /* tp_dealloc */
+ NULL, /* tp_print */
+ NULL, /* tp_getattr */
+ NULL, /* tp_setattr */
+ NULL, /* tp_compare */
+ NULL, /* tp_repr */
+ NULL, /* tp_as_number */
+ NULL, /* tp_as_sequence */
+ NULL, /* tp_as_mapping */
+ NULL, /* tp_hash */
+ NULL, /* tp_call */
+ NULL, /* tp_str */
+ NULL, /* tp_getattro */
+ NULL, /* tp_setattro */
+ NULL, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT, /* tp_flags */
+ "rtnetlink handle", /* tp_doc */
+ NULL, /* tp_traverse */
+ NULL, /* tp_clear */
+ NULL, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ NULL, /* tp_iter */
+ NULL, /* tp_iternext */
+ PyRtnl_methods, /* tp_methods */
+ NULL, /* tp_members */
+ NULL, /* tp_getset */
+ NULL, /* tp_base */
+ NULL, /* tp_dict */
+ NULL, /* tp_descr_get */
+ NULL, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ PyRtnl_init, /* tp_init */
+ NULL, /* tp_alloc */
+ PyRtnl_new, /* tp_new */
+};
+
+/* The module defines no functions of its own; it only exports the type. */
+static PyMethodDef methods[] = {
+    { NULL }
+};
+
+static char doc[] = "libnetlink wrapper";
+
+/* Python 2 module init: ready the rtnl type and publish it as
+ * xen.lowlevel.netlink.rtnl. */
+PyMODINIT_FUNC initnetlink(void)
+{
+    PyObject *module;
+
+    if (PyType_Ready(&PyRtnlType) != -1) {
+        module = Py_InitModule3(PKG, methods, doc);
+        if (module != NULL) {
+            Py_INCREF(&PyRtnlType);
+            PyModule_AddObject(module, "rtnl", (PyObject *)&PyRtnlType);
+        }
+    }
+}
diff --git a/tools/python/xen/remus/__init__.py b/tools/python/xen/remus/__init__.py
new file mode 100644
diff --git a/tools/python/xen/remus/blkdev.py b/tools/python/xen/remus/blkdev.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/blkdev.py
@@ -0,0 +1,31 @@
+# Block device classes added by register(); parse() tries them in list
+# order, i.e. most recently registered first.
+handlers = []
+
+class BlkDevException(Exception): pass
+
+class BlkDev(object):
+ "Object representing a VM block device"
+ def __init__(self, **props):
+ self.uname = ''
+ if 'dev' not in props:
+ raise BlkDevException('no device')
+ #if 'uname' not in props:
+ #raise BlkDevException('no uname')
+ if 'mode' not in props:
+ raise BlkDevException('no mode')
+ self.__dict__.update(props)
+ self.dev = props['dev'].rstrip(':disk')
+
+ def __str__(self):
+ return '%s,%s,%s' % (self.uname, self.dev, self.mode)
+
+def register(handler):
+ "register a block device class with parser"
+ if handler not in handlers:
+ handlers.insert(0, handler)
+
+def parse(props):
+ "turn a vm device dictionary into a blkdev object"
+ for handler in handlers:
+ if handler.handles(**props):
+ return handler(**props)
+ return BlkDev(**props)
diff --git a/tools/python/xen/remus/image.py b/tools/python/xen/remus/image.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/image.py
@@ -0,0 +1,227 @@
+# VM image file manipulation
+
+import logging, struct
+
+import vm
+
+# magic string at the start of a saved image
+SIGNATURE = 'LinuxGuestRecord'
+# native sizes of C long / int, as used by the struct formats below
+LONGLEN = struct.calcsize('L')
+INTLEN = struct.calcsize('i')
+PAGE_SIZE = 4096
+# ~0L
+# NOTE(review): this equals ~0 only for a 32-bit unsigned long; where 'L'
+# is 8 bytes the saved marker would be 2**64 - 1 -- confirm.
+P2M_EXT_SIG = 4294967295L
+# frames per page
+# NOTE(review): 1024 == PAGE_SIZE / 4, i.e. assumes 4-byte entries.
+FPP = 1024
+# flag bits (presumably the page type) in the top of a pfn descriptor;
+# masked off to recover the pfn
+LTAB_MASK = 0xf << 28
+BATCH_SIZE = 1024
+# size of a full batch header: count + BATCH_SIZE pfn descriptors
+IDXLEN = INTLEN + BATCH_SIZE * LONGLEN
+
+# NOTE(review): configures the root logger at DEBUG level on import.
+logging.basicConfig(level=logging.DEBUG)
+log = logging.getLogger()
+
+class VMParseException(Exception): pass
+
+class VMImage(object):
+ def __init__(self, img=None):
+ """img may be a path or a file object.
+ If compact is True, apply checkpoints to base image instead
+ of simply concatenating them.
+ """
+ self.img = img
+
+ self.dom = None
+ self.fd = None
+ self.header = None
+ self.nr_pfns = 0
+ # p2m extension header (unparsed)
+ self.p2mext = None
+
+ if self.img:
+ self.open(self.img)
+
+ def open(self, img):
+ if isinstance(img, str):
+ self.fd = file(img, 'rb')
+ else:
+ self.fd = img
+
+ self.readheader()
+
+ def readheader(self):
+ sig = self.fd.read(len(SIGNATURE))
+ if sig != SIGNATURE:
+ raise VMParseException("Bad signature in image")
+
+ hlen = self.fd.read(INTLEN)
+ hlen, = struct.unpack('!i', hlen)
+
+ self.header = self.fd.read(hlen)
+ self.dom = parseheader(self.header)
+
+ def readp2mfl(self):
+ "read the P2M frame list"
+ pfnlen = self.fd.read(LONGLEN)
+ self.nr_pfns, = struct.unpack('L', pfnlen)
+ p2m0 = self.fd.read(LONGLEN)
+
+ p2mhdr = p2m0
+ p2m0, = struct.unpack('L', p2m0)
+ if p2m0 == P2M_EXT_SIG:
+ elen = self.fd.read(INTLEN)
+ elen, = struct.unpack('I', elen)
+
+ self.p2mext = self.fd.read(elen)
+
+ p2m0 = self.fd.read(LONGLEN)
+ p2m0, = struct.unpack('L', p2m0)
+ p2mfl = [p2m0]
+
+ p2mfle = (self.nr_pfns + FPP - 1)/FPP - 1
+ p2ms = self.fd.read(LONGLEN * p2mfle)
+ p2mfl.extend(struct.unpack('%dL' % p2mfle, p2ms))
+
+ self.p2mfl = p2mfl
+
+ def flush(self):
+ self.ofd.write(self.tail)
+
+class Writer(object):
+ """compress a stream of checkpoints into a single image of the
+ last checkpoint"""
+ def __init__(self, fd, compact=False):
+ self.fd = fd
+ self.compact = compact
+
+ self.vm = None
+ self.tail = None
+ # offset to first batch of pages
+ self.imgstart = 0
+ # PFN mappings
+ self.pfns = []
+
+ def __del__(self):
+ self.close()
+
+ def writeheader(self):
+ hlen = struct.pack('!i', len(self.vm.header))
+ header = ''.join([SIGNATURE, hlen, self.vm.header])
+ self.fd.write(header)
+
+ def writep2mfl(self):
+ p2m = [struct.pack('L', self.vm.nr_pfns)]
+ if self.vm.p2mext:
+ p2m.extend([struct.pack('L', P2M_EXT_SIG), self.vm.p2mext])
+ p2m.append(struct.pack('%dL' % len(self.vm.p2mfl), *self.vm.p2mfl))
+ self.fd.write(''.join(p2m))
+
+ def writebatch(self, batch):
+ def offset(pfn):
+ isz = (pfn / BATCH_SIZE + 1) * IDXLEN
+ return self.imgstart + isz + pfn * PAGE_SIZE
+
+ if not self.compact:
+ return self.fd.write(batch)
+
+ batch = parsebatch(batch)
+ # sort pages for better disk seek behaviour
+ batch.sort(lambda x, y: cmp(x[0] & ~LTAB_MASK, y[0] & ~LTAB_MASK))
+
+ for pfndesc, page in batch:
+ pfn = pfndesc & ~LTAB_MASK
+ if pfn > self.vm.nr_pfns:
+ log.error('INVALID PFN: %d' % pfn)
+ if len(self.pfns) <= pfn:
+ self.pfns.extend([0] * (pfn - len(self.pfns) + 1))
+ self.pfns[pfn] = pfndesc
+ self.fd.seek(offset(pfn))
+ self.fd.write(page)
+
+ #print "max offset: %d, %d" % (len(self.pfns), offset(self.pfns[-1]))
+
+ def writeindex(self):
+ "Write batch header in front of each page"
+ hdrlen = INTLEN + BATCH_SIZE * LONGLEN
+ batches = (len(self.pfns) + BATCH_SIZE - 1) / BATCH_SIZE
+
+ for i in xrange(batches):
+ offset = self.imgstart + i * (hdrlen + (PAGE_SIZE * BATCH_SIZE))
+ pfnoff = i * BATCH_SIZE
+ # python auto-clamps overreads
+ pfns = self.pfns[pfnoff:pfnoff + BATCH_SIZE]
+
+ self.fd.seek(offset)
+ self.fd.write(struct.pack('i', len(pfns)))
+ self.fd.write(struct.pack('%dL' % len(pfns), *pfns))
+
+ def slurp(self, ifd):
+ """Apply an incremental checkpoint to a loaded image.
+ accepts a path or a file object."""
+ if isinstance(ifd, str):
+ ifd = file(ifd, 'rb')
+
+ if not self.vm:
+ self.vm = VMImage(ifd)
+ self.writeheader()
+
+ self.vm.readp2mfl()
+ self.writep2mfl()
+ self.imgstart = self.fd.tell()
+
+ while True:
+ l, batch = readbatch(ifd)
+ if l <= 0:
+ break
+ self.writebatch(batch)
+ self.tail = batch + ifd.read()
+
+ def flush(self):
+ if self.tail:
+ self.fd.seek(0, 2)
+ self.fd.write(self.tail)
+ if self.compact:
+ self.writeindex()
+ self.tail = None
+
+ def close(self):
+ self.flush()
+
+def parseheader(header):
+    "parses a header sexpression"
+    # header text -> sxpr -> domain info, via the vm module helpers
+    return vm.parsedominfo(vm.strtosxpr(header))
+
+def makeheader(dominfo):
+ "create an image header from a VM dominfo sxpr"
+ items = [SIGNATURE]
+ sxpr = vm.sxprtostr(dominfo)
+ items.append(struct.pack('!i', len(sxpr)))
+ items.append(sxpr)
+ return ''.join(items)
+
+def readbatch(fd):
+    """Read one page batch from fd.
+
+    Returns (count, raw): raw is the unparsed batch string (count field,
+    pfn descriptors, pages). When count <= 0 no more data is read and raw
+    is just the count field. A short page read is logged, not raised.
+    """
+    batch = []
+    batchlen = fd.read(INTLEN)
+    batch.append(batchlen)
+    batchlen, = struct.unpack('i', batchlen)
+    log.info("batch length: %d" % batchlen)
+    if batchlen <= 0:
+        return (batchlen, batch[0])
+
+    batchfns = fd.read(LONGLEN * batchlen)
+    batch.append(batchfns)
+    pages = fd.read(PAGE_SIZE * batchlen)
+    if len(pages) != PAGE_SIZE * batchlen:
+        log.error('SHORT READ: %d' % len(pages))
+    batch.append(pages)
+
+    return (batchlen, ''.join(batch))
+
+def parsebatch(batch):
+ "parse a batch string into pages"
+ batchlen, batch = batch[:INTLEN], batch[INTLEN:]
+ batchlen, = struct.unpack('i', batchlen)
+ #print 'batch length: %d' % batchlen
+ pfnlen = batchlen * LONGLEN
+ pfns = struct.unpack('%dL' % batchlen, batch[:pfnlen])
+ pagebuf = batch[pfnlen:]
+ pages = [pagebuf[i*PAGE_SIZE:(i+1)*PAGE_SIZE] for i in xrange(batchlen)]
+ return zip(pfns, pages)
diff --git a/tools/python/xen/remus/netlink.py b/tools/python/xen/remus/netlink.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/netlink.py
@@ -0,0 +1,314 @@
+# netlink wrappers
+
+import socket, struct
+import xen.lowlevel.netlink
+
+# Constants intended to mirror the Linux netlink / rtnetlink headers.
+NETLINK_ROUTE = 0
+
+NLM_F_REQUEST = 1 # This is a request message.
+NLM_F_MULTI = 2 # Multipart message, terminated by NLMSG_DONE
+NLM_F_ACK = 4 # Reply with ack, with zero or error code
+NLM_F_ECHO = 8 # Echo this request
+
+# Modifiers to GET request
+NLM_F_ROOT = 0x100 # specify tree root
+NLM_F_MATCH = 0x200 # return all matching
+NLM_F_ATOMIC = 0x400 # atomic GET
+NLM_F_DUMP = NLM_F_ROOT|NLM_F_MATCH
+
+# Modifiers to NEW request
+NLM_F_REPLACE = 0x100 # Override existing
+NLM_F_EXCL = 0x200 # Do not touch, if it exists
+NLM_F_CREATE = 0x400 # Create, if it does not exist
+NLM_F_APPEND = 0x800 # Add to end of list
+
+# rtnetlink message types
+RTM_NEWLINK = 16
+RTM_GETLINK = 18
+RTM_NEWQDISC = 36
+RTM_DELQDISC = 37
+RTM_GETQDISC = 38
+
+# link attribute types (rta_type in link messages)
+IFLA_UNSPEC = 0
+IFLA_ADDRESS = 1
+IFLA_BROADCAST = 2
+IFLA_IFNAME = 3
+IFLA_MTU = 4
+IFLA_LINK = 5
+IFLA_QDISC = 6
+IFLA_STATS = 7
+IFLA_COST = 8
+IFLA_PRIORITY = 9
+IFLA_MASTER = 10
+IFLA_WIRELESS = 11
+IFLA_PROTINFO = 12
+IFLA_TXQLEN = 13
+IFLA_MAP = 14
+IFLA_WEIGHT = 15
+
+# traffic-control attribute types (rta_type in qdisc messages)
+TCA_UNSPEC = 0
+TCA_KIND = 1
+TCA_OPTIONS = 2
+TCA_STATS = 3
+TCA_XSTATS = 4
+TCA_RATE = 5
+TCA_FCNT = 6
+TCA_STATS2 = 7
+
+# raised e.g. when a message has an unexpected nlmsg_type
+class RTNLException(Exception): pass
+
+def align(l, alignto=4):
+ return (l + alignto - 1) & ~(alignto - 1)
+
+class rtattr(object):
+    "rtattribute"
+    # struct rtattr header: rta_len, rta_type
+    fmt = "HH"
+    fmtlen = struct.calcsize(fmt)
+
+    def __init__(self, msg=None):
+        # parse msg if given, otherwise start an empty attribute
+        if msg:
+            self.unpack(msg)
+        else:
+            self.rta_len = 0
+            self.rta_type = 0
+
+            self.body = ''
+
+    def __len__(self):
+        # bytes the attribute occupies in a message, padding included
+        return align(self.rta_len)
+
+    def pack(self):
+        # NOTE(review): rta_len is set to the padded length, whereas the C
+        # helpers (addattr_l) store the unpadded RTA_LENGTH -- confirm the
+        # kernel accepts this for the attributes used.
+        self.rta_len = align(self.fmtlen + len(self.body))
+        s = struct.pack(self.fmt, self.rta_len, self.rta_type) + self.body
+        pad = self.rta_len - len(s)
+        if pad:
+            s += '\0' * pad
+        return s
+
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.rta_len, self.rta_type = args
+
+        # payload only: bytes after the header up to rta_len
+        self.body = msg[align(self.fmtlen):self.rta_len]
+
+class rtattrlist(object):
+ def __init__(self, msg):
+ self.start = msg
+
+ def __iter__(self):
+ body = self.start
+ while len(body) > rtattr.fmtlen:
+ rta = rtattr(body)
+ yield rta
+ body = body[len(rta):]
+
+class nlmsg(object):
+    "netlink message header"
+    # struct nlmsghdr: len, type, flags, seq, pid
+    fmt = "IHHII"
+    fmtlen = struct.calcsize(fmt)
+
+    def __init__(self, msg=None):
+        # parse msg if given, otherwise start an empty message
+        if msg:
+            self.unpack(msg)
+        else:
+            self.nlmsg_len = 0
+            self.nlmsg_type = 0
+            self.nlmsg_flags = 0
+            self.nlmsg_seq = 0
+            self.nlmsg_pid = 0
+
+            self.rta = ''
+            self.body = ''
+
+    def __len__(self):
+        # aligned length of header + body + packed attributes
+        return align(self.fmtlen + len(self.body) + len(self.rta))
+
+    def addattr(self, type, data):
+        "append an attribute of the given type carrying data"
+        attr = rtattr()
+        attr.rta_type = type
+        attr.body = data
+        self.rta += attr.pack()
+
+    def settype(self, cmd):
+        self.nlmsg_type = cmd
+
+    def pack(self):
+        # nlmsg_len is recomputed from the current contents via len(self)
+        return struct.pack(self.fmt, len(self), self.nlmsg_type,
+                           self.nlmsg_flags, self.nlmsg_seq,
+                           self.nlmsg_pid) + self.body + self.rta
+
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.nlmsg_len, self.nlmsg_type, self.nlmsg_flags = args[:3]
+        self.nlmsg_seq, self.nlmsg_pid = args[3:]
+
+        # everything after the header, attributes included, stays in body
+        self.body = msg[align(self.fmtlen):]
+        self.rta = ''
+
+    def __str__(self):
+        return '<netlink message, len %d, type %d>' % \
+               (self.nlmsg_len, self.nlmsg_type)
+
+class ifinfomsg(object):
+    "interface info message"
+    # struct ifinfomsg: family, pad, type, index, flags, change
+    fmt = "BxHiII"
+    fmtlen = struct.calcsize(fmt)
+
+    def __init__(self, msg=None):
+        # parse msg if given, otherwise start with zeroed fields
+        if msg:
+            self.unpack(msg)
+        else:
+            self.ifi_family = 0
+            self.ifi_type = 0
+            self.ifi_index = 0
+            self.ifi_flags = 0
+            self.ifi_change = 0
+
+            self.body = ''
+
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.ifi_family, self.ifi_type, self.ifi_index= args[:3]
+        self.ifi_flags, self.ifi_change = args[3:]
+
+        # the attributes that follow the fixed part
+        self.body = msg[align(self.fmtlen):]
+
+    def __str__(self):
+        return '<ifinfo message, family %d, type %d, index %d>' % \
+               (self.ifi_family, self.ifi_type, self.ifi_index)
+
+class tcmsg(object):
+    "TC message"
+    # struct tcmsg: family, 3 pad bytes, ifindex, handle, parent, info
+    fmt = "BxxxiIII"
+    fmtlen = struct.calcsize(fmt)
+
+    def __init__(self, msg=None):
+        # parse msg if given, otherwise start with defaults
+        if msg:
+            self.unpack(msg)
+        else:
+            self.tcm_family = socket.AF_UNSPEC
+            self.tcm_ifindex = 0
+            self.tcm_handle = 0
+            self.tcm_parent = 0
+            self.tcm_info = 0
+
+            self.rta = ''
+
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.tcm_family, self.tcm_ifindex, self.tcm_handle = args[:3]
+        self.tcm_parent, self.tcm_info = args[3:]
+
+        # the attributes that follow the fixed part
+        self.rta = msg[align(self.fmtlen):]
+
+    def pack(self):
+        # fixed part only; self.rta is not included
+        return struct.pack(self.fmt, self.tcm_family, self.tcm_ifindex,
+                           self.tcm_handle, self.tcm_parent, self.tcm_info)
+
+    def __str__(self):
+        return '<tc message, family %d, index %d>' % \
+               (self.tcm_family, self.tcm_ifindex)
+
+class newlinkmsg(object):
+ def __init__(self, nlmsg):
+ if nlmsg.nlmsg_type != RTM_NEWLINK:
+ raise RTNLException("wrong message type")
+ self.nlmsg = nlmsg
+ self.ifi = ifinfomsg(self.nlmsg.body)
+
+ self.rtattrs = {}
+ for rta in rtattrlist(self.ifi.body):
+ self.rtattrs[rta.rta_type] = rta.body
+
+class newqdiscmsg(object):
+ def __init__(self, nlmsg):
+ if nlmsg.nlmsg_type != RTM_NEWQDISC:
+ raise RTNLException("wrong message type")
+ self.nlmsg = nlmsg
+ self.t = tcmsg(self.nlmsg.body)
+
+ self.rtattrs = {}
+ for rta in rtattrlist(self.t.rta):
+ self.rtattrs[rta.rta_type] = rta.body
+
+class rtnl(object):
+ def __init__(self):
+ self._rth = xen.lowlevel.netlink.rtnl()
+ self._linkcache = None
+
+ def getlink(self, key, cached=False):
+ """returns the interface object corresponding to the key, which
+ may be an index number or device name."""
+ if not cached:
+ self._linkcache = None
+ if self._linkcache is None:
+ self._linkcache = self.getlinks()
+
+ if isinstance(key, int):
+ return self._linkcache.get(key)
+
+ for k, v in self._linkcache.iteritems():
+ if v['name'] == key:
+ return v
+
+ return None
+
+ def getlinks(self):
+ """returns a dictionary of interfaces keyed by kernel
+ interface index"""
+ links = {}
+ def dumpfilter(addr, msgstr):
+ msg = newlinkmsg(nlmsg(msgstr))
+ idx = msg.ifi.ifi_index
+ ifname = msg.rtattrs[IFLA_IFNAME].strip('\0')
+ address = msg.rtattrs.get(IFLA_ADDRESS)
+
+ link = {'index': idx,
+ 'type': msg.ifi.ifi_type,
+ 'name': ifname,
+ 'address': address}
+ links[idx] = link
+
+ self._rth.wilddump_request(socket.AF_UNSPEC, RTM_GETLINK)
+ self._rth.dump_filter(dumpfilter)
+
+ return links
+
+ def getqdisc(self, dev):
+ """returns the queueing discipline on device dev, which may be
+ specified by kernel index or device name"""
+ qdiscs = self.getqdiscs(dev)
+ if qdiscs:
+ return qdiscs.values()[0]
+ return None
+
+ def getqdiscs(self, dev=None):
+ """returns a dictionary of queueing disciplines keyed by kernel
+ interface index"""
+ qdiscs = {}
+ def dumpfilter(addr, msgstr):
+ msg = newqdiscmsg(nlmsg(msgstr))
+ idx = msg.t.tcm_ifindex
+ handle = msg.t.tcm_handle
+ kind = msg.rtattrs[TCA_KIND].strip('\0')
+ opts = msg.rtattrs.get(TCA_OPTIONS)
+
+ qdisc = {'index': idx,
+ 'handle': handle,
+ 'kind': kind,
+ 'options': opts}
+ qdiscs[idx] = qdisc
+
+ tcm = tcmsg()
+ if dev:
+ link = self.getlink(dev)
+ if not link:
+ raise QdiscException('device %s not found' % dev)
+ tcm.tcm_ifindex = link['index']
+
+ msg = tcm.pack()
+ self._rth.dump_request(RTM_GETQDISC, msg)
+ self._rth.dump_filter(dumpfilter)
+ return qdiscs
+
+ def talk(self, req):
+ self._rth.talk(req)
diff --git a/tools/python/xen/remus/profile.py b/tools/python/xen/remus/profile.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/profile.py
@@ -0,0 +1,56 @@
+"""Simple profiling module
+"""
+
+import time
+
class ProfileBlock(object):
    """A named section of code; prints a timestamped line on entry and exit."""
    def __init__(self, name):
        self.name = name

    def _report(self, event):
        # one line per event: "PROF: <event> <name> at <unix time>"
        print("PROF: %s %s at %f" % (event, self.name, time.time()))

    def enter(self):
        self._report('entered')

    def exit(self):
        self._report('exited')
+
class NullProfiler(object):
    """Drop-in stand-in for Profiler that records and prints nothing."""
    def enter(self, name):
        return None

    def exit(self, name=None):
        return None
+
class Profiler(object):
    """Print timestamps as named blocks are entered and exited.

    Running blocks are kept on a stack (self.running). ProfileBlock objects
    are cached by name in self.blocks and reused when a name is re-entered.
    """
    def __init__(self):
        self.blocks = {}
        self.running = []

    def enter(self, name):
        """Start the block called name and push it on the running stack."""
        try:
            block = self.blocks[name]
        except KeyError:
            block = ProfileBlock(name)
            self.blocks[name] = block

        block.enter()
        self.running.append(block)

    def exit(self, name=None):
        """Exit a running block.

        With no name, the innermost running block is exited. With a name,
        every block pushed after the (innermost) block of that name is
        exited first, innermost first, and then the named block itself.

        Raises KeyError if nothing is running, or if no running block has
        the given name. In the latter case the stack is left untouched.
        """
        if name is None:
            if not self.running:
                raise KeyError('no block running')
            self.running.pop().exit()
            return

        # Validate before unwinding so that a bad name does not exit (and
        # discard) every block on the stack before failing.
        if name not in [b.name for b in self.running]:
            raise KeyError('block %s not running' % name)

        while True:
            block = self.running.pop()
            block.exit()
            if block.name == name:
                break
diff --git a/tools/python/xen/remus/qdisc.py b/tools/python/xen/remus/qdisc.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/qdisc.py
@@ -0,0 +1,178 @@
+import socket, struct
+
+import netlink
+
# registry: qdisc kind name -> Qdisc subclass handling it; each subclass
# below registers itself after its definition
qdisc_kinds = dict()

# parent handle used by request() when no handle is supplied
TC_H_ROOT = 0xFFFFFFFF

class QdiscException(Exception):
    """Raised when qdisc parameters are missing, unknown or unparsable."""
+
class request(object):
    """A qdisc netlink request: an nlmsg whose body is a packed tcmsg.

    cmd is the netlink message type; flags are OR-ed with NLM_F_REQUEST.
    dev, if given, is stored as the kernel interface index. handle is
    stored as the tcmsg parent, with 0 (the default) meaning TC_H_ROOT.
    """
    def __init__(self, cmd, flags=0, dev=None, handle=0):
        self.n = netlink.nlmsg()
        self.t = netlink.tcmsg()

        self.n.nlmsg_flags = flags | netlink.NLM_F_REQUEST
        self.n.nlmsg_type = cmd
        self.t.tcm_family = socket.AF_UNSPEC
        self.t.tcm_parent = handle or TC_H_ROOT

        if dev:
            self.t.tcm_ifindex = dev

    def pack(self):
        """Serialise the request; the packed tcmsg becomes the nlmsg body."""
        self.n.body = self.t.pack()
        return self.n.pack()
+
class addrequest(request):
    """RTM_NEWQDISC request with NLM_F_CREATE|NLM_F_EXCL installing qdisc
    on dev; TCA_OPTIONS is attached only when qdisc.pack() is non-empty."""
    def __init__(self, dev, handle, qdisc):
        create_flags = netlink.NLM_F_EXCL | netlink.NLM_F_CREATE
        super(addrequest, self).__init__(netlink.RTM_NEWQDISC,
                                         flags=create_flags,
                                         dev=dev, handle=handle)
        msg = self.n
        msg.addattr(netlink.TCA_KIND, qdisc.kind)
        packed = qdisc.pack()
        if packed:
            msg.addattr(netlink.TCA_OPTIONS, packed)
+
class delrequest(request):
    """RTM_DELQDISC request for dev, with handle as the tcmsg parent
    (0 selects TC_H_ROOT)."""
    def __init__(self, dev, handle):
        parent = super(delrequest, self)
        parent.__init__(netlink.RTM_DELQDISC, dev=dev, handle=handle)
+
class changerequest(request):
    """RTM_NEWQDISC request without create flags (presumably modifies an
    existing qdisc on dev); TCA_OPTIONS is attached only when
    qdisc.pack() is non-empty."""
    def __init__(self, dev, handle, qdisc):
        super(changerequest, self).__init__(netlink.RTM_NEWQDISC,
                                            dev=dev, handle=handle)
        msg = self.n
        msg.addattr(netlink.TCA_KIND, qdisc.kind)
        packed = qdisc.pack()
        if packed:
            msg.addattr(netlink.TCA_OPTIONS, packed)
+
class Qdisc(object):
    """Base class for queueing disciplines.

    Constructing Qdisc (or a subclass) with a qdict, a dict with keys
    'kind', 'handle' and optionally 'options', returns an instance of the
    class registered for qdict['kind'] in qdisc_kinds. It falls back to
    the class that was called for unknown kinds.
    """
    def __new__(cls, qdict=None, *args, **opts):
        if qdict:
            cls = qdisc_kinds.get(qdict.get('kind'), cls)
        # Do not forward the constructor arguments to object.__new__: that
        # is deprecated in Python 2.6 and a TypeError in Python 3.
        # __init__ still receives them.
        return super(Qdisc, cls).__new__(cls)

    def __init__(self, qdict):
        self._qdict = qdict
        self.kind = qdict['kind']
        # keep only the upper 16 bits of the handle (presumably the tc
        # major number)
        self.handle = qdict['handle'] >> 16

    def parse(self, opts):
        """Parse option words; the base class accepts none.

        Raises QdiscException if any options are given.
        """
        if opts:
            raise QdiscException('cannot parse qdisc parameters')

    def optstr(self):
        """Return a printable option string, '' when there are no options."""
        # .get: qdicts built locally (e.g. QueueQdisc's default) have no
        # 'options' key
        if self._qdict.get('options'):
            return '[cannot parse qdisc parameters]'
        return ''

    def pack(self):
        """Return the packed TCA_OPTIONS payload; the base class has none."""
        return ''
+
TC_PRIO_MAX = 15
class PrioQdisc(Qdisc):
    """Qdisc for the 'prio' and 'pfifo_fast' kinds.

    Options are an int band count followed by TC_PRIO_MAX + 1 unsigned
    priority-map bytes (struct format fmt).
    """
    fmt = 'i%dB' % (TC_PRIO_MAX + 1)

    def __init__(self, qdict):
        super(PrioQdisc, self).__init__(qdict)

        if not qdict.get('options'):
            # no options supplied: fall back to these defaults
            self.bands = 3
            self.priomap = [1, 2, 2, 2, 1, 2, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1]
        else:
            self.unpack(qdict['options'])

    def pack(self):
        # NOTE(review): options are deliberately not packed here; the
        # encoding struct.pack(self.fmt, self.bands, *self.priomap) is
        # disabled - confirm why before enabling it.
        return ''

    def unpack(self, opts):
        fields = struct.unpack(self.fmt, opts)
        self.bands, self.priomap = fields[0], fields[1:]

    def optstr(self):
        return 'bands %d priomap %s' % (self.bands,
                                        ' '.join(map(str, self.priomap)))

qdisc_kinds.update(prio=PrioQdisc, pfifo_fast=PrioQdisc)
+
+class CfifoQdisc(Qdisc):
+ fmt = 'II'
+
+ def __init__(self, qdict):
+ super(CfifoQdisc, self).__init__(qdict)
+
+ if qdict.get('options'):
+ self.unpack(qdict['options'])
+ else:
+ self.epoch = 0
+ self.vmid = 0
+
+ def pack(self):
+ return struct.pack(self.fmt, self.epoch, self.vmid)
+
+ def unpack(self, opts):
+ self.epoch, self.vmid = struct.unpack(self.fmt, opts)
+
+ def parse(self, opts):
+ args = list(opts)
+ try:
+ while args:
+ arg = args.pop(0)
+ if arg == 'epoch':
+ self.epoch = int(args.pop(0))
+ continue
+ if arg.lower() == 'vmid':
+ self.vmid = int(args.pop(0))
+ continue
+ except Exception, inst:
+ raise QdiscException(str(inst))
+
+ def optstr(self):
+ return 'epoch %d vmID %d' % (self.epoch, self.vmid)
+
+qdisc_kinds['cfifo'] = CfifoQdisc
+
TC_QUEUE_CHECKPOINT = 0
TC_QUEUE_RELEASE = 1

class QueueQdisc(Qdisc):
    """'queue' qdisc driven by checkpoint/release actions.

    The packed options are one unsigned int holding the current action.
    """
    fmt = 'I'

    def __init__(self, qdict=None):
        if not qdict:
            # no dump data: describe a queue qdisc at the root handle
            qdict = {'kind': 'queue', 'handle': TC_H_ROOT}
        super(QueueQdisc, self).__init__(qdict)

        self.action = TC_QUEUE_CHECKPOINT

    def pack(self):
        return struct.pack(self.fmt, self.action)

    def parse(self, args):
        """Set the action from args[0] ('checkpoint' or 'release').

        Raises QdiscException when args is empty or the word is unknown.
        """
        if not args:
            raise QdiscException('no action given')
        arg = args[0]

        for word, code in (('checkpoint', TC_QUEUE_CHECKPOINT),
                           ('release', TC_QUEUE_RELEASE)):
            if arg == word:
                self.action = code
                return
        raise QdiscException('unknown action')

qdisc_kinds['queue'] = QueueQdisc
diff --git a/tools/python/xen/remus/save.py b/tools/python/xen/remus/save.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/save.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python
+
+import os, select, socket, threading, time, signal, xmlrpclib
+
+from xen.xend.XendClient import server
+from xen.xend.xenstore.xswatch import xswatch
+
+import xen.lowlevel.xc
+from xen.xend.xenstore import xsutil
+xc = xen.lowlevel.xc.xc()
+
+import xen.lowlevel.checkpoint
+
+import vm, image
+
# flag value presumably mirroring libxc's XCFLAGS_LIVE (live save)
XCFLAGS_LIVE = 1

# location of the xc_save helper binary
xcsave = '/usr/lib/xen/bin/xc_save'
+
+class _proxy(object):
+ "proxy simulates an object without inheritance"
+ def __init__(self, obj):
+ self._obj = obj
+
+ def __getattr__(self, name):
+ return getattr(self._obj, name)
+
+ def proxy(self, obj):
+ self._obj = obj
+
class CheckpointError(Exception):
    """Raised by Saver.start() when the low-level checkpointer fails."""
+
class CheckpointingFile(_proxy):
    """Tee writes into separate file objects for each round.

    This is necessary because xc_save gets a single file descriptor for
    the duration of checkpointing. fileno() hands out the write end of a
    pipe. A daemon writer thread copies everything from the pipe into
    `path` for round 0, and into `path.N` for round N. checkpoint() asks
    the thread to start the next round. Other attribute lookups are
    forwarded to the current round's file object.
    """
    def __init__(self, path):
        self.path = path

        self.round = 0
        # data pipe: writers use wfd (via fileno()), the thread drains rfd
        self.rfd, self.wfd = os.pipe()
        self.fd = open(path, 'wb')

        # this pipe is used to notify the writer thread of checkpoints
        self.cprfd, self.cpwfd = os.pipe()

        super(CheckpointingFile, self).__init__(self.fd)

        wt = threading.Thread(target=self._wrthread, name='disk-write-thread')
        wt.setDaemon(True)
        wt.start()
        self.wt = wt

    def fileno(self):
        """Return the write end of the pipe drained by the writer thread."""
        return self.wfd

    def close(self):
        """Stop the writer thread and release all descriptors and files."""
        os.close(self.wfd)
        # closing wfd makes the writer thread read EOF on rfd and exit
        self.wt.join()
        os.close(self.rfd)
        os.close(self.cprfd)
        os.close(self.cpwfd)
        self.fd.close()
        self.wt = None

    def checkpoint(self):
        """Ask the writer thread to switch to a new round file."""
        os.write(self.cpwfd, '1')

    def _wrthread(self):
        """Writer thread body: copy pipe data and rotate files on request."""
        while True:
            r, o, e = select.select((self.rfd, self.cprfd), (), ())
            if self.rfd in r:
                data = os.read(self.rfd, 256 * 1024)
                if not data:
                    break
                self.fd.write(data)
            if self.cprfd in r:
                os.read(self.cprfd, 1)
                finished = self.fd
                self.round += 1
                self.fd = open('%s.%d' % (self.path, self.round), 'wb')
                self.proxy(self.fd)
                # Close the finished round explicitly (after the proxy has
                # been switched) rather than relying on garbage collection
                # to flush and close it.
                finished.close()
+
class MigrationSocket(_proxy):
    """File-like TCP connection opened with a "receive" request.

    Connects to address (a (host, port) pair for AF_INET), sends the line
    "receive\\n" and reads up to 80 bytes of reply, which is discarded.
    Attribute lookups are then forwarded to a file object opened on the
    socket's descriptor. The socket object itself is kept in self.sock.
    """
    def __init__(self, address):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(address)
            # send() may transmit only part of the buffer; sendall() keeps
            # going until all of it has been written
            sock.sendall("receive\n")
            sock.recv(80)
        except:
            # do not leak the socket when the handshake fails
            sock.close()
            raise

        fd = os.fdopen(sock.fileno(), 'w+')

        self.sock = sock
        super(MigrationSocket, self).__init__(fd)
+
class Keepalive(object):
    """Call a keepalive callback periodically from a daemon thread.

    method() is called every `interval` seconds while running, and once
    more as method(stop=True) when the thread winds down. A false interval
    disables the keepalive entirely: start() and stop() then do nothing.
    """
    def __init__(self, method, interval=0.1):
        self.keepalive = method
        self.interval = interval

        # worker thread and its run flag; stop() clears both
        self.thread = None
        self.running = False

    def start(self):
        """Spawn the worker thread (no-op when interval is false)."""
        if self.interval:
            worker = threading.Thread(target=self.run,
                                      name='keepalive-thread')
            worker.setDaemon(True)
            self.thread = worker
            self.running = True
            worker.start()

    def stop(self):
        """Ask the worker to finish and wait for it (no-op if not started)."""
        worker = self.thread
        if not worker:
            return
        self.running = False
        worker.join()
        self.thread = None

    def run(self):
        """Thread body: call keepalive until stopped, then once with stop=True."""
        while True:
            if not self.running:
                break
            self.keepalive()
            time.sleep(self.interval)
        self.keepalive(stop=True)
+
+class Saver(object):
+ def __init__(self, domid, fd, suspendcb=None, resumecb=None,
+ checkpointcb=None, interval=0):
+ """Create a Saver object for taking guest checkpoints.
+ domid: name, number or UUID of a running domain
+ fd: a stream to which checkpoint data will be written.
+ suspendcb: callback invoked after guest is suspended
+ resumecb: callback invoked before guest resumes
+ checkpointcb: callback invoked when a checkpoint is complete. Return
+ True to take another checkpoint, or False to stop.
+ """
+ self.fd = fd
+ self.suspendcb = suspendcb
+ self.resumecb = resumecb
+ self.checkpointcb = checkpointcb
+ self.interval = interval
+
+ self.vm = vm.VM(domid)
+
+ self.checkpointer = None
+
+ def start(self):
+ vm.getshadowmem(self.vm)
+
+ hdr = image.makeheader(self.vm.dominfo)
+ self.fd.write(hdr)
+ self.fd.flush()
+
+ self.checkpointer = xen.lowlevel.checkpoint.checkpointer()
+ try:
+ self.checkpointer.open(self.vm.domid)
+ self.checkpointer.start(self.fd, self.suspendcb, self.resumecb,
+ self.checkpointcb, self.interval)
+ self.checkpointer.close()
+ except xen.lowlevel.checkpoint.error, e:
+ raise CheckpointError(e)
+
+ def _resume(self):
+ """low-overhead version of XendDomainInfo.resumeDomain"""
+ # TODO: currently assumes SUSPEND_CANCEL is available
+ if True:
+ xc.domain_resume(self.vm.domid, 1)
+ xsutil.ResumeDomain(self.vm.domid)
+ else:
+ server.xend.domain.resumeDomain(self.vm.domid)
diff --git a/tools/python/xen/remus/tapdisk.py b/tools/python/xen/remus/tapdisk.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/tapdisk.py
@@ -0,0 +1,4 @@
+import blkdev
+
class TapDisk(blkdev.BlkDev):
    """Block device backed by tapdisk.

    The base class must be qualified: only the blkdev module is imported
    here, so a bare BlkDev raises NameError at import time.
    NOTE(review): unlike vbd.VBD, this class defines no handles() and is
    not passed to blkdev.register(); confirm whether it should be.
    """
diff --git a/tools/python/xen/remus/util.py b/tools/python/xen/remus/util.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/util.py
@@ -0,0 +1,31 @@
+# utility functions
+
+import os, subprocess
+
class PipeException(Exception):
    """Failure running an external command.

    errno holds the numeric code (an exit status or an OS errno). The
    message is suffixed with that code and os.strerror() of it.
    """
    def __init__(self, message, errno):
        self.errno = errno
        text = '%s: %d, %s' % (message, errno, os.strerror(errno))
        Exception.__init__(self, text)
+
def canonifymac(mac):
    """Normalise a colon-separated MAC address.

    Each field is parsed as hexadecimal and re-emitted as two lower-case
    hex digits, e.g. '0:16:3E:a:B:c' -> '00:16:3e:0a:0b:0c'.
    """
    fields = mac.split(':')
    return ':'.join('%02x' % int(field, 16) for field in fields)
+
+def runcmd(args, cwd=None):
+ # TODO: stdin handling
+ if type(args) == str:
+ args = args.split(' ')
+ try:
+ proc = subprocess.Popen(args, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, close_fds=True,
+ cwd=cwd)
+ stdout = proc.stdout.read()
+ stderr = proc.stderr.read()
+ proc.wait()
+ if proc.returncode:
+ print ' '.join(args)
+ print stderr.strip()
+ raise PipeException('%s failed' % args[0], proc.returncode)
+ return stdout
+ except (OSError, IOError), inst:
+ raise PipeException('could not run %s' % args[0], inst.errno)
diff --git a/tools/python/xen/remus/vbd.py b/tools/python/xen/remus/vbd.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/vbd.py
@@ -0,0 +1,9 @@
+import blkdev
+
class VBD(blkdev.BlkDev):
    """Physical block device, selected for unames starting with 'phy:'."""
    @classmethod
    def handles(cls, **props):
        """Return True when props describe a 'phy:' device."""
        return props.get('uname', '').startswith('phy:')

blkdev.register(VBD)
diff --git a/tools/python/xen/remus/vdi.py b/tools/python/xen/remus/vdi.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/vdi.py
@@ -0,0 +1,121 @@
+#code to play with vdis and snapshots
+
+import os
+
def run(cmd):
    """Run cmd through the shell via os.popen.

    Returns (ok, lines). ok is True when the pipe's close() reported
    success. lines are the non-blank output lines, newlines included.
    cmd is passed to the shell unescaped, so callers must not interpolate
    untrusted text into it.
    """
    pipe = os.popen(cmd)
    lines = []
    for line in pipe:
        if line.rstrip():
            lines.append(line)
    status = pipe.close()
    return not status, lines
+
+
# blockstore file passed as the first argument to the vdi_* commands
_blockstore = '/blockstore.dat'

def set_blockstore(blockstore):
    """Select the blockstore file used by subsequent vdi_* commands."""
    global _blockstore
    # must assign the module-level name declared global above; a
    # differently spelled name would just create a discarded local
    _blockstore = blockstore
+
+
class SnapShot:
    """A snapshot of a VDI, identified by (block, index)."""
    def __init__(self, vdi, block, index):
        self.__vdi = vdi
        self.__block = block
        self.__index = index

    #TODO add snapshot date and radix

    def __str__(self):
        fields = (self.__vdi.id(), self.__block, self.__index)
        return '%d %d %d' % fields

    def vdi(self):
        """The VDI this snapshot belongs to."""
        return self.__vdi

    def block(self):
        return self.__block

    def index(self):
        return self.__index

    def match(self, block, index):
        """True when this snapshot has the given block and index."""
        same_block = self.__block == block
        return same_block and self.__index == index
+
+
class VDIException(Exception):
    """Raised when a vdi_* helper command fails or a VDI lookup misses."""
+
+
class VDI:
    """A virtual disk image in the blockstore, with a numeric id and a name."""
    def __init__(self, id, name):
        self.__id = id
        self.__name = name

    def __str__(self):
        return 'vdi: %d %s' % (self.__id, self.__name)

    def id(self):
        return self.__id

    def name(self):
        return self.__name

    def list_snapshots(self):
        """Return a SnapShot for each line of `vdi_snap_list` output after
        the first (presumably a header). Each line must start with
        "<block> <index>". Raises VDIException if the command fails."""
        ok, lines = run('vdi_snap_list %s %d' % (_blockstore, self.__id))
        if not ok:
            raise VDIException("Error reading snapshot list")
        snaps = []
        for line in lines[1:]:
            words = line.split()
            snaps.append(SnapShot(self, int(words[0]), int(words[1])))
        return snaps

    def snapshot(self):
        """Take a snapshot with `vdi_checkpoint` and return it.

        The first output line is expected to hold three words, the last
        two being block and index. Raises VDIException if the command
        fails.
        """
        ok, lines = run('vdi_checkpoint %s %d' % (_blockstore, self.__id))
        if not ok:
            raise VDIException("Error taking vdi snapshot")
        _, block, idx = lines[0].split()
        return SnapShot(self, int(block), int(idx))
+
+
def create(name, snap):
    """Create a VDI called name from snapshot snap and return it.

    Raises VDIException if `vdi_create` fails.
    NOTE(review): name is interpolated unquoted into a shell command;
    only pass trusted names.
    """
    cmd = 'vdi_create %s %s %d %d' % (_blockstore, name,
                                      snap.block(), snap.index())
    ok, _ = run(cmd)
    if not ok:
        raise VDIException('Unable to create vdi from snapshot')
    return lookup_by_name(name)
+
+
def fill(name, img_file):
    """Create an empty VDI called name, fill it from img_file, return it.

    Raises VDIException if either helper command fails.
    """
    failure = 'Unable to create vdi from disk img file'

    ok, _ = run('vdi_create %s %s' % (_blockstore, name))
    if not ok:
        raise VDIException(failure)

    vdi = lookup_by_name(name)
    # NOTE(review): unlike the other vdi_* commands, vdi_fill is not given
    # the blockstore path - confirm this matches its command line
    ok, _ = run('vdi_fill %d %s' % (vdi.id(), img_file))
    if not ok:
        raise VDIException(failure)
    return vdi
+
+
def list_vdis():
    """Return a VDI for each line of `vdi_list` output.

    Each line must start with "<id> <name>". Raises VDIException if the
    command fails.
    """
    ok, lines = run('vdi_list %s' % _blockstore)
    if not ok:
        raise VDIException("Error doing vdi list")
    return [VDI(int(words[0]), words[1])
            for words in (line.split() for line in lines)]
+
+
def lookup_by_id(id):
    """Return the first VDI whose id() equals id; raise VDIException if none."""
    for candidate in list_vdis():
        if candidate.id() == id:
            break
    else:
        raise VDIException("No match from vdi id")
    return candidate
+
+
def lookup_by_name(name):
    """Return the first VDI whose name() equals name; raise VDIException if none."""
    for candidate in list_vdis():
        if candidate.name() == name:
            break
    else:
        raise VDIException("No match for vdi name")
    return candidate
diff --git a/tools/python/xen/remus/vif.py b/tools/python/xen/remus/vif.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/vif.py
@@ -0,0 +1,14 @@
+from xen.remus.util import canonifymac
+
class VIF(object):
    """A virtual network interface built from a device property dict.

    Every property becomes an attribute. 'mac', when present, is stored
    in canonical form via canonifymac().
    """
    def __init__(self, **props):
        self.__dict__.update(props)
        if 'mac' in props:
            self.mac = canonifymac(props['mac'])

    def __str__(self):
        # raises AttributeError when the interface was built without a MAC
        return self.mac
+
def parse(props):
    "turn a vm device dictionary into a vif object"
    device = VIF(**props)
    return device
diff --git a/tools/python/xen/remus/vm.py b/tools/python/xen/remus/vm.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/vm.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python
+
+import xmlrpclib
+
+from xen.xend.XendClient import server
+from xen.xend import sxp
+# XXX XendDomain is voodoo to let balloon import succeed
+from xen.xend import XendDomain, balloon
+
+import vif
+import blkdev
+# need a nicer way to load disk drivers
+import vbd
+
class VMException(Exception):
    """Raised when a domain cannot be looked up through xend."""
+
class VM(object):
    """Representation of a virtual machine.

    Built from a domid (looked up in xend with server.xend.domain) or from
    a dominfo sxpr. With neither, the object keeps placeholder values
    (domid -1, name 'unknown', no disks or vifs).
    """
    def __init__(self, domid=None, dominfo=None):
        self.dominfo = dominfo

        self.domid = -1
        self.name = 'unknown'
        self.dom = {}
        self.disks = []
        self.vifs = []

        # compare with None so that domain 0 is looked up as well
        if domid is not None:
            try:
                self.dominfo = server.xend.domain(domid, 'all')
            except xmlrpclib.Fault:
                raise VMException('error looking up domain %s' % str(domid))

        if self.dominfo:
            self.loaddominfo()

    def loaddominfo(self):
        """Populate dom, domid, name, disks and vifs from self.dominfo."""
        self.dom = parsedominfo(self.dominfo)
        self.domid = self.dom['domid']
        self.name = self.dom['name']

        self.disks = getdisks(self.dom)
        self.vifs = getvifs(self.dom)

    def __str__(self):
        # MACs come from the vif objects; %s tolerates a domid parsed
        # from the sxpr as a string
        macs = ', '.join([str(v) for v in self.vifs])
        disks = ', '.join([str(d) for d in self.disks])
        return 'VM %s (%s), MACs: [%s], disks: [%s]' % \
               (self.domid, self.name, macs, disks)
+
def parsedominfo(dominfo):
    """Parse a dominfo sxpr (python lists of lists) into nested dicts.

    dominfo[0] (the leading tag) is skipped. Each [name, value, ...]
    entry becomes a key:
      - a missing value maps to None;
      - a scalar value is kept as-is;
      - a list value is parsed recursively into a dict.
    When a name repeats and both values are dicts (e.g. several
    ['device', ...] entries), they are merged key by key, and clashing
    keys collect their values into lists. Any other repeated name collects
    its values into a list.
    """
    def s2d(s):
        r = {}
        for elem in s:
            if len(elem) == 0:
                continue
            name = elem[0]
            if len(elem) == 1:
                val = None
            else:
                val = elem[1]
            if isinstance(val, list):
                val = s2d(elem[1:])
            if isinstance(name, list):
                # hack for ['cpus', [[1]]]: re-parse elem itself, dropping
                # anything already collected at this level
                return s2d(elem)
            if name not in r:
                r[name] = val
            elif isinstance(r[name], dict) and isinstance(val, dict):
                merged = r[name]
                for k, v in val.items():
                    if k in merged:
                        if not isinstance(merged[k], list):
                            merged[k] = [merged[k]]
                        merged[k].append(v)
                    else:
                        merged[k] = v
            else:
                # repeated non-dict values: keep all of them, in order
                prev = r[name]
                if not isinstance(prev, list):
                    prev = r[name] = [prev]
                prev.append(val)
        return r

    return s2d(dominfo[1:])
+
def domtosxpr(dom):
    """Convert a dict as produced by parsedominfo back into a python sxpr.

    Nested dicts become nested lists, None becomes '', other values
    (including lists) are emitted as-is, and the result is prefixed
    with 'domain'.
    """
    def d2s(d):
        out = []
        for key, value in d.items():
            if isinstance(value, dict):
                out.append([key] + d2s(value))
            elif value is None:
                out.append([key, ''])
            else:
                out.append([key, value])
        return out

    return ['domain'] + d2s(dom)
+
def strtosxpr(s):
    "convert a string to a python sxpr using xend's sxp parser"
    parser = sxp.Parser()
    parser.input(s)
    value = parser.get_val()
    return value
+
def sxprtostr(sxpr):
    "convert an sxpr to its string form with xend's sxp module"
    text = sxp.to_string(sxpr)
    return text
+
def getvifs(dom):
    """Return a vif object for each 'vif' entry under dom['device'].

    A single (non-list) entry is treated as a one-element list.
    """
    entries = dom['device'].get('vif', [])
    if type(entries) is not list:
        entries = [entries]
    return [vif.parse(entry) for entry in entries]
+
def getdisks(dom):
    """Return block device objects for the devices in dom.

    Collects the 'vbd', 'tap' (tapdisk1) and 'tap2' (tapdisk2) entries
    under dom['device'], in that order, and passes each one to
    blkdev.parse(). Each entry may be a single device dict or a list of
    them. dom itself is not modified.
    """
    devices = dom['device']
    # build a fresh list: extending the list stored in dom would add the
    # tap devices to dom['device']['vbd'] on every call
    disks = []
    for kind in ('vbd', 'tap', 'tap2'):
        entries = devices.get(kind, [])
        if type(entries) != list:
            entries = [entries]
        disks.extend(entries)

    return [blkdev.parse(disk) for disk in disks]
+
def fromxend(domid):
    "create a VM object for domid from information held by xend"
    machine = VM(domid)
    return machine
+
def getshadowmem(vm):
    """Balloon down domain0 to create free memory for shadow paging.

    Reads 'maxmem', 'shadow_memory' and 'vcpus' from vm.dom (converted
    with int()). Nothing happens when the computed amount is not positive.
    """
    info = vm.dom
    maxmem = int(info['maxmem'])
    shadow = int(info['shadow_memory'])
    vcpus = int(info['vcpus'])

    # from XendDomainInfo.checkLiveMigrateMemory:
    # 1MB per vcpu plus 4Kib/Mib of RAM. This is higher than
    # the minimum that Xen would allocate if no value were given.
    needed = vcpus * 1024 + maxmem * 4 - shadow * 1024
    if needed <= 0:
        return
    print("Freeing %d kB for shadow mode" % needed)
    balloon.free(needed, vm.dominfo)
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel
|