# HG changeset patch
# User Keir Fraser <keir.fraser@xxxxxxxxxx>
# Date 1272962165 -3600
# Node ID 0f403a63ef6bdba3867c77bfb0da35d2e386d192
# Parent ca9519f09563566977d8009b347b93cd01d9089f
Remus: use IFB for net buffer on newer kernels
IMQ does not work with ebtables on 2.6.31, and IFB is not a
third-party patch.
Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>
---
tools/python/xen/remus/device.py | 280 +++++++++++++++++++++++++++++++++------
tools/remus/remus | 5
2 files changed, 243 insertions(+), 42 deletions(-)
diff -r ca9519f09563 -r 0f403a63ef6b tools/python/xen/remus/device.py
--- a/tools/python/xen/remus/device.py Tue May 04 09:35:42 2010 +0100
+++ b/tools/python/xen/remus/device.py Tue May 04 09:36:05 2010 +0100
@@ -6,6 +6,9 @@ import os
import netlink, qdisc, util
+class ReplicatedDiskException(Exception): pass
+class BufferedNICException(Exception): pass
+
class CheckpointedDevice(object):
'Base class for buffered devices'
@@ -20,8 +23,6 @@ class CheckpointedDevice(object):
def commit(self):
'called when backup has acknowledged checkpoint reception'
pass
-
-class ReplicatedDiskException(Exception): pass
class ReplicatedDisk(CheckpointedDevice):
"""
@@ -62,36 +63,223 @@ class ReplicatedDisk(CheckpointedDevice)
if msg != 'done':
print 'Unknown message: %s' % msg
-class BufferedNICException(Exception): pass
+### Network
+
+# shared rtnl handle
+_rth = None
+def getrth():
+ global _rth
+
+ if not _rth:
+ _rth = netlink.rtnl()
+ return _rth
+
+class Netbuf(object):
+ "Proxy for netdev with a queueing discipline"
+
+ @staticmethod
+ def devclass():
+ "returns the name of this device class"
+ return 'unknown'
+
+ @classmethod
+ def available(cls):
+ "returns True if this module can proxy the device"
+ return cls._hasdev(cls.devclass())
+
+ def __init__(self, devname):
+ self.devname = devname
+ self.vif = None
+
+ # override in subclasses
+ def install(self, vif):
+ "set up proxy on device"
+ raise BufferedNICException('unimplemented')
+
+ def uninstall(self):
+ "remove proxy on device"
+ raise BufferedNICException('unimplemented')
+
+ # protected
+ @staticmethod
+ def _hasdev(devclass):
+ """check for existence of device, attempting to load kernel
+ module if not present"""
+ devname = '%s0' % devclass
+ rth = getrth()
+
+ if rth.getlink(devname):
+ return True
+ if util.modprobe(devclass) and rth.getlink(devname):
+ return True
+
+ return False
+
+class IFBBuffer(Netbuf):
+ """Capture packets arriving on a VIF using an ingress filter and tc
+ mirred action to forward them to an IFB device.
+ """
+
+ @staticmethod
+ def devclass():
+ return 'ifb'
+
+ def install(self, vif):
+ self.vif = vif
+ # voodoo from
http://www.linuxfoundation.org/collaborate/workgroups/networking/ifb#Typical_Usage
+ util.runcmd('ip link set %s up' % self.devname)
+ util.runcmd('tc qdisc add dev %s ingress' % vif.dev)
+ util.runcmd('tc filter add dev %s parent ffff: proto ip pref 10 '
+ 'u32 match u32 0 0 action mirred egress redirect '
+ 'dev %s' % (vif.dev, self.devname))
+
+ def uninstall(self):
+ util.runcmd('tc filter del dev %s parent ffff: proto ip pref 10 u32' \
+ % self.vif.dev)
+ util.runcmd('tc qdisc del dev %s ingress' % self.vif.dev)
+ util.runcmd('ip link set %s down' % self.devname)
+
+class IMQBuffer(Netbuf):
+ """Redirect packets coming in on vif to an IMQ device."""
+
+ imqebt = '/usr/lib/xen/bin/imqebt'
+
+ @staticmethod
+ def devclass():
+ return 'imq'
+
+ def install(self, vif):
+ # stopgap hack to set up IMQ for an interface. Wrong in many ways.
+ self.vif = vif
+
+ for mod in ['imq', 'ebt_imq']:
+ util.runcmd(['modprobe', mod])
+ util.runcmd("ip link set %s up" % self.devname)
+ util.runcmd("%s -F FORWARD" % self.imqebt)
+ util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (self.imqebt,
vif.dev, self.devname))
+
+ def uninstall(self):
+ util.runcmd("%s -F FORWARD" % self.imqebt)
+ util.runcmd('ip link set %s down' % self.devname)
+
+# in order of desirability
+netbuftypes = [IFBBuffer, IMQBuffer]
+
+def selectnetbuf():
+ "Find the best available buffer type"
+ for driver in netbuftypes:
+ if driver.available():
+ return driver
+
+ raise BufferedNICException('no net buffer available')
+
+class Netbufpool(object):
+ """Allocates/releases proxy netdevs (IMQ/IFB)
+
+ A file contains a list of entries of the form <pid>:<device>\n
+ To allocate a device, lock the file, then claim a new device if
+ one is free. If there are no free devices, check each PID for liveness
+ and take a device if the PID is dead, otherwise return failure.
+ Add an entry to the file before releasing the lock.
+ """
+ def __init__(self, netbufclass):
+ "Create a pool of Device"
+ self.netbufclass = netbufclass
+ self.path = '/var/run/remus/' + self.netbufclass.devclass()
+
+ self.devices = self.getdevs()
+
+ pooldir = os.path.dirname(self.path)
+ if not os.path.exists(pooldir):
+ os.makedirs(pooldir, 0755)
+
+ def get(self):
+ "allocate a free device"
+ def getfreedev(table):
+ for dev in self.devices:
+ if dev not in table or not util.checkpid(table[dev]):
+ return dev
+
+ return None
+
+ lock = util.Lock(self.path)
+ table = self.load()
+
+ dev = getfreedev(table)
+ if not dev:
+ lock.unlock()
+ raise BufferedNICException('no free devices')
+ dev = self.netbufclass(dev)
+
+ table[dev.devname] = os.getpid()
+
+ self.save(table)
+ lock.unlock()
+
+ return dev
+
+ def put(self, dev):
+ "release claim on device"
+ lock = util.Lock(self.path)
+ table = self.load()
+
+ del table[dev.devname]
+
+ self.save(table)
+ lock.unlock()
+
+ # private
+ def load(self):
+ """load and parse allocation table"""
+ table = {}
+ if not os.path.exists(self.path):
+ return table
+
+ fd = open(self.path)
+ for line in fd.readlines():
+ iface, pid = line.strip().split()
+ table[iface] = int(pid)
+ fd.close()
+ return table
+
+ def save(self, table):
+ """write table to disk"""
+ lines = ['%s %d\n' % (iface, table[iface]) for iface in sorted(table)]
+ fd = open(self.path, 'w')
+ fd.writelines(lines)
+ fd.close()
+
+ def getdevs(self):
+ """find all available devices of our device type"""
+ ifaces = []
+ for line in util.runcmd('ifconfig -a -s').splitlines():
+ iface = line.split()[0]
+ if iface.startswith(self.netbufclass.devclass()):
+ ifaces.append(iface)
+
+ return ifaces
class BufferedNIC(CheckpointedDevice):
"""
Buffer a protected domain's network output between rounds so that
nothing is issued that a failover might not know about.
"""
- # shared rtnetlink handle
- rth = None
-
- def __init__(self, domid):
+
+ def __init__(self, vif):
self.installed = False
-
- if not self.rth:
- self.rth = netlink.rtnl()
-
- self.devname = self._startimq(domid)
- dev = self.rth.getlink(self.devname)
- if not dev:
- raise BufferedNICException('could not find device %s' %
self.devname)
- self.dev = dev['index']
- self.handle = qdisc.TC_H_ROOT
- self.q = qdisc.QueueQdisc()
+ self.vif = vif
+
+ self.pool = Netbufpool(selectnetbuf())
+ self.rth = getrth()
+
+ self.setup()
def __del__(self):
self.uninstall()
def postsuspend(self):
if not self.installed:
- self._setup()
+ self.install()
self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
@@ -100,41 +288,53 @@ class BufferedNIC(CheckpointedDevice):
the backup'''
self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
+ # private
def _sendqmsg(self, action):
self.q.action = action
- req = qdisc.changerequest(self.dev, self.handle, self.q)
+ req = qdisc.changerequest(self.bufdevno, self.handle, self.q)
self.rth.talk(req.pack())
-
- def _setup(self):
- q = self.rth.getqdisc(self.dev)
+ return True
+
+ def setup(self):
+ """install Remus queue on VIF outbound traffic"""
+ self.bufdev = self.pool.get()
+
+ devname = self.bufdev.devname
+ bufdev = self.rth.getlink(devname)
+ if not bufdev:
+ raise BufferedNICException('could not find device %s' % devname)
+
+ self.bufdev.install(self.vif)
+
+ self.bufdevno = bufdev['index']
+ self.handle = qdisc.TC_H_ROOT
+ self.q = qdisc.QueueQdisc()
+
+ if not util.modprobe('sch_queue'):
+ raise BufferedNICException('could not load sch_queue module')
+
+ def install(self):
+ devname = self.bufdev.devname
+ q = self.rth.getqdisc(self.bufdevno)
if q:
if q['kind'] == 'queue':
self.installed = True
return
if q['kind'] != 'pfifo_fast':
raise BufferedNICException('there is already a queueing '
- 'discipline on %s' % self.devname)
-
- print 'installing buffer on %s' % self.devname
- req = qdisc.addrequest(self.dev, self.handle, self.q)
+ 'discipline on %s' % devname)
+
+ print ('installing buffer on %s... ' % devname),
+ req = qdisc.addrequest(self.bufdevno, self.handle, self.q)
self.rth.talk(req.pack())
self.installed = True
+ print 'done.'
def uninstall(self):
if self.installed:
- req = qdisc.delrequest(self.dev, self.handle)
+ req = qdisc.delrequest(self.bufdevno, self.handle)
self.rth.talk(req.pack())
self.installed = False
- def _startimq(self, domid):
- # stopgap hack to set up IMQ for an interface. Wrong in many ways.
- imqebt = '/usr/lib/xen/bin/imqebt'
- imqdev = 'imq0'
- vid = 'vif%d.0' % domid
- for mod in ['sch_queue', 'imq', 'ebt_imq']:
- util.runcmd(['modprobe', mod])
- util.runcmd("ip link set %s up" % (imqdev))
- util.runcmd("%s -F FORWARD" % (imqebt))
- util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid,
imqdev))
-
- return imqdev
+ self.bufdev.uninstall()
+ self.pool.put(self.bufdev)
diff -r ca9519f09563 -r 0f403a63ef6b tools/remus/remus
--- a/tools/remus/remus Tue May 04 09:35:42 2010 +0100
+++ b/tools/remus/remus Tue May 04 09:36:05 2010 +0100
@@ -9,7 +9,8 @@ import optparse, os, re, select, signal,
import optparse, os, re, select, signal, sys, time
from xen.remus import save, util, vm
-from xen.remus.device import ReplicatedDisk, BufferedNIC
+from xen.remus.device import ReplicatedDisk, ReplicatedDiskException
+from xen.remus.device import BufferedNIC, BufferedNICException
from xen.xend import XendOptions
class CfgException(Exception): pass
@@ -115,7 +116,7 @@ def run(cfg):
if cfg.netbuffer:
for vif in dom.vifs:
- bufs.append(Netbuffer(dom.domid))
+ bufs.append(BufferedNIC(vif))
fd = save.MigrationSocket((cfg.host, cfg.port))
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog
|