This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
Home Products Support Community News


[Xen-changelog] [xen-unstable] Remus: add control script to activate remus on a VM

To: xen-changelog@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-changelog] [xen-unstable] Remus: add control script to activate remus on a VM
From: Xen patchbot-unstable <patchbot-unstable@xxxxxxxxxxxxxxxxxxx>
Date: Fri, 13 Nov 2009 07:50:23 -0800
Delivery-date: Fri, 13 Nov 2009 07:50:53 -0800
Envelope-to: www-data@xxxxxxxxxxxxxxxxxxx
List-help: <mailto:xen-changelog-request@lists.xensource.com?subject=help>
List-id: BK change log <xen-changelog.lists.xensource.com>
List-post: <mailto:xen-changelog@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=unsubscribe>
Reply-to: xen-devel@xxxxxxxxxxxxxxxxxxx
Sender: xen-changelog-bounces@xxxxxxxxxxxxxxxxxxx
# HG changeset patch
# User Keir Fraser <keir.fraser@xxxxxxxxxx>
# Date 1258126443 0
# Node ID d92fdc31444ce3ca68d7a1b7ab8e6c924e28bf20
# Parent  ea0e302362bb49c679e203bc7f0d8c9165c6f9d9
Remus: add control script to activate remus on a VM

Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>
 tools/Makefile       |    1 
 tools/remus/Makefile |   20 ++
 tools/remus/README   |    4 
 tools/remus/remus    |  362 +++++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 387 insertions(+)

diff -r ea0e302362bb -r d92fdc31444c tools/Makefile
--- a/tools/Makefile    Fri Nov 13 15:33:37 2009 +0000
+++ b/tools/Makefile    Fri Nov 13 15:34:03 2009 +0000
@@ -33,6 +33,7 @@ SUBDIRS-$(CONFIG_IOEMU) += ioemu-dir
 SUBDIRS-$(CONFIG_IOEMU) += ioemu-dir
 SUBDIRS-y += xenpmd
 SUBDIRS-y += libxl
+SUBDIRS-y += remus
 # These don't cross-compile
diff -r ea0e302362bb -r d92fdc31444c tools/remus/Makefile
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/remus/Makefile      Fri Nov 13 15:34:03 2009 +0000
@@ -0,0 +1,20 @@
+include $(XEN_ROOT)/tools/Rules.mk
+SCRIPTS = remus
+.PHONY: all
+all: build
+.PHONY: build
+build:
+       echo "Nothing to do"
+.PHONY: install
+.PHONY: clean
+clean:
+       echo "Nothing to do"
diff -r ea0e302362bb -r d92fdc31444c tools/remus/README
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/remus/README        Fri Nov 13 15:34:03 2009 +0000
@@ -0,0 +1,4 @@
+Remus provides fault tolerance for virtual machines by sending continuous
+checkpoints to a backup, which will activate if the target VM fails.
+See the website at http://nss.cs.ubc.ca/remus/ for details.
diff -r ea0e302362bb -r d92fdc31444c tools/remus/remus
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/remus/remus Fri Nov 13 15:34:03 2009 +0000
@@ -0,0 +1,362 @@
+#!/usr/bin/env python
+# This is a save process which also buffers outgoing I/O between
+# rounds, so that external viewers never see anything that hasn't
+# been committed at the backup
+# TODO: fencing.
+import optparse, os, re, select, signal, sys, time
+from xen.remus import save, vm
+from xen.xend import XendOptions
+from xen.remus import netlink, qdisc, util
+class CfgException(Exception):
+    'Raised by Cfg.getargs when the command line names no domain'
+class Cfg(object):
+    def __init__(self):
+        # must be set
+        self.domid = 0
+        self.host = 'localhost'
+        self.port = XendOptions.instance().get_xend_relocation_port()
+        self.interval = 200
+        self.netbuffer = True
+        self.nobackup = False
+        self.timer = False
+        parser = optparse.OptionParser()
+        parser.usage = '%prog [options] domain [destination]'
+        parser.add_option('-i', '--interval', dest='interval', type='int',
+                          metavar='MS',
+                          help='checkpoint every MS milliseconds')
+        parser.add_option('-p', '--port', dest='port', type='int',
+                          help='send stream to port PORT', metavar='PORT')
+        parser.add_option('', '--no-net', dest='nonet', action='store_true',
+                          help='run without net buffering (benchmark option)')
+        parser.add_option('', '--timer', dest='timer', action='store_true',
+                          help='force pause at checkpoint interval 
+        parser.add_option('', '--no-backup', dest='nobackup',
+                          action='store_true',
+                          help='prevent backup from starting up (benchmark '
+                          'option)')
+        self.parser = parser
+    def usage(self):
+        self.parser.print_help()
+    def getargs(self):
+        """Parse the command line and store the results on this object.
+        Options that were not given leave the defaults from __init__ in
+        place. The first positional argument is the domain and the
+        optional second one is the destination host.
+        Raises CfgException if no domain was given."""
+        opts, args = self.parser.parse_args()
+        if opts.interval:
+            self.interval = opts.interval
+        if opts.port:
+            self.port = opts.port
+        if opts.nonet:
+            self.netbuffer = False
+        if opts.timer:
+            self.timer = True
+        # --no-backup used to be parsed but never copied over, so
+        # self.nobackup could never become True.
+        if opts.nobackup:
+            self.nobackup = True
+        if not args:
+            raise CfgException('Missing domain')
+        self.domid = args[0]
+        if len(args) > 1:
+            self.host = args[1]
+class ReplicatedDiskException(Exception):
+    'Raised by ReplicatedDisk when the given disk is not replicated'
+class BufferedDevice(object):
+    """Base class for buffered devices.
+    Every checkpoint hook below is a no-op; subclasses override the
+    ones they need."""
+    def postsuspend(self):
+        """called after guest has suspended"""
+    def preresume(self):
+        """called before guest resumes"""
+    def commit(self):
+        """called when backup has acknowledged checkpoint reception"""
+class ReplicatedDisk(BufferedDevice):
+    """
+    Send a checkpoint message to a replicated disk while the domain
+    is paused between epochs.
+    """
+    FIFODIR = '/var/run/tap'
+    def __init__(self, disk):
+        # look up disk, make sure it is tap:buffer, and set up socket
+        # to request commits.
+        self.ctlfd = None
+        if not disk.uname.startswith('tap:remus:') and not 
+            raise ReplicatedDiskException('Disk is not replicated: %s' %
+                                        str(disk))
+        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', 
+        absfifo = os.path.join(self.FIFODIR, fifo)
+        absmsgfifo = absfifo + '.msg'
+        self.installed = False
+        self.ctlfd = open(absfifo, 'w+b')
+        self.msgfd = open(absmsgfifo, 'r+b')
+    def __del__(self):
+        self.uninstall()
+    def setup(self):
+        #self.ctlfd.write('buffer')
+        #self.ctlfd.flush()
+        self.installed = True
+    def uninstall(self):
+        'Close the control and message FIFOs; safe to call repeatedly.'
+        # __init__ can raise before msgfd (or even ctlfd) is assigned and
+        # __del__ still calls us, so look the attributes up defensively.
+        for name in ('ctlfd', 'msgfd'):
+            fd = getattr(self, name, None)
+            if fd:
+                fd.close()
+                setattr(self, name, None)
+    def postsuspend(self):
+        if not self.installed:
+            self.setup()
+        os.write(self.ctlfd.fileno(), 'flush')
+    def commit(self):
+        msg = os.read(self.msgfd.fileno(), 4)
+        if msg != 'done':
+            print 'Unknown message: %s' % msg
+class NetbufferException(Exception):
+    'Raised by Netbuffer when its device or queueing discipline is unusable'
+class Netbuffer(BufferedDevice):
+    """
+    Buffer a protected domain's network output between rounds so that
+    nothing is issued that a failover might not know about.
+    """
+    # shared rtnetlink handle
+    rth = None
+    def __init__(self, domid):
+        """Prepare (but do not yet install) the buffer for domain domid.
+        Raises NetbufferException if the device named by _startimq
+        cannot be found."""
+        self.installed = False
+        # Assign on the class, not on self: an instance assignment would
+        # shadow the class attribute and open a new handle per instance
+        # instead of sharing one.
+        if not Netbuffer.rth:
+            Netbuffer.rth = netlink.rtnl()
+        self.devname = self._startimq(domid)
+        dev = self.rth.getlink(self.devname)
+        if not dev:
+            raise NetbufferException('could not find device %s' % self.devname)
+        self.dev = dev['index']
+        self.handle = qdisc.TC_H_ROOT
+        self.q = qdisc.QueueQdisc()
+    def __del__(self):
+        self.uninstall()
+    def postsuspend(self):
+        if not self.installed:
+            self._setup()
+        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
+    def commit(self):
+        '''Called when checkpoint has been acknowledged by
+        the backup'''
+        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
+    def _sendqmsg(self, action):
+        self.q.action = action
+        req = qdisc.changerequest(self.dev, self.handle, self.q)
+        self.rth.talk(req.pack())
+    def _setup(self):
+        """Install the queue qdisc on the device unless one is present.
+        Raises NetbufferException if a qdisc other than 'queue' or
+        'pfifo_fast' is already attached."""
+        current = self.rth.getqdisc(self.dev)
+        if current:
+            kind = current['kind']
+            if kind == 'queue':
+                # already installed, nothing to add
+                self.installed = True
+                return
+            if kind != 'pfifo_fast':
+                raise NetbufferException('there is already a queueing '
+                                         'discipline on %s' % self.devname)
+        print 'installing buffer on %s' % self.devname
+        addreq = qdisc.addrequest(self.dev, self.handle, self.q)
+        self.rth.talk(addreq.pack())
+        self.installed = True
+    def uninstall(self):
+        if self.installed:
+            req = qdisc.delrequest(self.dev, self.handle)
+            self.rth.talk(req.pack())
+            self.installed = False
+    def _startimq(self, domid):
+        """Route traffic from vif<domid>.0 through imq0 and return 'imq0'.
+        Loads the sch_queue, imq and ebt_imq modules, brings imq0 up,
+        flushes the ebtables FORWARD chain and adds the redirect rule."""
+        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
+        imqebt = '/usr/lib/xen/bin/imqebt'
+        imqdev = 'imq0'
+        vid = 'vif%d.0' % domid
+        for mod in ['sch_queue', 'imq', 'ebt_imq']:
+            util.runcmd(['modprobe', mod])
+        util.runcmd("ip link set %s up" % (imqdev))
+        util.runcmd("ebtables -F FORWARD")
+        # The archived line was cut off after "(vid, "; the second %s
+        # (--todev) is the IMQ device.
+        util.runcmd("ebtables -A FORWARD -i %s -j imq --todev %s" % (vid,
+                                                                     imqdev))
+        return imqdev
+class SignalException(Exception):
+    'Raised from the SIGTERM handler in run(); carries the signal number'
+def run(cfg):
+    closure = lambda: None
+    closure.cmd = None
+    def sigexception(signo, frame):
+        raise SignalException(signo)
+    def die():
+        # I am not sure what the best way to die is. xm destroy is another 
+        # or we could attempt to trigger some instant reboot.
+        print "dying..."
+        print util.runcmd(['sudo', 'ifdown', 'eth2'])
+        # dangling imq0 handle on vif locks up the system
+        for buf in bufs:
+            buf.uninstall()
+        print util.runcmd(['sudo', 'xm', 'destroy', cfg.domid])
+        print util.runcmd(['sudo', 'ifup', 'eth2'])
+    def getcommand():
+        """Get a command to execute while running.
+        Commands include:
+          s: die prior to postsuspend hook
+          s2: die after postsuspend hook
+          r: die prior to preresume hook
+          r2: die after preresume hook
+          c: die prior to commit hook
+          c2: die after commit hook
+          """
+        r, w, x = select.select([sys.stdin], [], [], 0)
+        if sys.stdin not in r:
+            return
+        cmd = sys.stdin.readline().strip()
+        if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'):
+            print "unknown command: %s" % cmd
+        closure.cmd = cmd
+    signal.signal(signal.SIGTERM, sigexception)
+    dom = vm.VM(cfg.domid)
+    # set up I/O buffers
+    bufs = []
+    # disks must commit before network can be released
+    for disk in dom.disks:
+        try:
+            bufs.append(ReplicatedDisk(disk))
+        except ReplicatedDiskException, e:
+            print e
+            continue
+    if cfg.netbuffer:
+        for vif in dom.vifs:
+            bufs.append(Netbuffer(dom.domid))
+    fd = save.MigrationSocket((cfg.host, cfg.port))
+    def postsuspend():
+        'Begin external checkpointing after domain has paused'
+        if not cfg.timer:
+            # when not using a timer thread, sleep until now + interval
+            closure.starttime = time.time()
+        if closure.cmd == 's':
+            die()
+        for buf in bufs:
+            buf.postsuspend()
+        if closure.cmd == 's2':
+            die()
+    def preresume():
+        'Complete external checkpointing before domain resumes'
+        if closure.cmd == 'r':
+            die()
+        for buf in bufs:
+            buf.preresume()
+        if closure.cmd == 'r2':
+            die()
+    def commit():
+        'commit network buffer'
+        if closure.cmd == 'c':
+            die()
+        print >> sys.stderr, "PROF: flushed memory at %0.6f" % (time.time())
+        for buf in bufs:
+            buf.commit()
+        if closure.cmd == 'c2':
+            die()
+        # Since the domain is running at this point, it's a good time to
+        # check for control channel commands
+        getcommand()
+        if not cfg.timer:
+            endtime = time.time()
+            elapsed = (endtime - closure.starttime) * 1000
+            if elapsed < cfg.interval:
+                time.sleep((cfg.interval - elapsed) / 1000.0)
+        # False ends checkpointing
+        return True
+    if cfg.timer:
+        interval = cfg.interval
+    else:
+        interval = 0
+    rc = 0
+    checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit,
+                              interval)
+    try:
+        checkpointer.start()
+    except save.CheckpointError, e:
+        print e
+        rc = 1
+    except KeyboardInterrupt:
+        pass
+    except SignalException:
+        print '*** signalled ***'
+    for buf in bufs:
+        buf.uninstall()
+    if cfg.nobackup:
+        # lame attempt to kill backup if protection is stopped deliberately.
+        # It would be much better to move this into the heartbeat "protocol".
+        print util.runcmd(['sudo', '-u', os.getlogin(), 'ssh', cfg.host,
+                           'sudo', 'xm', 'destroy', dom.name])
+    sys.exit(rc)
+# Script entry point: parse the command line, then start protection.
+cfg = Cfg()
+# A bad command line prints the message and usage, then exits 1.
+try:
+    cfg.getargs()
+except CfgException, inst:
+    print str(inst)
+    cfg.usage()
+    sys.exit(1)
+# run() exits the process itself; only VM errors are reported here.
+try:
+    run(cfg)
+except vm.VMException, inst:
+    print str(inst)
+    sys.exit(1)

Xen-changelog mailing list

<Prev in Thread] Current Thread [Next in Thread>
  • [Xen-changelog] [xen-unstable] Remus: add control script to activate remus on a VM, Xen patchbot-unstable <=