xen-devel

[Xen-devel] [PATCH 1 of 3] Remus: add python control extensions

To: xen-devel@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-devel] [PATCH 1 of 3] Remus: add python control extensions
From: Brendan Cully <brendan@xxxxxxxxx>
Date: Thu, 12 Nov 2009 17:10:22 -0800
Cc: andy@xxxxxxxxx
Delivery-date: Thu, 12 Nov 2009 17:13:07 -0800
Envelope-to: www-data@xxxxxxxxxxxxxxxxxxx
In-reply-to: <patchbomb.1258074621@xxxxxxxxxxxx>
List-help: <mailto:xen-devel-request@lists.xensource.com?subject=help>
List-id: Xen developer discussion <xen-devel.lists.xensource.com>
List-post: <mailto:xen-devel@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/mailman/listinfo/xen-devel>, <mailto:xen-devel-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/mailman/listinfo/xen-devel>, <mailto:xen-devel-request@lists.xensource.com?subject=unsubscribe>
References: <patchbomb.1258074621@xxxxxxxxxxxx>
Sender: xen-devel-bounces@xxxxxxxxxxxxxxxxxxx
User-agent: Mercurial-patchbomb/1.3.1+375-2dee9a359262
# HG changeset patch
# User Brendan Cully <brendan@xxxxxxxxx>
# Date 1258073720 28800
# Node ID 213fb814acf431d2a382e8f9c09b4cea106c0958
# Parent  accded2f185f4178f875b170a5c01544648a68d2
Remus: add python control extensions

Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>
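
As a rough usage sketch (not part of the patch itself): the checkpoint
extension added below exposes a "checkpointer" type in xen.lowlevel.checkpoint
with open/start/close methods. In the Python fragment that follows, the domain
ID, output file and callback bodies are placeholders for illustration only.

    # Minimal sketch of driving the new checkpoint extension.
    # Assumptions: domid 1, the output path and the callback bodies are
    # made-up placeholders; only open/start/close from this patch are used.
    from xen.lowlevel import checkpoint

    def suspend_cb():
        # called once the domain has been suspended; True/None means continue
        return True

    def postcopy_cb():
        # called after the memory copy, before the domain resumes
        return True

    def checkpoint_cb():
        # called after each checkpoint has been flushed to the output fd
        return True

    chk = checkpoint.checkpointer()
    chk.open(1)                                  # placeholder domid
    out = open('/tmp/checkpoint-stream', 'wb')   # placeholder output file
    # the last argument is the interval between checkpoints, in milliseconds
    chk.start(out, suspend_cb, postcopy_cb, checkpoint_cb, 200)
    chk.close()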

diff --git a/tools/python/setup.py b/tools/python/setup.py
--- a/tools/python/setup.py
+++ b/tools/python/setup.py
@@ -67,10 +67,28 @@
                libraries          = libraries,
                sources            = [ "ptsname/ptsname.c" ])
 
+checkpoint = Extension("checkpoint",
+                       extra_compile_args = extra_compile_args,
+                       include_dirs       = include_dirs,
+                       library_dirs       = library_dirs,
+                       libraries          = libraries + [ "rt" ],
+                       sources            = [ "xen/lowlevel/checkpoint/checkpoint.c",
+                                              "xen/lowlevel/checkpoint/libcheckpoint.c"])
+
+netlink = Extension("netlink",
+                    extra_compile_args = extra_compile_args,
+                    include_dirs       = include_dirs,
+                    library_dirs       = library_dirs,
+                    libraries          = libraries,
+                    sources            = [ "xen/lowlevel/netlink/netlink.c",
+                                           "xen/lowlevel/netlink/libnetlink.c"])
+
 modules = [ xc, xs, ptsname, acm, flask ]
-if os.uname()[0] == 'SunOS':
-    modules.append(scf)
-    modules.append(process)
+plat = os.uname()[0]
+if plat == 'SunOS':
+    modules.extend([ scf, process ])
+if plat == 'Linux':
+    modules.extend([ checkpoint, netlink ])
 
 setup(name            = 'xen',
       version         = '3.0',
@@ -89,6 +107,7 @@
                          'xen.web',
                          'xen.sv',
                          'xen.xsview',
+                         'xen.remus',
 
                          'xen.xend.tests',
                          'xen.xend.server.tests',
diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.c b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
@@ -0,0 +1,363 @@
+/* python bridge to checkpointing API */
+
+#include <Python.h>
+
+#include <xs.h>
+#include <xenctrl.h>
+
+#include "checkpoint.h"
+
+#define PKG "xen.lowlevel.checkpoint"
+
+static PyObject* CheckpointError;
+
+typedef struct {
+  PyObject_HEAD
+  checkpoint_state cps;
+
+  /* milliseconds between checkpoints */
+  unsigned int interval;
+  int armed;
+
+  PyObject* suspend_cb;
+  PyObject* postcopy_cb;
+  PyObject* checkpoint_cb;
+
+  PyThreadState* threadstate;
+} CheckpointObject;
+
+static int suspend_trampoline(void* data);
+static int postcopy_trampoline(void* data);
+static int checkpoint_trampoline(void* data);
+
+static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args,
+                               PyObject* kwargs)
+{
+  CheckpointObject* self = (CheckpointObject*)type->tp_alloc(type, 0);
+
+  if (!self)
+    return NULL;
+
+  checkpoint_init(&self->cps);
+  self->suspend_cb = NULL;
+  self->armed = 0;
+
+  return (PyObject*)self;
+}
+
+static int Checkpoint_init(PyObject* obj, PyObject* args, PyObject* kwargs)
+{
+  return 0;
+}
+
+static void Checkpoint_dealloc(CheckpointObject* self)
+{
+  checkpoint_close(&self->cps);
+
+  self->ob_type->tp_free((PyObject*)self);
+}
+
+static PyObject* pycheckpoint_open(PyObject* obj, PyObject* args)
+{
+  CheckpointObject* self = (CheckpointObject*)obj;
+  checkpoint_state* cps = &self->cps;
+  unsigned int domid;
+
+  if (!PyArg_ParseTuple(args, "I", &domid))
+    return NULL;
+
+  if (checkpoint_open(cps, domid) < 0) {
+    PyErr_SetString(CheckpointError, checkpoint_error(cps));
+
+    return NULL;
+  }
+
+  Py_RETURN_NONE;
+}
+
+static PyObject* pycheckpoint_close(PyObject* obj, PyObject* args)
+{
+  CheckpointObject* self = (CheckpointObject*)obj;
+
+  checkpoint_close(&self->cps);
+
+  Py_XDECREF(self->suspend_cb);
+  self->suspend_cb = NULL;
+  Py_XDECREF(self->postcopy_cb);
+  self->postcopy_cb = NULL;
+  Py_XDECREF(self->checkpoint_cb);
+  self->checkpoint_cb = NULL;
+
+  Py_RETURN_NONE;
+}
+
+static PyObject* pycheckpoint_start(PyObject* obj, PyObject* args) {
+  CheckpointObject* self = (CheckpointObject*)obj;
+
+  PyObject* iofile;
+  PyObject* suspend_cb = NULL;
+  PyObject* postcopy_cb = NULL;
+  PyObject* checkpoint_cb = NULL;
+  unsigned int interval = 0;
+
+  int fd;
+  struct save_callbacks callbacks;
+  int rc;
+
+  if (!PyArg_ParseTuple(args, "O|OOOI", &iofile, &suspend_cb, &postcopy_cb,
+                       &checkpoint_cb, &interval))
+    return NULL;
+
+  self->interval = interval;
+
+  Py_INCREF(iofile);
+  Py_XINCREF(suspend_cb);
+  Py_XINCREF(postcopy_cb);
+  Py_XINCREF(checkpoint_cb);
+
+  fd = PyObject_AsFileDescriptor(iofile);
+  Py_DECREF(iofile);
+  if (fd < 0) {
+    PyErr_SetString(PyExc_TypeError, "invalid file handle");
+    return NULL;
+  }
+
+  if (suspend_cb && suspend_cb != Py_None) {
+    if (!PyCallable_Check(suspend_cb)) {
+      PyErr_SetString(PyExc_TypeError, "suspend callback not callable");
+      goto err;
+    }
+    self->suspend_cb = suspend_cb;
+  } else
+    self->suspend_cb = NULL;
+
+  if (postcopy_cb && postcopy_cb != Py_None) {
+    if (!PyCallable_Check(postcopy_cb)) {
+      PyErr_SetString(PyExc_TypeError, "postcopy callback not callable");
+      return NULL;
+    }
+    self->postcopy_cb = postcopy_cb;
+  } else
+    self->postcopy_cb = NULL;
+
+  if (checkpoint_cb && checkpoint_cb != Py_None) {
+    if (!PyCallable_Check(checkpoint_cb)) {
+      PyErr_SetString(PyExc_TypeError, "checkpoint callback not callable");
+      return NULL;
+    }
+    self->checkpoint_cb = checkpoint_cb;
+  } else
+    self->checkpoint_cb = NULL;
+
+  callbacks.suspend = suspend_trampoline;
+  callbacks.postcopy = postcopy_trampoline;
+  callbacks.checkpoint = checkpoint_trampoline;
+  callbacks.data = self;
+
+  self->threadstate = PyEval_SaveThread();
+  rc = checkpoint_start(&self->cps, fd, &callbacks);
+  PyEval_RestoreThread(self->threadstate);
+
+  if (rc < 0) {
+    PyErr_SetString(CheckpointError, checkpoint_error(&self->cps));
+    goto err;
+  }
+
+  Py_RETURN_NONE;
+
+  err:
+  self->suspend_cb = NULL;
+  Py_XDECREF(suspend_cb);
+  self->postcopy_cb = NULL;
+  Py_XDECREF(postcopy_cb);
+  self->checkpoint_cb = NULL;
+  Py_XDECREF(checkpoint_cb);
+
+  return NULL;
+}
+
+static PyMethodDef Checkpoint_methods[] = {
+  { "open", pycheckpoint_open, METH_VARARGS,
+    "open connection to xen" },
+  { "close", pycheckpoint_close, METH_NOARGS,
+    "close connection to xen" },
+  { "start", pycheckpoint_start, METH_VARARGS | METH_KEYWORDS,
+    "begin a checkpoint" },
+  { NULL, NULL, 0, NULL }
+};
+
+static PyTypeObject CheckpointType = {
+  PyObject_HEAD_INIT(NULL)
+  0,                          /* ob_size           */
+  PKG ".checkpointer",   /* tp_name           */
+  sizeof(CheckpointObject),   /* tp_basicsize      */
+  0,                          /* tp_itemsize       */
+  (destructor)Checkpoint_dealloc, /* tp_dealloc        */
+  NULL,                       /* tp_print          */
+  NULL,                       /* tp_getattr        */
+  NULL,                       /* tp_setattr        */
+  NULL,                       /* tp_compare        */
+  NULL,                       /* tp_repr           */
+  NULL,                       /* tp_as_number      */
+  NULL,                       /* tp_as_sequence    */
+  NULL,                       /* tp_as_mapping     */
+  NULL,                       /* tp_hash           */
+  NULL,                       /* tp_call           */
+  NULL,                       /* tp_str            */
+  NULL,                       /* tp_getattro       */
+  NULL,                       /* tp_setattro       */
+  NULL,                       /* tp_as_buffer      */
+  Py_TPFLAGS_DEFAULT,         /* tp_flags          */
+  "Checkpoint object",        /* tp_doc            */
+  NULL,                       /* tp_traverse       */
+  NULL,                       /* tp_clear          */
+  NULL,                       /* tp_richcompare    */
+  0,                          /* tp_weaklistoffset */
+  NULL,                       /* tp_iter           */
+  NULL,                       /* tp_iternext       */
+  Checkpoint_methods,         /* tp_methods        */
+  NULL,                       /* tp_members        */
+  NULL,                       /* tp_getset         */
+  NULL,                       /* tp_base           */
+  NULL,                       /* tp_dict           */
+  NULL,                       /* tp_descr_get      */
+  NULL,                       /* tp_descr_set      */
+  0,                          /* tp_dictoffset     */
+  (initproc)Checkpoint_init,  /* tp_init           */
+  NULL,                       /* tp_alloc          */
+  Checkpoint_new,             /* tp_new            */
+};
+
+static PyMethodDef methods[] = {
+  { NULL }
+};
+
+static char doc[] = "checkpoint API";
+
+PyMODINIT_FUNC initcheckpoint(void) {
+  PyObject *m;
+
+  if (PyType_Ready(&CheckpointType) < 0)
+    return;
+
+  m = Py_InitModule3(PKG, methods, doc);
+
+  if (!m)
+    return;
+
+  Py_INCREF(&CheckpointType);
+  PyModule_AddObject(m, "checkpointer", (PyObject*)&CheckpointType);
+
+  CheckpointError = PyErr_NewException(PKG ".error", NULL, NULL);
+  Py_INCREF(CheckpointError);
+  PyModule_AddObject(m, "error", CheckpointError);
+
+  block_timer();
+}
+
+/* private functions */
+
+/* bounce C suspend call into python equivalent.
+ * returns 1 on success or 0 on failure */
+static int suspend_trampoline(void* data)
+{
+  CheckpointObject* self = (CheckpointObject*)data;
+
+  PyObject* result;
+
+  /* call default suspend function, then python hook if available */
+  if (self->armed) {
+    if (checkpoint_wait(&self->cps) < 0) {
+      fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+      return 0;
+    }
+  } else {
+    if (self->interval) {
+      self->armed = 1;
+      checkpoint_settimer(&self->cps, self->interval);
+    }
+
+    if (!checkpoint_suspend(&self->cps)) {
+      fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+      return 0;
+    }
+  }
+
+  if (!self->suspend_cb)
+    return 1;
+
+  PyEval_RestoreThread(self->threadstate);
+  result = PyObject_CallFunction(self->suspend_cb, NULL);
+  self->threadstate = PyEval_SaveThread();
+
+  if (!result)
+    return 0;
+
+  if (result == Py_None || PyObject_IsTrue(result)) {
+    Py_DECREF(result);
+    return 1;
+  }
+
+  Py_DECREF(result);
+
+  return 0;
+}
+
+static int postcopy_trampoline(void* data)
+{
+  CheckpointObject* self = (CheckpointObject*)data;
+
+  PyObject* result;
+  int rc = 0;
+
+  if (!self->postcopy_cb)
+    goto resume;
+
+  PyEval_RestoreThread(self->threadstate);
+  result = PyObject_CallFunction(self->postcopy_cb, NULL);
+
+  if (result && (result == Py_None || PyObject_IsTrue(result)))
+    rc = 1;
+
+  Py_XDECREF(result);
+  self->threadstate = PyEval_SaveThread();
+
+  resume:
+  if (checkpoint_resume(&self->cps) < 0) {
+    fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+    return 0;
+  }
+
+  return rc;
+}
+
+static int checkpoint_trampoline(void* data)
+{
+  CheckpointObject* self = (CheckpointObject*)data;
+
+  PyObject* result;
+
+  if (checkpoint_postflush(&self->cps) < 0) {
+      fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+      return -1;
+  }
+
+  if (!self->checkpoint_cb)
+    return 0;
+
+  PyEval_RestoreThread(self->threadstate);
+  result = PyObject_CallFunction(self->checkpoint_cb, NULL);
+  self->threadstate = PyEval_SaveThread();
+
+  if (!result)
+    return 0;
+
+  if (result == Py_None || PyObject_IsTrue(result)) {
+    Py_DECREF(result);
+    return 1;
+  }
+
+  Py_DECREF(result);
+
+  return 0;
+}
diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.h b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
@@ -0,0 +1,59 @@
+/* API for checkpointing */
+
+#ifndef _CHECKPOINT_H_
+#define _CHECKPOINT_H_ 1
+
+#include <pthread.h>
+#include <semaphore.h>
+#include <time.h>
+
+#include <xenguest.h>
+#include <xs.h>
+
+typedef enum {
+    dt_unknown,
+    dt_pv,
+    dt_hvm,
+    dt_pvhvm /* HVM with PV drivers */
+} checkpoint_domtype;
+
+typedef struct {
+    int xch;               /* xc handle */
+    int xce;               /* event channel handle */
+    struct xs_handle* xsh; /* xenstore handle */
+    int watching_shutdown; /* state of watch on @releaseDomain */
+
+    unsigned int domid;
+    checkpoint_domtype domtype;
+    int fd;
+    
+    int suspend_evtchn;
+
+    char* errstr;
+
+    /* suspend deadline thread support */
+    volatile int suspended;
+    volatile int done;
+    pthread_t suspend_thr;
+    sem_t suspended_sem;
+    sem_t resumed_sem;
+    timer_t timer;
+} checkpoint_state;
+
+char* checkpoint_error(checkpoint_state* s);
+
+void checkpoint_init(checkpoint_state* s);
+int checkpoint_open(checkpoint_state* s, unsigned int domid);
+void checkpoint_close(checkpoint_state* s);
+int checkpoint_start(checkpoint_state* s, int fd,
+                    struct save_callbacks* callbacks);
+int checkpoint_suspend(checkpoint_state* s);
+int checkpoint_resume(checkpoint_state* s);
+int checkpoint_postflush(checkpoint_state* s);
+
+int checkpoint_settimer(checkpoint_state* s, int millis);
+int checkpoint_wait(checkpoint_state* s);
+void block_timer(void);
+void unblock_timer(void);
+
+#endif
diff --git a/tools/python/xen/lowlevel/checkpoint/libcheckpoint.c b/tools/python/xen/lowlevel/checkpoint/libcheckpoint.c
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/checkpoint/libcheckpoint.c
@@ -0,0 +1,782 @@
+/* API for checkpointing */
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <signal.h>
+#include <sys/stat.h>
+
+#include <xenctrl.h>
+#include <xenguest.h>
+#include <xs.h>
+
+#include "checkpoint.h"
+
+static char errbuf[256];
+
+static int setup_suspend_evtchn(checkpoint_state* s);
+static void release_suspend_evtchn(checkpoint_state *s);
+static int setup_shutdown_watch(checkpoint_state* s);
+static int check_shutdown_watch(checkpoint_state* s);
+static void release_shutdown_watch(checkpoint_state* s);
+static int poll_evtchn(checkpoint_state* s);
+
+static int switch_qemu_logdirty(checkpoint_state* s, int enable);
+static int suspend_hvm(checkpoint_state* s);
+static int suspend_qemu(checkpoint_state* s);
+static int resume_qemu(checkpoint_state* s);
+static int send_qemu(checkpoint_state* s);
+
+static int create_suspend_timer(checkpoint_state* s);
+static int delete_suspend_timer(checkpoint_state* s);
+static int create_suspend_thread(checkpoint_state* s);
+static void stop_suspend_thread(checkpoint_state* s);
+
+/* Returns a string describing the most recent error returned by
+ * a checkpoint function. Static -- do not free. */
+char* checkpoint_error(checkpoint_state* s)
+{
+    return s->errstr;
+}
+
+void checkpoint_init(checkpoint_state* s)
+{
+    s->xch = -1;
+    s->xce = -1;
+    s->xsh = NULL;
+    s->watching_shutdown = 0;
+
+    s->domid = 0;
+    s->domtype = dt_unknown;
+    s->fd = -1;
+
+    s->suspend_evtchn = -1;
+
+    s->errstr = NULL;
+
+    s->suspended = 0;
+    s->done = 0;
+    s->suspend_thr = 0;
+    s->timer = 0;
+}
+
+/* open a checkpoint session to guest domid */
+int checkpoint_open(checkpoint_state* s, unsigned int domid)
+{
+    xc_dominfo_t dominfo;
+    unsigned long pvirq;
+
+    s->domid = domid;
+
+    s->xch = xc_interface_open();
+    if (s->xch < 0) {
+       s->errstr = "could not open control interface (are you root?)";
+
+       return -1;
+    }
+
+    s->xsh = xs_daemon_open();
+    if (!s->xsh) {
+       checkpoint_close(s);
+       s->errstr = "could not open xenstore handle";
+
+       return -1;
+    }
+
+    s->xce = xc_evtchn_open();
+    if (s->xce < 0) {
+       checkpoint_close(s);
+       s->errstr = "could not open event channel handle";
+
+       return -1;
+    }
+
+    if (xc_domain_getinfo(s->xch, s->domid, 1, &dominfo) < 0) {
+       checkpoint_close(s);
+       s->errstr = "could not get domain info";
+
+       return -1;
+    }
+    if (dominfo.hvm) {
+       if (xc_get_hvm_param(s->xch, s->domid, HVM_PARAM_CALLBACK_IRQ, &pvirq)) {
+           checkpoint_close(s);
+           s->errstr = "could not get HVM callback IRQ";
+
+           return -1;
+       }
+       s->domtype = pvirq ? dt_pvhvm : dt_hvm;
+    } else
+       s->domtype = dt_pv;
+
+    if (setup_shutdown_watch(s) < 0) {
+       checkpoint_close(s);
+
+       return -1;
+    }
+
+    if (s->domtype == dt_pv) {
+       if (setup_suspend_evtchn(s) < 0) {
+           checkpoint_close(s);
+
+           return -1;
+       }
+    } else if (s->domtype == dt_pvhvm) {
+       checkpoint_close(s);
+       s->errstr = "PV-on-HVM is unsupported";
+
+       return -1;
+    }
+
+    return 0;
+}
+
+void checkpoint_close(checkpoint_state* s)
+{
+  if (s->timer)
+    delete_suspend_timer(s);
+  if (s->suspend_thr)
+    stop_suspend_thread(s);
+
+  release_shutdown_watch(s);
+  release_suspend_evtchn(s);
+
+  if (s->xch >= 0) {
+    xc_interface_close(s->xch);
+    s->xch = -1;
+  }
+  if (s->xce >= 0) {
+    xc_evtchn_close(s->xce);
+    s->xce = -1;
+  }
+  if (s->xsh) {
+    xs_daemon_close(s->xsh);
+    s->xsh = NULL;
+  }
+
+  s->domid = 0;
+  s->fd = -1;
+  s->suspend_evtchn = -1;
+}
+
+/* we toggle logdirty ourselves around the xc_domain_save call --
+ * it avoids having to pass around checkpoint_state */
+static void noop_switch_logdirty(int domid, unsigned enable)
+{
+    return;
+}
+
+int checkpoint_start(checkpoint_state* s, int fd,
+                    struct save_callbacks* callbacks)
+{
+    int hvm, rc;
+    int flags = XCFLAGS_LIVE;
+
+    if (!s->domid) {
+       s->errstr = "checkpoint state not opened";
+       return -1;
+    }
+
+    s->fd = fd;
+
+    hvm = s->domtype > dt_pv;
+    if (hvm) {
+       flags |= XCFLAGS_HVM;
+       if ((rc = switch_qemu_logdirty(s, 1)))
+           return rc;
+    }
+
+    rc = xc_domain_save(s->xch, fd, s->domid, 0, 0, flags, callbacks, hvm,
+       noop_switch_logdirty);
+
+    if (hvm)
+       switch_qemu_logdirty(s, 0);
+
+    return rc;
+}
+
+/* suspend the domain. Returns 0 on failure, 1 on success */
+int checkpoint_suspend(checkpoint_state* s)
+{
+  struct timeval tv;
+  int rc;
+
+  gettimeofday(&tv, NULL);
+  fprintf(stderr, "PROF: suspending at %lu.%06lu\n", (unsigned long)tv.tv_sec,
+         (unsigned long)tv.tv_usec);
+
+  if (s->domtype == dt_hvm) {
+      return suspend_hvm(s) < 0 ? 0 : 1;
+  }
+
+  rc = xc_evtchn_notify(s->xce, s->suspend_evtchn);
+  if (rc < 0) {
+    snprintf(errbuf, sizeof(errbuf),
+            "failed to notify suspend event channel: %d", rc);
+    s->errstr = errbuf;
+
+    return 0;
+  }
+
+  do {
+    rc = poll_evtchn(s);
+  } while (rc >= 0 && rc != s->suspend_evtchn);
+  if (rc <= 0) {
+    snprintf(errbuf, sizeof(errbuf),
+            "failed to receive suspend notification: %d", rc);
+    s->errstr = errbuf;
+
+    return 0;
+  }
+  if (xc_evtchn_unmask(s->xce, s->suspend_evtchn) < 0) {
+    snprintf(errbuf, sizeof(errbuf),
+            "failed to unmask suspend notification channel: %d", rc);
+    s->errstr = errbuf;
+
+    return 0;
+  }
+
+  return 1;
+}
+
+/* wait for a suspend to be triggered by another thread */
+int checkpoint_wait(checkpoint_state* s)
+{
+  int rc;
+
+  if (!s->suspend_thr) {
+    s->errstr = "checkpoint timer is not active\n";
+    return -1;
+  }
+
+  do {
+    rc = sem_wait(&s->suspended_sem);
+    if (rc < 0 && errno != EINTR) {
+      snprintf(errbuf, sizeof(errbuf),
+              "error waiting for suspend semaphore: %d %d\n", rc, errno);
+      s->errstr = errbuf;
+      return -1;
+    }
+  } while (rc < 0);
+
+  if (!s->suspended) {
+    snprintf(errbuf, sizeof(errbuf), "domain not suspended?\n");
+    s->errstr = errbuf;
+    return -1;
+  }
+
+  return 0;
+}
+
+/* let guest execution resume */
+int checkpoint_resume(checkpoint_state* s)
+{
+  struct timeval tv;
+  int rc;
+
+  if (xc_domain_resume(s->xch, s->domid, 1)) {
+    snprintf(errbuf, sizeof(errbuf), "error resuming domain: %d", errno);
+    s->errstr = errbuf;
+
+    return -1;
+  }
+
+  gettimeofday(&tv, NULL);
+  fprintf(stderr, "PROF: resumed at %lu.%06lu\n", (unsigned long)tv.tv_sec,
+         (unsigned long)tv.tv_usec);
+
+  if (s->domtype > dt_pv && resume_qemu(s) < 0)
+      return -1;
+
+  /* restore watchability in xenstore */
+  if (xs_resume_domain(s->xsh, s->domid) < 0)
+    fprintf(stderr, "error resuming domain in xenstore\n");
+
+  s->suspended = 0;
+
+  if (s->suspend_thr) {
+    if ((rc = sem_post(&s->resumed_sem)))
+      fprintf(stderr, "error posting resume semaphore\n");
+  }
+
+  return 0;
+}
+
+/* called after xc_domain_save has flushed its buffer */
+int checkpoint_postflush(checkpoint_state *s)
+{
+    if (s->domtype > dt_pv && send_qemu(s) < 0)
+       return -1;
+
+    return 0;
+}
+
+/* force suspend within millis ms if copy hasn't completed yet */
+int checkpoint_settimer(checkpoint_state* s, int millis)
+{
+  struct itimerspec t;
+  int err;
+
+  if (!s->suspend_thr) {
+    if (create_suspend_timer(s) < 0)
+      return -1;
+
+    if (create_suspend_thread(s) < 0) {
+      delete_suspend_timer(s);
+      return -1;
+    }
+  }
+
+  t.it_value.tv_sec = millis / 1000;
+  t.it_value.tv_nsec = (millis % 1000) * 1000000L;
+  t.it_interval.tv_sec = t.it_value.tv_sec;
+  t.it_interval.tv_nsec = t.it_value.tv_nsec;
+
+  if ((err = timer_settime(s->timer, 0, &t, NULL))) {
+    fprintf(stderr, "Error arming timer: %d\n", err);
+    return -1;
+  }
+
+  return 0;
+}
+
+int delete_suspend_timer(checkpoint_state* s)
+{
+  int rc = 0;
+
+  if (s->timer) {
+    if ((rc = timer_delete(s->timer)))
+      fprintf(stderr, "Error deleting timer: %s\n", strerror(errno));
+    s->timer = NULL;
+  }
+
+  return rc;
+}
+
+/* Set up event channel used to signal a guest to suspend itself */
+static int setup_suspend_evtchn(checkpoint_state* s)
+{
+  int port;
+
+  port = xs_suspend_evtchn_port(s->domid);
+  if (port < 0) {
+    s->errstr = "failed to read suspend event channel";
+    return -1;
+  }
+
+  s->suspend_evtchn = xc_suspend_evtchn_init(s->xch, s->xce, s->domid, port);
+  if (s->suspend_evtchn < 0) {
+    snprintf(errbuf, sizeof(errbuf), "failed to bind suspend event channel");
+    s->errstr = errbuf;
+
+    return -1;
+  }
+
+  fprintf(stderr, "bound to suspend event channel %u:%d as %d\n", s->domid, port,
+    s->suspend_evtchn);
+
+  return 0;
+}
+
+/* release suspend event channels bound to guest */
+static void release_suspend_evtchn(checkpoint_state *s)
+{
+  /* TODO: teach xen to clean up if port is unbound */
+  if (s->xce >= 0 && s->suspend_evtchn > 0) {
+    xc_suspend_evtchn_release(s->xce, s->suspend_evtchn);
+    s->suspend_evtchn = 0;
+  }
+}
+
+static int setup_shutdown_watch(checkpoint_state* s)
+{
+  char buf[16];
+
+  /* write domain ID to watch so we can ignore other domain shutdowns */
+  snprintf(buf, sizeof(buf), "%u", s->domid);
+  if ( !xs_watch(s->xsh, "@releaseDomain", buf) ) {
+    fprintf(stderr, "Could not bind to shutdown watch\n");
+    return -1;
+  }
+  /* watch fires once on registration */
+  s->watching_shutdown = 1;
+  check_shutdown_watch(s);
+
+  return 0;
+}
+
+static int check_shutdown_watch(checkpoint_state* s) {
+  unsigned int count;
+  char **vec;
+  char buf[16];
+
+  vec = xs_read_watch(s->xsh, &count);
+  if (s->watching_shutdown == 1) {
+      s->watching_shutdown = 2;
+      return 0;
+  }
+  if (!vec) {
+    fprintf(stderr, "empty watch fired\n");
+    return 0;
+  }
+  snprintf(buf, sizeof(buf), "%d", s->domid);
+  if (!strcmp(vec[XS_WATCH_TOKEN], buf)) {
+    fprintf(stderr, "domain %d shut down\n", s->domid);
+    return -1;
+  }
+
+  return 0;
+}
+
+static void release_shutdown_watch(checkpoint_state* s) {
+  char buf[16];
+
+  if (!s->xsh)
+    return;
+
+  if (!s->watching_shutdown)
+      return;
+
+  snprintf(buf, sizeof(buf), "%u", s->domid);
+  if (!xs_unwatch(s->xsh, "@releaseDomain", buf))
+    fprintf(stderr, "Could not release shutdown watch\n");
+}
+
+/* wrapper around xc_evtchn_pending which detects errors */
+static int poll_evtchn(checkpoint_state* s)
+{
+  int fd, xsfd, maxfd;
+  fd_set rfds, efds;
+  struct timeval tv;
+  int rc;
+
+  fd = xc_evtchn_fd(s->xce);
+  xsfd = xs_fileno(s->xsh);
+  maxfd = fd > xsfd ? fd : xsfd;
+  FD_ZERO(&rfds);
+  FD_ZERO(&efds);
+  FD_SET(fd, &rfds);
+  FD_SET(xsfd, &rfds);
+  FD_SET(fd, &efds);
+  FD_SET(xsfd, &efds);
+
+  /* give it 500 ms to respond */
+  tv.tv_sec = 0;
+  tv.tv_usec = 500000;
+
+  rc = select(maxfd + 1, &rfds, NULL, &efds, &tv);
+  if (rc < 0)
+    fprintf(stderr, "error polling event channel: %s\n", strerror(errno));
+  else if (!rc)
+    fprintf(stderr, "timeout waiting for event channel\n");
+  else if (FD_ISSET(fd, &rfds))
+    return xc_evtchn_pending(s->xce);
+  else if (FD_ISSET(xsfd, &rfds))
+    return check_shutdown_watch(s);
+
+  return -1;
+}
+
+/* adapted from the eponymous function in xc_save */
+static int switch_qemu_logdirty(checkpoint_state *s, int enable)
+{
+    char path[128];
+    char *tail, *cmd, *response;
+    char **vec;
+    unsigned int len;
+
+    sprintf(path, "/local/domain/0/device-model/%u/logdirty/", s->domid);
+    tail = path + strlen(path);
+
+    strcpy(tail, "ret");
+    if (!xs_watch(s->xsh, path, "qemu-logdirty-ret")) {
+       s->errstr = "error watching qemu logdirty return";
+       return -1;
+    }
+    /* null fire. XXX unify with shutdown watch! */
+    vec = xs_read_watch(s->xsh, &len);
+    free(vec);
+
+    strcpy(tail, "cmd");
+    cmd = enable ? "enable" : "disable";
+    if (!xs_write(s->xsh, XBT_NULL, path, cmd, strlen(cmd))) {
+       s->errstr = "error signalling qemu logdirty";
+       return -1;
+    }
+
+    vec = xs_read_watch(s->xsh, &len);
+    free(vec);
+
+    strcpy(tail, "ret");
+    xs_unwatch(s->xsh, path, "qemu-logdirty-ret");
+
+    response = xs_read(s->xsh, XBT_NULL, path, &len);
+    if (!len || strcmp(response, cmd)) {
+       if (len)
+           free(response);
+       s->errstr = "qemu logdirty command failed";
+       return -1;
+    }
+    free(response);
+    fprintf(stderr, "qemu logdirty mode: %s\n", cmd);
+
+    return 0;
+}
+
+static int suspend_hvm(checkpoint_state *s)
+{
+    int rc = -1;
+
+    fprintf(stderr, "issuing HVM suspend hypercall\n");
+    rc = xc_domain_shutdown(s->xch, s->domid, SHUTDOWN_suspend);
+    if (rc < 0) {
+       s->errstr = "shutdown hypercall failed";
+       return -1;
+    }
+    fprintf(stderr, "suspend hypercall returned %d\n", rc);
+
+    if (check_shutdown_watch(s) >= 0)
+       return -1;
+
+    rc = suspend_qemu(s);
+
+    return rc;
+}
+
+static int suspend_qemu(checkpoint_state *s)
+{
+    char path[128];
+
+    fprintf(stderr, "pausing QEMU\n");
+
+    sprintf(path, "/local/domain/0/device-model/%d/command", s->domid);
+    if (!xs_write(s->xsh, XBT_NULL, path, "save", 4)) {
+       fprintf(stderr, "error signalling QEMU to save\n");
+       return -1;
+    }
+
+    sprintf(path, "/local/domain/0/device-model/%d/state", s->domid);
+
+    do {
+       char* state;
+       unsigned int len;
+
+       state = xs_read(s->xsh, XBT_NULL, path, &len);
+       if (!state) {
+           s->errstr = "error reading QEMU state";
+           return -1;
+       }
+
+       if (!strcmp(state, "paused")) {
+           free(state);
+           return 0;
+       }
+
+       free(state);
+       usleep(1000);
+    } while(1);
+
+    return -1;
+}
+
+static int resume_qemu(checkpoint_state *s)
+{
+    char path[128];
+    fprintf(stderr, "resuming QEMU\n");
+
+    sprintf(path, "/local/domain/0/device-model/%d/command", s->domid);
+    if (!xs_write(s->xsh, XBT_NULL, path, "continue", 8)) {
+       fprintf(stderr, "error signalling QEMU to resume\n");
+       return -1;
+    }
+
+    return 0;
+}
+
+static int send_qemu(checkpoint_state *s)
+{
+    char buf[8192];
+    char path[128];
+    struct stat sb;
+    uint32_t qlen = 0;
+    int qfd;
+    int rc;
+
+    if (s->fd < 0)
+       return -1;
+
+    sprintf(path, "/var/lib/xen/qemu-save.%d", s->domid);
+
+    if (stat(path, &sb) < 0) {
+       snprintf(errbuf, sizeof(errbuf),
+               "error getting QEMU state file status: %s", strerror(errno));
+       s->errstr = errbuf;
+       return -1;
+    }
+
+    qlen = sb.st_size;
+    qfd = open(path, O_RDONLY);
+    if (qfd < 0) {
+       snprintf(errbuf, sizeof(errbuf), "error opening QEMU state file: %s",
+                strerror(errno));
+       s->errstr = errbuf;
+       return -1;
+    }
+
+    fprintf(stderr, "Sending %u bytes of QEMU state\n", qlen);
+    if (write(s->fd, "RemusDeviceModelState", 21) != 21) {
+       s->errstr = "error writing QEMU header";
+       close(qfd);
+       return -1;
+    }
+    if (write(s->fd, &qlen, sizeof(qlen)) != sizeof(qlen)) {
+       s->errstr = "error writing QEMU size";
+       close(qfd);
+       return -1;
+    }
+
+    while ((rc = read(qfd, buf, qlen > sizeof(buf) ? sizeof(buf) : qlen)) > 0) {
+       qlen -= rc;
+       if (write(s->fd, buf, rc) != rc) {
+           rc = -1;
+           break;
+       }
+    }
+    if (rc < 0) {
+       snprintf(errbuf, sizeof(errbuf), "error writing QEMU state: %s",
+                strerror(errno));
+       s->errstr = errbuf;
+    }
+
+    close(qfd);
+
+    return rc;
+}
+
+/*thread responsible to suspend the domain early if necessary*/
+static void *suspend_thread(void *arg)
+{
+  checkpoint_state* s = (checkpoint_state*)arg;
+  sigset_t tss;
+  int rc;
+  int sig;
+
+  fprintf(stderr, "Suspend thread started\n");
+
+  sigemptyset(&tss);
+  sigaddset(&tss, SIGRTMIN);
+
+  while (1) {
+    /* wait for checkpoint thread to signal resume */
+    if ((rc = sem_wait(&s->resumed_sem)))
+      fprintf(stderr, "Error waiting on resume semaphore\n");
+
+    if ((rc = sigwait(&tss, &sig))) {
+      fprintf(stderr, "sigwait failed: %d %d\n", rc, errno);
+      break;
+    }
+    if (sig != SIGRTMIN)
+      fprintf(stderr, "received unexpected signal %d\n", sig);
+
+    if (s->done)
+      break;
+
+    if (s->suspended) {
+      fprintf(stderr, "domain already suspended?\n");
+    } else {
+      rc = checkpoint_suspend(s);
+      if (rc)
+       s->suspended = 1;
+      else
+       fprintf(stderr, "checkpoint_suspend failed\n");
+    }
+
+    if ((rc = sem_post(&s->suspended_sem)))
+      fprintf(stderr, "Error posting suspend semaphore\n");
+  }
+
+  fprintf(stderr, "Suspend thread exiting\n");
+
+  return NULL;
+}
+
+static int create_suspend_timer(checkpoint_state* s)
+{
+  struct sigevent event;
+  int err;
+
+  event.sigev_notify = SIGEV_SIGNAL;
+  event.sigev_signo = SIGRTMIN;
+  event.sigev_value.sival_int = 0;
+
+  if ((err = timer_create(CLOCK_REALTIME, &event, &s->timer))) {
+    snprintf(errbuf, sizeof(errbuf), "Error creating timer: %d\n", err);
+    s->errstr = errbuf;
+    return -1;
+  }
+
+  return 0;
+}
+
+void block_timer(void)
+{
+  sigset_t tss;
+
+  sigemptyset(&tss);
+  sigaddset(&tss, SIGRTMIN);
+
+  pthread_sigmask(SIG_BLOCK, &tss, NULL);
+}
+
+void unblock_timer(void)
+{
+  sigset_t tss;
+
+  sigemptyset(&tss);
+  sigaddset(&tss, SIGRTMIN); 
+
+  pthread_sigmask(SIG_UNBLOCK, &tss, NULL);
+}
+
+static int create_suspend_thread(checkpoint_state* s)
+{
+  int err;
+
+  if ((err = sem_init(&s->suspended_sem, 0, 0))) {
+    snprintf(errbuf, sizeof(errbuf),
+            "Error initializing suspend semaphore: %d\n", err);
+    s->errstr = errbuf;
+    return -1;
+  }
+
+  if ((err = sem_init(&s->resumed_sem, 0, 0))) {
+    snprintf(errbuf, sizeof(errbuf),
+            "Error initializing resume semaphore: %d\n", err);
+    s->errstr = errbuf;
+    return -1;
+  }
+
+  /* signal mask should be inherited */
+  block_timer();
+
+  if ((err = pthread_create(&s->suspend_thr, NULL, suspend_thread, s))) {
+    snprintf(errbuf, sizeof(errbuf), "Error creating suspend thread: %d\n", err);
+    s->errstr = errbuf;
+    return -1;
+  }
+
+  return 0;
+}
+
+static void stop_suspend_thread(checkpoint_state* s)
+{
+  int err;
+
+  s->done = 1;
+
+  err = sem_post(&s->resumed_sem);
+
+  err = pthread_join(s->suspend_thr, NULL);
+  s->suspend_thr = 0;
+}
diff --git a/tools/python/xen/lowlevel/netlink/libnetlink.c b/tools/python/xen/lowlevel/netlink/libnetlink.c
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/netlink/libnetlink.c
@@ -0,0 +1,585 @@
+/*
+ * libnetlink.c        RTnetlink service routines.
+ *
+ *             This program is free software; you can redistribute it and/or
+ *             modify it under the terms of the GNU General Public License
+ *             as published by the Free Software Foundation; either version
+ *             2 of the License, or (at your option) any later version.
+ *
+ * Authors:    Alexey Kuznetsov, <kuznet@xxxxxxxxxxxxx>
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <syslog.h>
+#include <fcntl.h>
+#include <net/if_arp.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <string.h>
+#include <errno.h>
+#include <time.h>
+#include <sys/uio.h>
+
+#include "libnetlink.h"
+
+void rtnl_close(struct rtnl_handle *rth)
+{
+       close(rth->fd);
+}
+
+int rtnl_open_byproto(struct rtnl_handle *rth, unsigned subscriptions,
+                     int protocol)
+{
+       socklen_t addr_len;
+       int sndbuf = 32768;
+       int rcvbuf = 32768;
+
+       memset(rth, 0, sizeof(*rth));
+
+       rth->fd = socket(AF_NETLINK, SOCK_RAW, protocol);
+       if (rth->fd < 0) {
+               perror("Cannot open netlink socket");
+               return -1;
+       }
+
+       if (setsockopt(rth->fd,SOL_SOCKET,SO_SNDBUF,&sndbuf,sizeof(sndbuf)) < 0) {
+               perror("SO_SNDBUF");
+               return -1;
+       }
+
+       if (setsockopt(rth->fd,SOL_SOCKET,SO_RCVBUF,&rcvbuf,sizeof(rcvbuf)) < 0) {
+               perror("SO_RCVBUF");
+               return -1;
+       }
+
+       memset(&rth->local, 0, sizeof(rth->local));
+       rth->local.nl_family = AF_NETLINK;
+       rth->local.nl_groups = subscriptions;
+
+       if (bind(rth->fd, (struct sockaddr*)&rth->local, sizeof(rth->local)) < 0) {
+               perror("Cannot bind netlink socket");
+               return -1;
+       }
+       addr_len = sizeof(rth->local);
+       if (getsockname(rth->fd, (struct sockaddr*)&rth->local, &addr_len) < 0) {
+               perror("Cannot getsockname");
+               return -1;
+       }
+       if (addr_len != sizeof(rth->local)) {
+               fprintf(stderr, "Wrong address length %d\n", addr_len);
+               return -1;
+       }
+       if (rth->local.nl_family != AF_NETLINK) {
+               fprintf(stderr, "Wrong address family %d\n", rth->local.nl_family);
+               return -1;
+       }
+       rth->seq = time(NULL);
+       return 0;
+}
+
+int rtnl_open(struct rtnl_handle *rth, unsigned subscriptions)
+{
+       return rtnl_open_byproto(rth, subscriptions, NETLINK_ROUTE);
+}
+
+int rtnl_wilddump_request(struct rtnl_handle *rth, int family, int type)
+{
+       struct {
+               struct nlmsghdr nlh;
+               struct rtgenmsg g;
+       } req;
+       struct sockaddr_nl nladdr;
+
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+
+       memset(&req, 0, sizeof(req));
+       req.nlh.nlmsg_len = sizeof(req);
+       req.nlh.nlmsg_type = type;
+       req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST;
+       req.nlh.nlmsg_pid = 0;
+       req.nlh.nlmsg_seq = rth->dump = ++rth->seq;
+       req.g.rtgen_family = family;
+
+       return sendto(rth->fd, (void*)&req, sizeof(req), 0,
+                     (struct sockaddr*)&nladdr, sizeof(nladdr));
+}
+
+int rtnl_send(struct rtnl_handle *rth, const char *buf, int len)
+{
+       struct sockaddr_nl nladdr;
+
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+
+       return sendto(rth->fd, buf, len, 0, (struct sockaddr*)&nladdr, sizeof(nladdr));
+}
+
+int rtnl_dump_request(struct rtnl_handle *rth, int type, void *req, int len)
+{
+       struct nlmsghdr nlh;
+       struct sockaddr_nl nladdr;
+       struct iovec iov[2] = {
+               { .iov_base = &nlh, .iov_len = sizeof(nlh) },
+               { .iov_base = req, .iov_len = len }
+       };
+       struct msghdr msg = {
+               .msg_name = &nladdr,
+               .msg_namelen =  sizeof(nladdr),
+               .msg_iov = iov,
+               .msg_iovlen = 2,
+       };
+
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+
+       nlh.nlmsg_len = NLMSG_LENGTH(len);
+       nlh.nlmsg_type = type;
+       nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST;
+       nlh.nlmsg_pid = 0;
+       nlh.nlmsg_seq = rth->dump = ++rth->seq;
+
+       return sendmsg(rth->fd, &msg, 0);
+}
+
+int rtnl_dump_filter(struct rtnl_handle *rth,
+                    rtnl_filter_t filter,
+                    void *arg1,
+                    rtnl_filter_t junk,
+                    void *arg2)
+{
+       struct sockaddr_nl nladdr;
+       struct iovec iov;
+       struct msghdr msg = {
+               .msg_name = &nladdr,
+               .msg_namelen = sizeof(nladdr),
+               .msg_iov = &iov,
+               .msg_iovlen = 1,
+       };
+       char buf[16384];
+
+       iov.iov_base = buf;
+       while (1) {
+               int status;
+               struct nlmsghdr *h;
+
+               iov.iov_len = sizeof(buf);
+               status = recvmsg(rth->fd, &msg, 0);
+
+               if (status < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       perror("OVERRUN");
+                       continue;
+               }
+
+               if (status == 0) {
+                       fprintf(stderr, "EOF on netlink\n");
+                       return -1;
+               }
+
+               h = (struct nlmsghdr*)buf;
+               while (NLMSG_OK(h, status)) {
+                       int err;
+
+                       if (nladdr.nl_pid != 0 ||
+                           h->nlmsg_pid != rth->local.nl_pid ||
+                           h->nlmsg_seq != rth->dump) {
+                               if (junk) {
+                                       err = junk(&nladdr, h, arg2);
+                                       if (err < 0)
+                                               return err;
+                               }
+                               goto skip_it;
+                       }
+
+                       if (h->nlmsg_type == NLMSG_DONE)
+                               return 0;
+                       if (h->nlmsg_type == NLMSG_ERROR) {
+                               struct nlmsgerr *err = (struct nlmsgerr*)NLMSG_DATA(h);
+                               if (h->nlmsg_len < NLMSG_LENGTH(sizeof(struct nlmsgerr))) {
+                                       fprintf(stderr, "ERROR truncated\n");
+                               } else {
+                                       errno = -err->error;
+                                       perror("RTNETLINK answers");
+                               }
+                               return -1;
+                       }
+                       err = filter(&nladdr, h, arg1);
+                       if (err < 0)
+                               return err;
+
+skip_it:
+                       h = NLMSG_NEXT(h, status);
+               }
+               if (msg.msg_flags & MSG_TRUNC) {
+                       fprintf(stderr, "Message truncated\n");
+                       continue;
+               }
+               if (status) {
+                       fprintf(stderr, "!!!Remnant of size %d\n", status);
+                       exit(1);
+               }
+       }
+}
+
+int rtnl_talk(struct rtnl_handle *rtnl, struct nlmsghdr *n, pid_t peer,
+             unsigned groups, struct nlmsghdr *answer,
+             rtnl_filter_t junk,
+             void *jarg)
+{
+       int status;
+       unsigned seq;
+       struct nlmsghdr *h;
+       struct sockaddr_nl nladdr;
+       struct iovec iov = {
+               .iov_base = (void*) n,
+               .iov_len = n->nlmsg_len
+       };
+       struct msghdr msg = {
+               .msg_name = &nladdr,
+               .msg_namelen = sizeof(nladdr),
+               .msg_iov = &iov,
+               .msg_iovlen = 1,
+       };
+       char   buf[16384];
+
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+       nladdr.nl_pid = peer;
+       nladdr.nl_groups = groups;
+
+       n->nlmsg_seq = seq = ++rtnl->seq;
+
+       if (answer == NULL)
+               n->nlmsg_flags |= NLM_F_ACK;
+
+       status = sendmsg(rtnl->fd, &msg, 0);
+
+       if (status < 0) {
+               perror("Cannot talk to rtnetlink");
+               return -1;
+       }
+
+       memset(buf,0,sizeof(buf));
+
+       iov.iov_base = buf;
+
+       while (1) {
+               iov.iov_len = sizeof(buf);
+               status = recvmsg(rtnl->fd, &msg, 0);
+
+               if (status < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       perror("OVERRUN");
+                       continue;
+               }
+               if (status == 0) {
+                       fprintf(stderr, "EOF on netlink\n");
+                       return -1;
+               }
+               if (msg.msg_namelen != sizeof(nladdr)) {
+                       fprintf(stderr, "sender address length == %d\n", msg.msg_namelen);
+                       exit(1);
+               }
+               for (h = (struct nlmsghdr*)buf; status >= sizeof(*h); ) {
+                       int err;
+                       int len = h->nlmsg_len;
+                       int l = len - sizeof(*h);
+
+                       if (l<0 || len>status) {
+                               if (msg.msg_flags & MSG_TRUNC) {
+                                       fprintf(stderr, "Truncated message\n");
+                                       return -1;
+                               }
+                               fprintf(stderr, "!!!malformed message: len=%d\n", len);
+                               exit(1);
+                       }
+
+                       if (nladdr.nl_pid != peer ||
+                           h->nlmsg_pid != rtnl->local.nl_pid ||
+                           h->nlmsg_seq != seq) {
+                               if (junk) {
+                                       err = junk(&nladdr, h, jarg);
+                                       if (err < 0)
+                                               return err;
+                               }
+                               continue;
+                       }
+
+                       if (h->nlmsg_type == NLMSG_ERROR) {
+                               struct nlmsgerr *err = (struct nlmsgerr*)NLMSG_DATA(h);
+                               if (l < sizeof(struct nlmsgerr)) {
+                                       fprintf(stderr, "ERROR truncated\n");
+                               } else {
+                                       errno = -err->error;
+                                       if (errno == 0) {
+                                               if (answer)
+                                                       memcpy(answer, h, h->nlmsg_len);
+                                               return 0;
+                                       }
+                                       perror("RTNETLINK answers");
+                               }
+                               return -1;
+                       }
+                       if (answer) {
+                               memcpy(answer, h, h->nlmsg_len);
+                               return 0;
+                       }
+
+                       fprintf(stderr, "Unexpected reply!!!\n");
+
+                       status -= NLMSG_ALIGN(len);
+                       h = (struct nlmsghdr*)((char*)h + NLMSG_ALIGN(len));
+               }
+               if (msg.msg_flags & MSG_TRUNC) {
+                       fprintf(stderr, "Message truncated\n");
+                       continue;
+               }
+               if (status) {
+                       fprintf(stderr, "!!!Remnant of size %d\n", status);
+                       exit(1);
+               }
+       }
+}
+
+int rtnl_listen(struct rtnl_handle *rtnl,
+               rtnl_filter_t handler,
+               void *jarg)
+{
+       int status;
+       struct nlmsghdr *h;
+       struct sockaddr_nl nladdr;
+       struct iovec iov;
+       struct msghdr msg = {
+               .msg_name = &nladdr,
+               .msg_namelen = sizeof(nladdr),
+               .msg_iov = &iov,
+               .msg_iovlen = 1,
+       };
+       char   buf[8192];
+
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+       nladdr.nl_pid = 0;
+       nladdr.nl_groups = 0;
+
+       iov.iov_base = buf;
+       while (1) {
+               iov.iov_len = sizeof(buf);
+               status = recvmsg(rtnl->fd, &msg, 0);
+
+               if (status < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       perror("OVERRUN");
+                       continue;
+               }
+               if (status == 0) {
+                       fprintf(stderr, "EOF on netlink\n");
+                       return -1;
+               }
+               if (msg.msg_namelen != sizeof(nladdr)) {
+                       fprintf(stderr, "Sender address length == %d\n", msg.msg_namelen);
+                       exit(1);
+               }
+               for (h = (struct nlmsghdr*)buf; status >= sizeof(*h); ) {
+                       int err;
+                       int len = h->nlmsg_len;
+                       int l = len - sizeof(*h);
+
+                       if (l<0 || len>status) {
+                               if (msg.msg_flags & MSG_TRUNC) {
+                                       fprintf(stderr, "Truncated message\n");
+                                       return -1;
+                               }
+                               fprintf(stderr, "!!!malformed message: len=%d\n", len);
+                               exit(1);
+                       }
+
+                       err = handler(&nladdr, h, jarg);
+                       if (err < 0)
+                               return err;
+
+                       status -= NLMSG_ALIGN(len);
+                       h = (struct nlmsghdr*)((char*)h + NLMSG_ALIGN(len));
+               }
+               if (msg.msg_flags & MSG_TRUNC) {
+                       fprintf(stderr, "Message truncated\n");
+                       continue;
+               }
+               if (status) {
+                       fprintf(stderr, "!!!Remnant of size %d\n", status);
+                       exit(1);
+               }
+       }
+}
+
+int rtnl_from_file(FILE *rtnl, rtnl_filter_t handler,
+                  void *jarg)
+{
+       int status;
+       struct sockaddr_nl nladdr;
+       char   buf[8192];
+       struct nlmsghdr *h = (void*)buf;
+
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+       nladdr.nl_pid = 0;
+       nladdr.nl_groups = 0;
+
+       while (1) {
+               int err, len, type;
+               int l;
+
+               status = fread(&buf, 1, sizeof(*h), rtnl);
+
+               if (status < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       perror("rtnl_from_file: fread");
+                       return -1;
+               }
+               if (status == 0)
+                       return 0;
+
+               len = h->nlmsg_len;
+               type= h->nlmsg_type;
+               l = len - sizeof(*h);
+
+               if (l<0 || len>sizeof(buf)) {
+                       fprintf(stderr, "!!!malformed message: len=%d @%lu\n",
+                               len, ftell(rtnl));
+                       return -1;
+               }
+
+               status = fread(NLMSG_DATA(h), 1, NLMSG_ALIGN(l), rtnl);
+
+               if (status < 0) {
+                       perror("rtnl_from_file: fread");
+                       return -1;
+               }
+               if (status < l) {
+                       fprintf(stderr, "rtnl-from_file: truncated message\n");
+                       return -1;
+               }
+
+               err = handler(&nladdr, h, jarg);
+               if (err < 0)
+                       return err;
+       }
+}
+
+int addattr32(struct nlmsghdr *n, int maxlen, int type, __u32 data)
+{
+       int len = RTA_LENGTH(4);
+       struct rtattr *rta;
+       if (NLMSG_ALIGN(n->nlmsg_len) + len > maxlen) {
+               fprintf(stderr,"addattr32: Error! max allowed bound %d exceeded\n",maxlen);
+               return -1;
+       }
+       rta = NLMSG_TAIL(n);
+       rta->rta_type = type;
+       rta->rta_len = len;
+       memcpy(RTA_DATA(rta), &data, 4);
+       n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + len;
+       return 0;
+}
+
+int addattr_l(struct nlmsghdr *n, int maxlen, int type, const void *data,
+             int alen)
+{
+       int len = RTA_LENGTH(alen);
+       struct rtattr *rta;
+
+       if (NLMSG_ALIGN(n->nlmsg_len) + RTA_ALIGN(len) > maxlen) {
+               fprintf(stderr, "addattr_l ERROR: message exceeded bound of %d\n",maxlen);
+               return -1;
+       }
+       rta = NLMSG_TAIL(n);
+       rta->rta_type = type;
+       rta->rta_len = len;
+       memcpy(RTA_DATA(rta), data, alen);
+       n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + RTA_ALIGN(len);
+       return 0;
+}
+
+int addraw_l(struct nlmsghdr *n, int maxlen, const void *data, int len)
+{
+       if (NLMSG_ALIGN(n->nlmsg_len) + NLMSG_ALIGN(len) > maxlen) {
+               fprintf(stderr, "addraw_l ERROR: message exceeded bound of %d\n",maxlen);
+               return -1;
+       }
+
+       memcpy(NLMSG_TAIL(n), data, len);
+       memset((void *) NLMSG_TAIL(n) + len, 0, NLMSG_ALIGN(len) - len);
+       n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + NLMSG_ALIGN(len);
+       return 0;
+}
+
+int rta_addattr32(struct rtattr *rta, int maxlen, int type, __u32 data)
+{
+       int len = RTA_LENGTH(4);
+       struct rtattr *subrta;
+
+       if (RTA_ALIGN(rta->rta_len) + len > maxlen) {
+               fprintf(stderr,"rta_addattr32: Error! max allowed bound %d exceeded\n",maxlen);
+               return -1;
+       }
+       subrta = (struct rtattr*)(((char*)rta) + RTA_ALIGN(rta->rta_len));
+       subrta->rta_type = type;
+       subrta->rta_len = len;
+       memcpy(RTA_DATA(subrta), &data, 4);
+       rta->rta_len = NLMSG_ALIGN(rta->rta_len) + len;
+       return 0;
+}
+
+int rta_addattr_l(struct rtattr *rta, int maxlen, int type,
+                 const void *data, int alen)
+{
+       struct rtattr *subrta;
+       int len = RTA_LENGTH(alen);
+
+       if (RTA_ALIGN(rta->rta_len) + RTA_ALIGN(len) > maxlen) {
+               fprintf(stderr,"rta_addattr_l: Error! max allowed bound %d exceeded\n",maxlen);
+               return -1;
+       }
+       subrta = (struct rtattr*)(((char*)rta) + RTA_ALIGN(rta->rta_len));
+       subrta->rta_type = type;
+       subrta->rta_len = len;
+       memcpy(RTA_DATA(subrta), data, alen);
+       rta->rta_len = NLMSG_ALIGN(rta->rta_len) + RTA_ALIGN(len);
+       return 0;
+}
+
+int parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len)
+{
+       memset(tb, 0, sizeof(struct rtattr *) * (max + 1));
+       while (RTA_OK(rta, len)) {
+               if (rta->rta_type <= max)
+                       tb[rta->rta_type] = rta;
+               rta = RTA_NEXT(rta,len);
+       }
+       if (len)
+               fprintf(stderr, "!!!Deficit %d, rta_len=%d\n", len, rta->rta_len);
+       return 0;
+}
+
+int parse_rtattr_byindex(struct rtattr *tb[], int max, struct rtattr *rta, int len)
+{
+       int i = 0;
+
+       memset(tb, 0, sizeof(struct rtattr *) * max);
+       while (RTA_OK(rta, len)) {
+               if (rta->rta_type <= max && i < max)
+                       tb[i++] = rta;
+               rta = RTA_NEXT(rta,len);
+       }
+       if (len)
+               fprintf(stderr, "!!!Deficit %d, rta_len=%d\n", len, rta->rta_len);
+       return i;
+}
diff --git a/tools/python/xen/lowlevel/netlink/libnetlink.h b/tools/python/xen/lowlevel/netlink/libnetlink.h
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/netlink/libnetlink.h
@@ -0,0 +1,58 @@
+#ifndef __LIBNETLINK_H__
+#define __LIBNETLINK_H__ 1
+
+#include <netinet/in.h>
+#include <asm/types.h>
+#include <linux/netlink.h>
+#include <linux/rtnetlink.h>
+
+struct rtnl_handle
+{
+       int                     fd;
+       struct sockaddr_nl      local;
+       struct sockaddr_nl      peer;
+       __u32                   seq;
+       __u32                   dump;
+};
+
+extern int rtnl_open(struct rtnl_handle *rth, unsigned subscriptions);
+extern int rtnl_open_byproto(struct rtnl_handle *rth, unsigned subscriptions, int protocol);
+extern void rtnl_close(struct rtnl_handle *rth);
+extern int rtnl_wilddump_request(struct rtnl_handle *rth, int fam, int type);
+extern int rtnl_dump_request(struct rtnl_handle *rth, int type, void *req, int len);
+
+typedef int (*rtnl_filter_t)(const struct sockaddr_nl *, 
+                            struct nlmsghdr *n, void *);
+extern int rtnl_dump_filter(struct rtnl_handle *rth, rtnl_filter_t filter,
+                           void *arg1,
+                           rtnl_filter_t junk,
+                           void *arg2);
+extern int rtnl_talk(struct rtnl_handle *rtnl, struct nlmsghdr *n, pid_t peer,
+                    unsigned groups, struct nlmsghdr *answer,
+                    rtnl_filter_t junk,
+                    void *jarg);
+extern int rtnl_send(struct rtnl_handle *rth, const char *buf, int);
+
+
+extern int addattr32(struct nlmsghdr *n, int maxlen, int type, __u32 data);
+extern int addattr_l(struct nlmsghdr *n, int maxlen, int type, const void *data, int alen);
+extern int addraw_l(struct nlmsghdr *n, int maxlen, const void *data, int len);
+extern int rta_addattr32(struct rtattr *rta, int maxlen, int type, __u32 data);
+extern int rta_addattr_l(struct rtattr *rta, int maxlen, int type, const void *data, int alen);
+
+extern int parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len);
+extern int parse_rtattr_byindex(struct rtattr *tb[], int max, struct rtattr *rta, int len);
+
+#define parse_rtattr_nested(tb, max, rta) \
+       (parse_rtattr((tb), (max), RTA_DATA(rta), RTA_PAYLOAD(rta)))
+
+extern int rtnl_listen(struct rtnl_handle *, rtnl_filter_t handler, 
+                      void *jarg);
+extern int rtnl_from_file(FILE *, rtnl_filter_t handler,
+                      void *jarg);
+
+#define NLMSG_TAIL(nmsg) \
+       ((struct rtattr *) (((void *) (nmsg)) + NLMSG_ALIGN((nmsg)->nlmsg_len)))
+
+#endif /* __LIBNETLINK_H__ */
+
diff --git a/tools/python/xen/lowlevel/netlink/netlink.c b/tools/python/xen/lowlevel/netlink/netlink.c
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/lowlevel/netlink/netlink.c
@@ -0,0 +1,211 @@
+/* python binding to libnetlink */
+
+#include <Python.h>
+#include "libnetlink.h"
+
+#define PKG "xen.lowlevel.netlink"
+
+typedef struct {
+  PyObject_HEAD
+  int opened;
+  struct rtnl_handle rth;
+} PyRtnlObject;
+
+/* todo: subscriptions? */
+static PyObject* PyRtnl_new(PyTypeObject* type, PyObject* args,
+                            PyObject* kwargs)
+{
+  return type->tp_alloc(type, 0);
+}
+
+static int PyRtnl_init(PyObject* obj, PyObject* args, PyObject* kwargs)
+{
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+
+  if (rtnl_open(&self->rth, 0) < 0) {
+    PyErr_SetString(PyExc_IOError, "could not open rtnl handle");
+    return -1;
+  }
+
+  return 0;
+}
+
+static void PyRtnl_dealloc(PyRtnlObject* obj)
+{
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+
+  rtnl_close(&self->rth);
+  /* free the object memory allocated by tp_alloc in PyRtnl_new */
+  self->ob_type->tp_free((PyObject*)self);
+}
+
+static PyObject* pyrtnl_talk(PyObject* obj, PyObject* args)
+{
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+  char* msg;
+  int len;
+  int peer = 0;
+  int groups = 0;
+
+  if (!PyArg_ParseTuple(args, "s#|ii", &msg, &len, &peer, &groups))
+    return NULL;
+
+  if (rtnl_talk(&self->rth, (struct nlmsghdr*)msg, peer, groups, NULL, NULL,
+                NULL) < 0)
+  {
+    PyErr_SetString(PyExc_IOError, "error sending message");
+    return NULL;
+  }
+
+  Py_RETURN_NONE;
+}
+
+static PyObject* pyrtnl_wilddump_request(PyObject* obj, PyObject* args)
+{
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+  int family, type;
+
+  if (!PyArg_ParseTuple(args, "ii", &family, &type))
+    return NULL;
+
+  if (rtnl_wilddump_request(&self->rth, family, type) < 0) {
+    PyErr_SetString(PyExc_IOError, "could not send dump request");
+    return NULL;
+  }
+
+  Py_RETURN_NONE;
+}
+
+static PyObject* pyrtnl_dump_request(PyObject* obj, PyObject* args)
+{
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+  int type;
+  char* req;
+  int len;
+
+  if (!PyArg_ParseTuple(args, "is#", &type, &req, &len))
+    return NULL;
+
+  if (rtnl_dump_request(&self->rth, type, req, len) < 0) {
+    PyErr_SetString(PyExc_IOError, "could not send dump request");
+    return NULL;
+  }
+
+  Py_RETURN_NONE;
+}
+
+/* translate args to python and call python callback */
+static int dump_filter_helper(const struct sockaddr_nl *who,
+                              struct nlmsghdr *n, void *arg)
+{
+  PyObject* filter = arg;
+  PyObject* args;
+  PyObject* result;
+
+  args = Py_BuildValue("s#s#", who, sizeof(*who), n, n->nlmsg_len);
+  result = PyObject_CallObject(filter, args);
+  Py_DECREF(args);
+  if (!result)
+    return -1;
+
+  /* result is ignored as long as an exception isn't raised */
+  Py_DECREF(result);
+  return 0;
+}
+
+static PyObject* pyrtnl_dump_filter(PyObject* obj, PyObject* args)
+{
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+  PyObject *filter;
+
+  if (!PyArg_ParseTuple(args, "O:dump_filter", &filter))
+    return NULL;
+
+  if (!PyCallable_Check(filter)) {
+    PyErr_SetString(PyExc_TypeError, "parameter must be callable");
+    return NULL;
+  }
+
+  Py_INCREF(filter);
+  if (rtnl_dump_filter(&self->rth, dump_filter_helper, filter, NULL,
+                       NULL) < 0)
+  {
+    Py_DECREF(filter);
+    return NULL;
+  }
+  Py_DECREF(filter);
+
+  Py_RETURN_NONE;
+}
+
+static PyMethodDef PyRtnl_methods[] = {
+  { "talk", pyrtnl_talk, METH_VARARGS,
+    "send a message to rtnetlink and receive a response.\n" },
+  { "wilddump_request", pyrtnl_wilddump_request, METH_VARARGS,
+    "dump objects.\n" },
+  { "dump_request", pyrtnl_dump_request, METH_VARARGS,
+    "start a dump of a particular netlink type.\n" },
+  { "dump_filter", pyrtnl_dump_filter, METH_VARARGS,
+    "iterate over an rtnl dump.\n" },
+  { NULL }
+};
+
+static PyTypeObject PyRtnlType = {
+  PyObject_HEAD_INIT(NULL)
+  0,                          /* ob_size           */
+  PKG ".rtnl",                /* tp_name           */
+  sizeof(PyRtnlObject),       /* tp_basicsize      */
+  0,                          /* tp_itemsize       */
+  (destructor)PyRtnl_dealloc, /* tp_dealloc        */
+  NULL,                       /* tp_print          */
+  NULL,                       /* tp_getattr        */
+  NULL,                       /* tp_setattr        */
+  NULL,                       /* tp_compare        */
+  NULL,                       /* tp_repr           */
+  NULL,                       /* tp_as_number      */
+  NULL,                       /* tp_as_sequence    */
+  NULL,                       /* tp_as_mapping     */
+  NULL,                       /* tp_hash           */
+  NULL,                       /* tp_call           */
+  NULL,                       /* tp_str            */
+  NULL,                       /* tp_getattro       */
+  NULL,                       /* tp_setattro       */
+  NULL,                       /* tp_as_buffer      */
+  Py_TPFLAGS_DEFAULT,         /* tp_flags          */
+  "rtnetlink handle",         /* tp_doc            */
+  NULL,                       /* tp_traverse       */
+  NULL,                       /* tp_clear          */
+  NULL,                       /* tp_richcompare    */
+  0,                          /* tp_weaklistoffset */
+  NULL,                       /* tp_iter           */
+  NULL,                       /* tp_iternext       */
+  PyRtnl_methods,             /* tp_methods        */
+  NULL,                       /* tp_members        */
+  NULL,                       /* tp_getset         */
+  NULL,                       /* tp_base           */
+  NULL,                       /* tp_dict           */
+  NULL,                       /* tp_descr_get      */
+  NULL,                       /* tp_descr_set      */
+  0,                          /* tp_dictoffset     */
+  PyRtnl_init,                /* tp_init           */
+  NULL,                       /* tp_alloc          */
+  PyRtnl_new,                 /* tp_new            */
+};
+
+static PyMethodDef methods[] = {
+  { NULL }
+};
+
+static char doc[] = "libnetlink wrapper";
+
+PyMODINIT_FUNC initnetlink(void)
+{
+  PyObject *mod;
+
+  if (PyType_Ready(&PyRtnlType) == -1)
+    return;
+
+  if (!(mod = Py_InitModule3(PKG, methods, doc)))
+    return;
+
+  Py_INCREF(&PyRtnlType);
+  PyModule_AddObject(mod, "rtnl", (PyObject *)&PyRtnlType);
+}
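
(Aside, not part of the patch: a minimal sketch of driving the binding from
Python, assuming the extension builds and is importable as
xen.lowlevel.netlink. Message parsing is left to the pure-Python helpers added
in xen/remus/netlink.py further down.)

import socket
import xen.lowlevel.netlink

RTM_GETLINK = 18   # rtnetlink message type, also defined in xen.remus.netlink

def dumpfilter(addr, msg):
    # addr is the raw sockaddr_nl, msg the raw nlmsghdr plus payload;
    # a real consumer would hand msg to xen.remus.netlink.nlmsg for parsing
    print 'got a %d-byte link message' % len(msg)

rth = xen.lowlevel.netlink.rtnl()
rth.wilddump_request(socket.AF_UNSPEC, RTM_GETLINK)
rth.dump_filter(dumpfilter)
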
diff --git a/tools/python/xen/remus/__init__.py b/tools/python/xen/remus/__init__.py
new file mode 100644
diff --git a/tools/python/xen/remus/blkdev.py b/tools/python/xen/remus/blkdev.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/blkdev.py
@@ -0,0 +1,31 @@
+handlers = []
+
+class BlkDevException(Exception): pass
+
+class BlkDev(object):
+    "Object representing a VM block device"
+    def __init__(self, **props):
+        self.uname = ''
+        if 'dev' not in props:
+            raise BlkDevException('no device')
+        #if 'uname' not in props:
+            #raise BlkDevException('no uname')
+        if 'mode' not in props:
+            raise BlkDevException('no mode')
+        self.__dict__.update(props)
+        # strip a trailing ':disk' (rstrip would strip characters, not the suffix)
+        self.dev = props['dev'].split(':')[0]
+
+    def __str__(self):
+        return '%s,%s,%s' % (self.uname, self.dev, self.mode)
+
+def register(handler):
+    "register a block device class with parser"
+    if handler not in handlers:
+        handlers.insert(0, handler)
+
+def parse(props):
+    "turn a vm device dictionary into a blkdev object"
+    for handler in handlers:
+        if handler.handles(**props):
+            return handler(**props)
+    return BlkDev(**props)
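
(Aside: the handler registry above is intended to be used as below; the
FakeDisk class and 'fake:' prefix are made up for illustration.)

from xen.remus import blkdev

class FakeDisk(blkdev.BlkDev):
    "hypothetical handler claiming any uname with a fake: prefix"
    def handles(self, **props):
        return props.get('uname', '').startswith('fake:')
    handles = classmethod(handles)

blkdev.register(FakeDisk)

# parse() asks each registered handler before falling back to plain BlkDev
disk = blkdev.parse({'uname': 'fake:/tmp/disk.img', 'dev': 'xvda1:disk',
                     'mode': 'w'})
print disk    # fake:/tmp/disk.img,xvda1,w
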
diff --git a/tools/python/xen/remus/image.py b/tools/python/xen/remus/image.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/image.py
@@ -0,0 +1,227 @@
+# VM image file manipulation
+
+import logging, struct
+
+import vm
+
+SIGNATURE = 'LinuxGuestRecord'
+LONGLEN = struct.calcsize('L')
+INTLEN = struct.calcsize('i')
+PAGE_SIZE = 4096
+# ~0L
+P2M_EXT_SIG = 4294967295L
+# frames per page
+FPP = 1024
+LTAB_MASK = 0xf << 28
+BATCH_SIZE = 1024
+IDXLEN = INTLEN + BATCH_SIZE * LONGLEN
+
+logging.basicConfig(level=logging.DEBUG)
+log = logging.getLogger()
+
+class VMParseException(Exception): pass
+
+class VMImage(object):
+    def __init__(self, img=None):
+        """img may be a path or a file object.
+        """img may be a path or a file object."""
+
+        self.dom = None
+        self.fd = None
+        self.header = None
+        self.nr_pfns = 0
+        # p2m extension header (unparsed)
+        self.p2mext = None
+
+        if self.img:
+            self.open(self.img)
+
+    def open(self, img):
+        if isinstance(img, str):
+            self.fd = file(img, 'rb')
+        else:
+            self.fd = img
+
+        self.readheader()
+
+    def readheader(self):
+        sig = self.fd.read(len(SIGNATURE))
+        if sig != SIGNATURE:
+            raise VMParseException("Bad signature in image")
+
+        hlen = self.fd.read(INTLEN)
+        hlen, = struct.unpack('!i', hlen)
+
+        self.header = self.fd.read(hlen)
+        self.dom = parseheader(self.header)
+
+    def readp2mfl(self):
+        "read the P2M frame list"
+        pfnlen = self.fd.read(LONGLEN)
+        self.nr_pfns, = struct.unpack('L', pfnlen)
+        p2m0 = self.fd.read(LONGLEN)
+
+        p2mhdr = p2m0
+        p2m0, = struct.unpack('L', p2m0)
+        if p2m0 == P2M_EXT_SIG:
+            elen = self.fd.read(INTLEN)
+            elen, = struct.unpack('I', elen)
+            
+            self.p2mext = self.fd.read(elen)
+
+            p2m0 = self.fd.read(LONGLEN)
+            p2m0, = struct.unpack('L', p2m0)
+        p2mfl = [p2m0]
+
+        p2mfle = (self.nr_pfns + FPP - 1)/FPP - 1
+        p2ms = self.fd.read(LONGLEN * p2mfle)
+        p2mfl.extend(struct.unpack('%dL' % p2mfle, p2ms))
+
+        self.p2mfl = p2mfl
+
+    def flush(self):
+        self.ofd.write(self.tail)
+
+class Writer(object):
+    """Write a stream of checkpoints into a single image file.
+    If compact is True, apply each checkpoint to the base image
+    instead of simply concatenating them, so the file holds only
+    the most recent checkpoint."""
+    def __init__(self, fd, compact=False):
+        self.fd = fd
+        self.compact = compact
+
+        self.vm = None
+        self.tail = None
+        # offset to first batch of pages
+        self.imgstart = 0
+        # PFN mappings
+        self.pfns = []
+
+    def __del__(self):
+        self.close()
+
+    def writeheader(self):
+        hlen = struct.pack('!i', len(self.vm.header))
+        header = ''.join([SIGNATURE, hlen, self.vm.header])
+        self.fd.write(header)
+
+    def writep2mfl(self):
+        p2m = [struct.pack('L', self.vm.nr_pfns)]
+        if self.vm.p2mext:
+            p2m.extend([struct.pack('L', P2M_EXT_SIG), self.vm.p2mext])
+        p2m.append(struct.pack('%dL' % len(self.vm.p2mfl), *self.vm.p2mfl))
+        self.fd.write(''.join(p2m))
+
+    def writebatch(self, batch):
+        def offset(pfn):
+            isz = (pfn / BATCH_SIZE + 1) * IDXLEN
+            return self.imgstart + isz + pfn * PAGE_SIZE
+
+        if not self.compact:
+            return self.fd.write(batch)
+
+        batch = parsebatch(batch)
+        # sort pages for better disk seek behaviour
+        batch.sort(lambda x, y: cmp(x[0] & ~LTAB_MASK, y[0] & ~LTAB_MASK))
+
+        for pfndesc, page in batch:
+            pfn = pfndesc & ~LTAB_MASK
+            if pfn > self.vm.nr_pfns:
+                log.error('INVALID PFN: %d' % pfn)
+            if len(self.pfns) <= pfn:
+                self.pfns.extend([0] * (pfn - len(self.pfns) + 1))
+            self.pfns[pfn] = pfndesc
+            self.fd.seek(offset(pfn))
+            self.fd.write(page)
+
+        #print "max offset: %d, %d" % (len(self.pfns), offset(self.pfns[-1]))
+
+    def writeindex(self):
+        "Write batch header in front of each page"
+        hdrlen = INTLEN + BATCH_SIZE * LONGLEN
+        batches = (len(self.pfns) + BATCH_SIZE - 1) / BATCH_SIZE
+
+        for i in xrange(batches):
+            offset = self.imgstart + i * (hdrlen + (PAGE_SIZE * BATCH_SIZE))
+            pfnoff = i * BATCH_SIZE
+            # python auto-clamps overreads
+            pfns = self.pfns[pfnoff:pfnoff + BATCH_SIZE]
+
+            self.fd.seek(offset)
+            self.fd.write(struct.pack('i', len(pfns)))
+            self.fd.write(struct.pack('%dL' % len(pfns), *pfns))
+
+    def slurp(self, ifd):
+        """Apply an incremental checkpoint to a loaded image.
+        accepts a path or a file object."""
+        if isinstance(ifd, str):
+            ifd = file(ifd, 'rb')
+
+        if not self.vm:
+            self.vm = VMImage(ifd)
+            self.writeheader()
+
+            self.vm.readp2mfl()
+            self.writep2mfl()
+            self.imgstart = self.fd.tell()
+
+        while True:
+            l, batch = readbatch(ifd)
+            if l <= 0:
+                break
+            self.writebatch(batch)
+        self.tail = batch + ifd.read()
+
+    def flush(self):
+        if self.tail:
+            self.fd.seek(0, 2)
+            self.fd.write(self.tail)
+            if self.compact:
+                self.writeindex()
+        self.tail = None
+
+    def close(self):
+        self.flush()
+
+def parseheader(header):
+    "parses a header sexpression"
+    return vm.parsedominfo(vm.strtosxpr(header))
+
+def makeheader(dominfo):
+    "create an image header from a VM dominfo sxpr"
+    items = [SIGNATURE]
+    sxpr = vm.sxprtostr(dominfo)
+    items.append(struct.pack('!i', len(sxpr)))
+    items.append(sxpr)
+    return ''.join(items)
+
+def readbatch(fd):
+    batch = []
+    batchlen = fd.read(INTLEN)
+    batch.append(batchlen)
+    batchlen, = struct.unpack('i', batchlen)
+    log.info("batch length: %d" % batchlen)
+    if batchlen <= 0:
+        return (batchlen, batch[0])
+
+    batchfns = fd.read(LONGLEN * batchlen)
+    batch.append(batchfns)
+    pages = fd.read(PAGE_SIZE * batchlen)
+    if len(pages) != PAGE_SIZE * batchlen:
+        log.error('SHORT READ: %d' % len(pages))
+    batch.append(pages)
+
+    return (batchlen, ''.join(batch))
+
+def parsebatch(batch):
+    "parse a batch string into pages"
+    batchlen, batch = batch[:INTLEN], batch[INTLEN:]
+    batchlen, = struct.unpack('i', batchlen)
+    #print 'batch length: %d' % batchlen
+    pfnlen = batchlen * LONGLEN
+    pfns = struct.unpack('%dL' % batchlen, batch[:pfnlen])
+    pagebuf = batch[pfnlen:]
+    pages = [pagebuf[i*PAGE_SIZE:(i+1)*PAGE_SIZE] for i in xrange(batchlen)]
+    return zip(pfns, pages)
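
(Aside: the Writer above is driven roughly as follows; the paths are invented.
Each slurp() applies one checkpoint stream, and compact mode rewrites pages in
place rather than appending them.)

from xen.remus import image

out = file('/tmp/guest.img', 'w+b')
w = image.Writer(out, compact=True)
# apply the base image followed by any incremental checkpoints, in order
for ckpt in ['/tmp/remus.chk', '/tmp/remus.chk.1']:
    w.slurp(ckpt)
w.close()   # flushes the tail and, in compact mode, writes the batch index
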
diff --git a/tools/python/xen/remus/netlink.py b/tools/python/xen/remus/netlink.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/netlink.py
@@ -0,0 +1,314 @@
+# netlink wrappers
+
+import socket, struct
+import xen.lowlevel.netlink
+
+NETLINK_ROUTE          = 0
+
+NLM_F_REQUEST = 1 # It is request message.
+NLM_F_MULTI   = 2 # Multipart message, terminated by NLMSG_DONE
+NLM_F_ACK     = 4 # Reply with ack, with zero or error code
+NLM_F_ECHO    = 8 # Echo this request
+
+# Modifiers to GET request
+NLM_F_ROOT   = 0x100 # specify tree root
+NLM_F_MATCH  = 0x200 # return all matching
+NLM_F_ATOMIC = 0x400 # atomic GET
+NLM_F_DUMP   = NLM_F_ROOT|NLM_F_MATCH
+
+# Modifiers to NEW request
+NLM_F_REPLACE = 0x100 # Override existing
+NLM_F_EXCL    = 0x200 # Do not touch, if it exists
+NLM_F_CREATE  = 0x400 # Create, if it does not exist
+NLM_F_APPEND  = 0x800 # Add to end of list
+
+RTM_NEWLINK  = 16
+RTM_GETLINK  = 18
+RTM_NEWQDISC = 36
+RTM_DELQDISC = 37
+RTM_GETQDISC = 38
+
+IFLA_UNSPEC    = 0
+IFLA_ADDRESS   = 1
+IFLA_BROADCAST = 2
+IFLA_IFNAME    = 3
+IFLA_MTU       = 4
+IFLA_LINK      = 5
+IFLA_QDISC     = 6
+IFLA_STATS     = 7
+IFLA_COST      = 8
+IFLA_PRIORITY  = 9
+IFLA_MASTER    = 10
+IFLA_WIRELESS  = 11
+IFLA_PROTINFO  = 12
+IFLA_TXQLEN    = 13
+IFLA_MAP       = 14
+IFLA_WEIGHT    = 15
+
+TCA_UNSPEC  = 0
+TCA_KIND    = 1
+TCA_OPTIONS = 2
+TCA_STATS   = 3
+TCA_XSTATS  = 4
+TCA_RATE    = 5
+TCA_FCNT    = 6
+TCA_STATS2  = 7
+
+class RTNLException(Exception): pass
+
+def align(l, alignto=4):
+    return (l + alignto - 1) & ~(alignto - 1)
+
+class rtattr(object):
+    "rtattribute"
+    fmt = "HH"
+    fmtlen = struct.calcsize(fmt)
+
+    def __init__(self, msg=None):
+        if msg:
+            self.unpack(msg)
+        else:
+            self.rta_len = 0
+            self.rta_type = 0
+
+            self.body = ''
+
+    def __len__(self):
+        return align(self.rta_len)
+
+    def pack(self):
+        self.rta_len = align(self.fmtlen + len(self.body))
+        s = struct.pack(self.fmt, self.rta_len, self.rta_type) + self.body
+        pad = self.rta_len - len(s)
+        if pad:
+            s += '\0' * pad
+        return s
+
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.rta_len, self.rta_type = args
+
+        self.body = msg[align(self.fmtlen):self.rta_len]
+
+class rtattrlist(object):
+    def __init__(self, msg):
+        self.start = msg
+
+    def __iter__(self):
+        body = self.start
+        while len(body) > rtattr.fmtlen:
+            rta = rtattr(body)
+            yield rta
+            body = body[len(rta):]
+
+class nlmsg(object):
+    "netlink message header"
+    fmt = "IHHII"
+    fmtlen = struct.calcsize(fmt)
+
+    def __init__(self, msg=None):
+        if msg:
+            self.unpack(msg)
+        else:
+            self.nlmsg_len = 0
+            self.nlmsg_type = 0
+            self.nlmsg_flags = 0
+            self.nlmsg_seq = 0
+            self.nlmsg_pid = 0
+
+            self.rta = ''
+            self.body = ''
+
+    def __len__(self):
+        return align(self.fmtlen + len(self.body) + len(self.rta))
+
+    def addattr(self, type, data):
+        attr = rtattr()
+        attr.rta_type = type
+        attr.body = data
+        self.rta += attr.pack()
+
+    def settype(self, cmd):
+        self.nlmsg_type = cmd
+
+    def pack(self):
+        return struct.pack(self.fmt, len(self), self.nlmsg_type,
+                           self.nlmsg_flags, self.nlmsg_seq,
+                           self.nlmsg_pid) + self.body + self.rta
+
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.nlmsg_len, self.nlmsg_type, self.nlmsg_flags = args[:3]
+        self.nlmsg_seq, self.nlmsg_pid = args[3:]
+
+        self.body = msg[align(self.fmtlen):]
+        self.rta = ''
+
+    def __str__(self):
+        return '<netlink message, len %d, type %d>' % \
+            (self.nlmsg_len, self.nlmsg_type)
+
+class ifinfomsg(object):
+    "interface info message"
+    fmt = "BxHiII"
+    fmtlen = struct.calcsize(fmt)
+
+    def __init__(self, msg=None):
+        if msg:
+            self.unpack(msg)
+        else:
+            self.ifi_family = 0
+            self.ifi_type = 0
+            self.ifi_index = 0
+            self.ifi_flags = 0
+            self.ifi_change = 0
+
+            self.body = ''
+
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.ifi_family, self.ifi_type, self.ifi_index= args[:3]
+        self.ifi_flags, self.ifi_change = args[3:]
+
+        self.body = msg[align(self.fmtlen):]
+
+    def __str__(self):
+        return '<ifinfo message, family %d, type %d, index %d>' % \
+            (self.ifi_family, self.ifi_type, self.ifi_index)
+
+class tcmsg(object):
+    "TC message"
+    fmt = "BxxxiIII"
+    fmtlen = struct.calcsize(fmt)
+
+    def __init__(self, msg=None):
+        if msg:
+            self.unpack(msg)
+        else:
+            self.tcm_family = socket.AF_UNSPEC
+            self.tcm_ifindex = 0
+            self.tcm_handle = 0
+            self.tcm_parent = 0
+            self.tcm_info = 0
+
+            self.rta = ''
+
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.tcm_family, self.tcm_ifindex, self.tcm_handle = args[:3]
+        self.tcm_parent, self.tcm_info = args[3:]
+
+        self.rta = msg[align(self.fmtlen):]
+
+    def pack(self):
+        return struct.pack(self.fmt, self.tcm_family, self.tcm_ifindex,
+                           self.tcm_handle, self.tcm_parent, self.tcm_info)
+
+    def __str__(self):
+        return '<tc message, family %d, index %d>' % \
+            (self.tcm_family, self.tcm_ifindex)
+
+class newlinkmsg(object):
+    def __init__(self, nlmsg):
+        if nlmsg.nlmsg_type != RTM_NEWLINK:
+            raise RTNLException("wrong message type")
+        self.nlmsg = nlmsg
+        self.ifi = ifinfomsg(self.nlmsg.body)
+
+        self.rtattrs = {}
+        for rta in rtattrlist(self.ifi.body):
+            self.rtattrs[rta.rta_type] = rta.body
+
+class newqdiscmsg(object):
+    def __init__(self, nlmsg):
+        if nlmsg.nlmsg_type != RTM_NEWQDISC:
+            raise RTNLException("wrong message type")
+        self.nlmsg = nlmsg
+        self.t = tcmsg(self.nlmsg.body)
+
+        self.rtattrs = {}
+        for rta in rtattrlist(self.t.rta):
+            self.rtattrs[rta.rta_type] = rta.body
+
+class rtnl(object):
+    def __init__(self):
+        self._rth = xen.lowlevel.netlink.rtnl()
+        self._linkcache = None
+
+    def getlink(self, key, cached=False):
+        """returns the interface object corresponding to the key, which
+        may be an index number or device name."""
+        if not cached:
+            self._linkcache = None
+        if self._linkcache is None:
+            self._linkcache = self.getlinks()
+
+        if isinstance(key, int):
+            return self._linkcache.get(key)
+
+        for k, v in self._linkcache.iteritems():
+            if v['name'] == key:
+                return v
+
+        return None
+
+    def getlinks(self):
+        """returns a dictionary of interfaces keyed by kernel
+        interface index"""
+        links = {}
+        def dumpfilter(addr, msgstr):
+            msg = newlinkmsg(nlmsg(msgstr))
+            idx = msg.ifi.ifi_index
+            ifname = msg.rtattrs[IFLA_IFNAME].strip('\0')
+            address = msg.rtattrs.get(IFLA_ADDRESS)
+
+            link = {'index': idx,
+                    'type': msg.ifi.ifi_type,
+                    'name': ifname,
+                    'address': address}
+            links[idx] = link
+
+        self._rth.wilddump_request(socket.AF_UNSPEC, RTM_GETLINK)
+        self._rth.dump_filter(dumpfilter)
+
+        return links
+
+    def getqdisc(self, dev):
+        """returns the queueing discipline on device dev, which may be
+        specified by kernel index or device name"""
+        qdiscs = self.getqdiscs(dev)
+        if qdiscs:
+            return qdiscs.values()[0]
+        return None
+
+    def getqdiscs(self, dev=None):
+        """returns a dictionary of queueing disciplines keyed by kernel
+        interface index"""
+        qdiscs = {}
+        def dumpfilter(addr, msgstr):
+            msg = newqdiscmsg(nlmsg(msgstr))
+            idx = msg.t.tcm_ifindex
+            handle = msg.t.tcm_handle
+            kind = msg.rtattrs[TCA_KIND].strip('\0')
+            opts = msg.rtattrs.get(TCA_OPTIONS)
+
+            qdisc = {'index': idx,
+                     'handle': handle,
+                     'kind': kind,
+                     'options': opts}
+            qdiscs[idx] = qdisc
+
+        tcm = tcmsg()
+        if dev:
+            link = self.getlink(dev)
+            if not link:
+                raise RTNLException('device %s not found' % dev)
+            tcm.tcm_ifindex = link['index']
+
+        msg = tcm.pack()
+        self._rth.dump_request(RTM_GETQDISC, msg)
+        self._rth.dump_filter(dumpfilter)
+        return qdiscs
+        
+    def talk(self, req):
+        self._rth.talk(req)
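
(Aside: a short example of the rtnl wrapper; 'eth0' is just an example
interface name.)

from xen.remus import netlink

rth = netlink.rtnl()
link = rth.getlink('eth0')
if link:
    print 'eth0 has ifindex %d' % link['index']

qdisc = rth.getqdisc('eth0')
if qdisc:
    print 'root qdisc on eth0: %s' % qdisc['kind']
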
diff --git a/tools/python/xen/remus/profile.py b/tools/python/xen/remus/profile.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/profile.py
@@ -0,0 +1,56 @@
+"""Simple profiling module
+"""
+
+import time
+
+class ProfileBlock(object):
+    """A section of code to be profiled"""
+    def __init__(self, name):
+        self.name = name
+
+    def enter(self):
+        print "PROF: entered %s at %f" % (self.name, time.time())
+
+    def exit(self):
+        print "PROF: exited %s at %f" % (self.name, time.time())
+
+class NullProfiler(object):
+    def enter(self, name):
+        pass
+
+    def exit(self, name=None):
+        pass
+
+class Profiler(object):
+    def __init__(self):
+        self.blocks = {}
+        self.running = []
+
+    def enter(self, name):
+        try:
+            block = self.blocks[name]
+        except KeyError:
+            block = ProfileBlock(name)
+            self.blocks[name] = block
+
+        block.enter()
+        self.running.append(block)
+
+    def exit(self, name=None):
+        if name is not None:
+            block = None
+            while self.running:
+                tmp = self.running.pop()
+                if tmp.name == name:
+                    block = tmp
+                    break
+                tmp.exit()
+            if not block:
+                raise KeyError('block %s not running' % name)
+        else:
+            try:
+                block = self.running.pop()
+            except IndexError:
+                raise KeyError('no block running')
+
+        block.exit()
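
(Aside: the profiler is used by bracketing code with enter/exit calls; the
block names here are arbitrary.)

from xen.remus import profile

prof = profile.Profiler()
prof.enter('checkpoint')
prof.enter('suspend')
# ... work being timed ...
prof.exit('suspend')
prof.exit()    # closes the innermost running block, i.e. 'checkpoint'
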
diff --git a/tools/python/xen/remus/qdisc.py b/tools/python/xen/remus/qdisc.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/qdisc.py
@@ -0,0 +1,178 @@
+import socket, struct
+
+import netlink
+
+qdisc_kinds = {}
+
+TC_H_ROOT = 0xFFFFFFFF
+
+class QdiscException(Exception): pass
+
+class request(object):
+    "qdisc request message"
+    def __init__(self, cmd, flags=0, dev=None, handle=0):
+        self.n = netlink.nlmsg()
+        self.t = netlink.tcmsg()
+
+        self.n.nlmsg_flags = netlink.NLM_F_REQUEST|flags
+        self.n.nlmsg_type = cmd
+        self.t.tcm_family = socket.AF_UNSPEC
+
+        if not handle:
+            handle = TC_H_ROOT
+        self.t.tcm_parent = handle
+        
+        if dev:
+            self.t.tcm_ifindex = dev
+
+    def pack(self):
+        t = self.t.pack()
+        self.n.body = t
+        return self.n.pack()
+
+class addrequest(request):
+    def __init__(self, dev, handle, qdisc):
+        flags = netlink.NLM_F_EXCL|netlink.NLM_F_CREATE
+        super(addrequest, self).__init__(netlink.RTM_NEWQDISC, flags=flags,
+                                         dev=dev, handle=handle)
+        self.n.addattr(netlink.TCA_KIND, qdisc.kind)
+        opts = qdisc.pack()
+        if opts:
+            self.n.addattr(netlink.TCA_OPTIONS, opts)
+
+class delrequest(request):
+    def __init__(self, dev, handle):
+        super(delrequest, self).__init__(netlink.RTM_DELQDISC, dev=dev,
+                                         handle=handle)
+
+class changerequest(request):
+    def __init__(self, dev, handle, qdisc):
+        super(changerequest, self).__init__(netlink.RTM_NEWQDISC,
+                                            dev=dev, handle=handle)
+        self.n.addattr(netlink.TCA_KIND, qdisc.kind)
+        opts = qdisc.pack()
+        if opts:
+            self.n.addattr(netlink.TCA_OPTIONS, opts)
+
+class Qdisc(object):
+    def __new__(cls, qdict=None, *args, **opts):
+        if qdict:
+            kind = qdict.get('kind')
+            cls = qdisc_kinds.get(kind, cls)
+        obj = super(Qdisc, cls).__new__(cls, qdict=qdict, *args, **opts)
+        return obj
+
+    def __init__(self, qdict):
+        self._qdict = qdict
+        self.kind = qdict['kind']
+        self.handle = qdict['handle'] >> 16
+
+    def parse(self, opts):
+        if opts:
+            raise QdiscException('cannot parse qdisc parameters')
+
+    def optstr(self):
+        if self._qdict['options']:
+            return '[cannot parse qdisc parameters]'
+        else:
+            return ''
+
+    def pack(self):
+        return ''
+
+TC_PRIO_MAX = 15
+class PrioQdisc(Qdisc):
+    fmt = 'i%sB' % (TC_PRIO_MAX + 1)
+
+    def __init__(self, qdict):
+        super(PrioQdisc, self).__init__(qdict)
+
+        if qdict.get('options'):
+            self.unpack(qdict['options'])
+        else:
+            self.bands = 3
+            self.priomap = [1, 2, 2, 2, 1, 2, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1]
+
+    def pack(self):
+        #return struct.pack(self.fmt, self.bands, *self.priomap)
+        return ''
+
+    def unpack(self, opts):
+        args = struct.unpack(self.fmt, opts)
+        self.bands = args[0]
+        self.priomap = args[1:]
+
+    def optstr(self):
+        mapstr = ' '.join([str(p) for p in self.priomap])
+        return 'bands %d priomap  %s' % (self.bands, mapstr)
+
+qdisc_kinds['prio'] = PrioQdisc
+qdisc_kinds['pfifo_fast'] = PrioQdisc
+
+class CfifoQdisc(Qdisc):
+    fmt = 'II'
+
+    def __init__(self, qdict):
+        super(CfifoQdisc, self).__init__(qdict)
+
+        if qdict.get('options'):
+            self.unpack(qdict['options'])
+        else:
+            self.epoch = 0
+            self.vmid = 0
+
+    def pack(self):
+        return struct.pack(self.fmt, self.epoch, self.vmid)
+
+    def unpack(self, opts):
+        self.epoch, self.vmid = struct.unpack(self.fmt, opts)
+
+    def parse(self, opts):
+        args = list(opts)
+        try:
+            while args:
+                arg = args.pop(0)
+                if arg == 'epoch':
+                    self.epoch = int(args.pop(0))
+                    continue
+                if arg.lower() == 'vmid':
+                    self.vmid = int(args.pop(0))
+                    continue
+        except Exception, inst:
+            raise QdiscException(str(inst))
+
+    def optstr(self):
+        return 'epoch %d vmID %d' % (self.epoch, self.vmid)
+
+qdisc_kinds['cfifo'] = CfifoQdisc
+
+TC_QUEUE_CHECKPOINT = 0
+TC_QUEUE_RELEASE = 1
+
+class QueueQdisc(Qdisc):
+    fmt = 'I'
+
+    def __init__(self, qdict=None):
+        if not qdict:
+            qdict = {'kind': 'queue',
+                     'handle': TC_H_ROOT}
+        super(QueueQdisc, self).__init__(qdict)
+
+        self.action = 0
+
+    def pack(self):
+        return struct.pack(self.fmt, self.action)
+
+    def parse(self, args):
+        if not args:
+            raise QdiscException('no action given')
+        arg = args[0]
+
+        if arg == 'checkpoint':
+            self.action = TC_QUEUE_CHECKPOINT
+        elif arg == 'release':
+            self.action = TC_QUEUE_RELEASE
+        else:
+            raise QdiscException('unknown action')
+
+qdisc_kinds['queue'] = QueueQdisc
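
(Aside: combining qdisc.py with netlink.py, installing the Remus queue
discipline on an interface looks roughly like this. It assumes the matching
'queue' qdisc module from the Remus patches is available in the kernel; the
ifindex is arbitrary.)

from xen.remus import netlink, qdisc

q = qdisc.QueueQdisc()
req = qdisc.addrequest(dev=3, handle=0, qdisc=q)   # dev is a kernel ifindex

rth = netlink.rtnl()
rth.talk(req.pack())
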
diff --git a/tools/python/xen/remus/save.py b/tools/python/xen/remus/save.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/save.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python
+
+import os, select, socket, threading, time, signal, xmlrpclib
+
+from xen.xend.XendClient import server
+from xen.xend.xenstore.xswatch import xswatch
+
+import xen.lowlevel.xc
+from xen.xend.xenstore import xsutil
+xc = xen.lowlevel.xc.xc()
+
+import xen.lowlevel.checkpoint
+
+import vm, image
+
+XCFLAGS_LIVE =      1
+
+xcsave = '/usr/lib/xen/bin/xc_save'
+
+class _proxy(object):
+    "proxy simulates an object without inheritance"
+    def __init__(self, obj):
+        self._obj = obj
+
+    def __getattr__(self, name):
+        return getattr(self._obj, name)
+
+    def proxy(self, obj):
+        self._obj = obj
+
+class CheckpointError(Exception): pass
+
+class CheckpointingFile(_proxy):
+    """Tee writes into separate file objects for each round.
+    This is necessary because xc_save gets a single file descriptor
+    for the duration of checkpointing.
+    """
+    def __init__(self, path):
+        self.path = path
+
+        self.round = 0
+        self.rfd, self.wfd = os.pipe()
+        self.fd = file(path, 'wb')
+
+        # this pipe is used to notify the writer thread of checkpoints
+        self.cprfd, self.cpwfd = os.pipe()
+
+        super(CheckpointingFile, self).__init__(self.fd)
+
+        wt = threading.Thread(target=self._wrthread, name='disk-write-thread')
+        wt.setDaemon(True)
+        wt.start()
+        self.wt = wt
+
+    def fileno(self):
+        return self.wfd
+
+    def close(self):
+        os.close(self.wfd)
+        # closing wfd should signal writer to stop
+        self.wt.join()
+        os.close(self.rfd)
+        os.close(self.cprfd)
+        os.close(self.cpwfd)
+        self.fd.close()
+        self.wt = None
+
+    def checkpoint(self):
+        os.write(self.cpwfd, '1')
+
+    def _wrthread(self):
+        while True:
+            r, o, e = select.select((self.rfd, self.cprfd), (), ())
+            if self.rfd in r:
+                data = os.read(self.rfd, 256 * 1024)
+                if not data:
+                    break
+                self.fd.write(data)
+            if self.cprfd in r:
+                junk = os.read(self.cprfd, 1)
+                self.round += 1
+                self.fd = file('%s.%d' % (self.path, self.round), 'wb')
+                self.proxy(self.fd)
+
+class MigrationSocket(_proxy):
+    def __init__(self, address):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.connect(address)
+
+        sock.send("receive\n")
+        sock.recv(80)
+
+        fd = os.fdopen(sock.fileno(), 'w+')
+
+        self.sock = sock
+        super(MigrationSocket, self).__init__(fd)
+
+class Keepalive(object):
+    "Call a keepalive method at intervals"
+    def __init__(self, method, interval=0.1):
+        self.keepalive = method
+        self.interval = interval
+
+        self.thread = None
+        self.running = False
+
+    def start(self):
+        if not self.interval:
+            return
+        self.thread = threading.Thread(target=self.run, name='keepalive-thread')
+        self.thread.setDaemon(True)
+        self.running = True
+        self.thread.start()
+
+    def stop(self):
+        if not self.thread:
+            return
+        self.running = False
+        self.thread.join()
+        self.thread = None
+
+    def run(self):
+        while self.running:
+            self.keepalive()
+            time.sleep(self.interval)
+        self.keepalive(stop=True)
+
+class Saver(object):
+    def __init__(self, domid, fd, suspendcb=None, resumecb=None,
+                 checkpointcb=None, interval=0):
+        """Create a Saver object for taking guest checkpoints.
+        domid:        name, number or UUID of a running domain
+        fd:           a stream to which checkpoint data will be written.
+        suspendcb:    callback invoked after guest is suspended
+        resumecb:     callback invoked before guest resumes
+        checkpointcb: callback invoked when a checkpoint is complete. Return
+                      True to take another checkpoint, or False to stop.
+        interval:     time between checkpoints, passed through to the
+                      low-level checkpointer.
+        """
+        self.fd = fd
+        self.suspendcb = suspendcb
+        self.resumecb = resumecb
+        self.checkpointcb = checkpointcb
+        self.interval = interval
+
+        self.vm = vm.VM(domid)
+        
+        self.checkpointer = None
+
+    def start(self):
+        vm.getshadowmem(self.vm)
+
+        hdr = image.makeheader(self.vm.dominfo)
+        self.fd.write(hdr)
+        self.fd.flush()
+
+        self.checkpointer = xen.lowlevel.checkpoint.checkpointer()
+        try:
+            self.checkpointer.open(self.vm.domid)
+            self.checkpointer.start(self.fd, self.suspendcb, self.resumecb,
+                                    self.checkpointcb, self.interval)
+            self.checkpointer.close()
+        except xen.lowlevel.checkpoint.error, e:
+            raise CheckpointError(e)
+
+    def _resume(self):
+        """low-overhead version of XendDomainInfo.resumeDomain"""
+        # TODO: currently assumes SUSPEND_CANCEL is available
+        if True:
+            xc.domain_resume(self.vm.domid, 1)
+            xsutil.ResumeDomain(self.vm.domid)
+        else:
+            server.xend.domain.resumeDomain(self.vm.domid)
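
(Aside: a local checkpointing loop built from the pieces above would look
something like this; the domain name and path are invented, and the callback
simply rotates the output file each round.)

from xen.remus import save

fd = save.CheckpointingFile('/tmp/guest.img')

def postcheckpoint():
    # start a new on-disk file for the next round; return True to continue
    fd.checkpoint()
    return True

saver = save.Saver('mydomU', fd, checkpointcb=postcheckpoint)
saver.start()
fd.close()
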
diff --git a/tools/python/xen/remus/tapdisk.py b/tools/python/xen/remus/tapdisk.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/tapdisk.py
@@ -0,0 +1,4 @@
+import blkdev
+
+class TapDisk(blkdev.BlkDev):
+    pass
diff --git a/tools/python/xen/remus/util.py b/tools/python/xen/remus/util.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/util.py
@@ -0,0 +1,31 @@
+# utility functions
+
+import os, subprocess
+
+class PipeException(Exception):
+    def __init__(self, message, errno):
+        self.errno = errno
+        message = '%s: %d, %s' % (message, errno, os.strerror(errno))
+        Exception.__init__(self, message)
+
+def canonifymac(mac):
+    return ':'.join(['%02x' % int(field, 16) for field in mac.split(':')])
+
+def runcmd(args, cwd=None):
+    # TODO: stdin handling
+    if type(args) == str:
+        args = args.split(' ')
+    try:
+        proc = subprocess.Popen(args, stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE, close_fds=True,
+                                cwd=cwd)
+        stdout = proc.stdout.read()
+        stderr = proc.stderr.read()
+        proc.wait()
+        if proc.returncode:
+            print ' '.join(args)
+            print stderr.strip()
+            raise PipeException('%s failed' % args[0], proc.returncode)
+        return stdout
+    except (OSError, IOError), inst:
+        raise PipeException('could not run %s' % args[0], inst.errno)
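
(Aside: quick examples of the helpers above.)

from xen.remus import util

print util.canonifymac('0:16:3E:a:B:c')   # 00:16:3e:0a:0b:0c
print util.runcmd('uname -r').strip()
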
diff --git a/tools/python/xen/remus/vbd.py b/tools/python/xen/remus/vbd.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/vbd.py
@@ -0,0 +1,9 @@
+import blkdev
+
+class VBD(blkdev.BlkDev):
+    def handles(self, **props):
+        uname = props.get('uname', '')
+        return uname.startswith('phy:')
+    handles = classmethod(handles)
+
+blkdev.register(VBD)
diff --git a/tools/python/xen/remus/vdi.py b/tools/python/xen/remus/vdi.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/vdi.py
@@ -0,0 +1,121 @@
+#code to play with vdis and snapshots
+
+import os
+
+def run(cmd):
+    fd = os.popen(cmd)
+    res = [l for l in fd if l.rstrip()]
+    return not fd.close(), res
+
+
+_blockstore = '/blockstore.dat'
+
+def set_blockstore(blockstore):
+    global _blockstore
+    _blockstore = blockstore
+
+
+class SnapShot:
+    def __init__(self, vdi, block, index):
+       self.__vdi = vdi
+       self.__block = block
+       self.__index = index
+    
+       #TODO add snapshot date and radix
+
+    def __str__(self):
+       return '%d %d %d' % (self.__vdi.id(), self.__block, self.__index)
+
+    def vdi(self):
+       return self.__vdi
+
+    def block(self):
+       return self.__block
+
+    def index(self):
+       return self.__index
+
+    def match(self, block, index):
+       return self.__block == block and self.__index == index
+
+
+class VDIException(Exception):
+       pass
+
+
+class VDI:
+    def __init__(self, id, name):
+       self.__id = id
+       self.__name = name
+
+    def __str__(self):
+       return 'vdi: %d %s' % (self.__id, self.__name)
+
+    def id(self):
+       return self.__id
+
+    def name(self):
+       return self.__name
+
+    def list_snapshots(self):
+       res, ls = run('vdi_snap_list %s %d' % (_blockstore, self.__id))
+       if res:
+           return [SnapShot(self, int(l[0]), int(l[1])) for l in [l.split() for l in ls[1:]]]
+       else:
+           raise VDIException("Error reading snapshot list")
+
+    def snapshot(self):
+       res, ls = run('vdi_checkpoint %s %d' % (_blockstore, self.__id))
+       if res:
+           _, block, idx = ls[0].split()
+           return SnapShot(self, int(block), int(idx))
+       else:
+           raise VDIException("Error taking vdi snapshot")
+
+
+def create(name, snap):
+    res, _ = run('vdi_create %s %s %d %d' 
+                % (_blockstore, name, snap.block(), snap.index()))
+    if res:
+       return lookup_by_name(name)
+    else:
+       raise VDIException('Unable to create vdi from snapshot')
+
+
+def fill(name, img_file):
+    res, _ = run('vdi_create %s %s' % (_blockstore, name))
+
+    if res:
+       vdi = lookup_by_name(name)
+       res, _ = run('vdi_fill %d %s' % (vdi.id(), img_file))
+       if res:
+           return vdi
+    raise VDIException('Unable to create vdi from disk img file')
+
+
+def list_vdis():
+    vdis = []
+    res, lines = run('vdi_list %s' % _blockstore)
+    if res:
+       for l in lines:
+           r = l.split()
+           vdis.append(VDI(int(r[0]), r[1]))
+       return vdis
+    else:
+       raise VDIException("Error doing vdi list")
+
+
+def lookup_by_id(id):
+    vdis = list_vdis()
+    for v in vdis:
+       if v.id() == id:
+           return v
+    raise VDIException("No match for vdi id")
+
+
+def lookup_by_name(name):
+    vdis = list_vdis()
+    for v in vdis:
+       if v.name() == name:
+           return v
+    raise VDIException("No match for vdi name")        
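
(Aside: these helpers shell out to the blktap vdi_* utilities, so they assume
those tools are in $PATH and the blockstore exists; the vdi name is invented.)

from xen.remus import vdi

v = vdi.lookup_by_name('guest-root')
snap = v.snapshot()
print 'snapshot of vdi %d: block %d, index %d' % (v.id(), snap.block(), snap.index())
clone = vdi.create('guest-root-copy', snap)
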
diff --git a/tools/python/xen/remus/vif.py b/tools/python/xen/remus/vif.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/vif.py
@@ -0,0 +1,14 @@
+from xen.remus.util import canonifymac
+
+class VIF(object):
+    def __init__(self, **props):
+        self.__dict__.update(props)
+        if 'mac' in props:
+            self.mac = canonifymac(props['mac'])
+
+    def __str__(self):
+        return self.mac
+
+def parse(props):
+    "turn a vm device dictionary into a vif object"
+    return VIF(**props)
diff --git a/tools/python/xen/remus/vm.py b/tools/python/xen/remus/vm.py
new file mode 100644
--- /dev/null
+++ b/tools/python/xen/remus/vm.py
@@ -0,0 +1,156 @@
+#!/usr/bin/env python
+
+import xmlrpclib
+
+from xen.xend.XendClient import server
+from xen.xend import sxp
+# XXX XendDomain is voodoo to let balloon import succeed
+from xen.xend import XendDomain, balloon
+
+import vif
+import blkdev
+# need a nicer way to load disk drivers
+import vbd
+
+class VMException(Exception): pass
+
+class VM(object):
+    "Representation of a virtual machine"
+    def __init__(self, domid=None, dominfo=None):
+        self.dominfo = dominfo
+
+        self.domid = -1
+        self.name = 'unknown'
+        self.dom = {}
+        self.disks = []
+        self.vifs = []
+
+        if domid:
+            try:
+                self.dominfo = server.xend.domain(domid, 'all')
+            except xmlrpclib.Fault:
+                raise VMException('error looking up domain %s' % str(domid))
+
+        if self.dominfo:
+            self.loaddominfo()
+
+    def loaddominfo(self):
+        self.dom = parsedominfo(self.dominfo)
+        self.domid = self.dom['domid']
+        self.name = self.dom['name']
+
+        self.disks = getdisks(self.dom)
+        self.vifs = getvifs(self.dom)
+
+    def __str__(self):
+        return 'VM %d (%s), MACs: [%s], disks: [%s]' % \
+               (self.domid, self.name,
+                ', '.join([str(v) for v in self.vifs]),
+                ', '.join([str(d) for d in self.disks]))
+
+def parsedominfo(dominfo):
+    "parses a dominfo sexpression in the form of python lists of lists"
+    def s2d(s):
+        r = {}
+        for elem in s:
+            if len(elem) == 0:
+                continue 
+            name = elem[0]
+            if len(elem) == 1:
+                val = None
+            else:
+                val = elem[1]
+            if isinstance(val, list):
+                val = s2d(elem[1:])
+            if isinstance(name, list):
+                # hack for ['cpus', [[1]]]
+                return s2d(elem)
+            if name in r:
+                for k, v in val.iteritems():
+                    if k in r[name]:
+                        if not isinstance(r[name][k], list):
+                            r[name][k] = [r[name][k]]
+                        r[name][k].append(v)
+                    else:
+                        r[name][k] = v
+            else:
+                r[name] = val
+        return r
+
+    return s2d(dominfo[1:])
+
+def domtosxpr(dom):
+    "convert a dominfo into a python sxpr"
+    def d2s(d):
+        r = []
+        for k, v in d.iteritems():
+            elem = [k]
+            if isinstance(v, dict):
+                elem.extend(d2s(v))
+            else:
+                if v is None:
+                    v = ''
+                elem.append(v)
+            r.append(elem)
+        return r
+
+    sxpr = ['domain']
+    sxpr.extend(d2s(dom))
+    return sxpr
+
+def strtosxpr(s):
+    "convert a string to a python sxpr"
+    p = sxp.Parser()
+    p.input(s)
+    return p.get_val()
+
+def sxprtostr(sxpr):
+    "convert an sxpr to string"
+    return sxp.to_string(sxpr)
+
+def getvifs(dom):
+    "return vif objects for devices in dom"
+    vifs = dom['device'].get('vif', [])
+    if type(vifs) != list:
+        vifs = [vifs]
+
+    return [vif.parse(v) for v in vifs]
+
+def getdisks(dom):
+    "return block device objects for devices in dom"
+    disks = dom['device'].get('vbd', [])
+    if type(disks) != list:
+        disks = [disks]
+    
+    # tapdisk1 devices
+    tap1s = dom['device'].get('tap', [])
+    if type(tap1s) != list:
+        disks.append(tap1s)
+    else:
+        disks.extend(tap1s)
+
+    # tapdisk2 devices
+    tap2s = dom['device'].get('tap2', [])
+    if type(tap2s) != list:
+        disks.append(tap2s)
+    else:
+        disks.extend(tap2s)
+
+    return [blkdev.parse(disk) for disk in disks]
+
+def fromxend(domid):
+    "create a VM object from xend information"
+    return VM(domid)
+
+def getshadowmem(vm):
+    "Balloon down domain0 to create free memory for shadow paging."
+    maxmem = int(vm.dom['maxmem'])
+    shadow = int(vm.dom['shadow_memory'])
+    vcpus = int(vm.dom['vcpus'])
+
+    # from XendDomainInfo.checkLiveMigrateMemory:
+    # 1MB per vcpu plus 4Kib/Mib of RAM.  This is higher than 
+    # the minimum that Xen would allocate if no value were given.
+    needed = vcpus * 1024 + maxmem * 4 - shadow * 1024
+    if needed > 0:
+        print "Freeing %d kB for shadow mode" % needed
+        balloon.free(needed, vm.dominfo)
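
(Aside: the VM wrapper is a convenience around xend's domain sxpr; 'mydomU' is
an example domain name and requires a running xend.)

from xen.remus import vm

guest = vm.VM('mydomU')
print guest
for disk in guest.disks:
    print 'disk: %s' % disk
for nic in guest.vifs:
    print 'vif MAC: %s' % nic
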

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel