import threading import sys from xen.xend.xenstore.xsutil import xshandle_new from xen.xend.xenstore.xswatch import xswatch class xsblockingchannel: """ The XenStore Blocking Channel provides an interface into a communication channel that will block waiting for specific values. This is helpful for stateful communications across the XenStore. """ def __init__(self, toolNode): """ toolNode - The node within /tools """ self.xs = xshandle_new() self.path = "/tool/%s" % toolNode self.xs.watch(self.path, self) self.xs.watch("@domainRelease", self) def __del__(self): self.xs.unwatch(self.path, self) self.xs.unwatch("@domainRelease", self) xs.dealloc() def write(self, value): """ value - A value to be written to the toNode """ th = self.xs.transaction_start() self.xs.write(th, self.path, value) res = self.xs.transaction_end(th) print("xsbc.write " + value + " res" + str(res)) def waitFor(self, waitValue, timeout=5): """ Perform a blocking wait for value on the fromNode. """ while True: th = self.xs.transaction_start() val = self.xs.read(th, self.path) self.xs.transaction_end(th) print("Got val %s want %s" % (val, waitValue)) if val == waitValue: return True self.we = None watchThread = threading.Thread(target=readWatchThread, args=[self]) watchThread.start() watchThread.run() try: watchThread.join(timeout) except: sys.exit(1) if watchThread.isAlive(): # Trigger the watch ourselves print("xsbc: wait on %s timed out after %ds" % (self.path, timeout)) th = self.xs.transaction_start() self.xs.write(th, self.path, "timed out") self.xs.transaction_end(th) return False if self.we != None: path = self.we[0] if path == "@releaseDomain": print("Got %s" % path) return False def cleanup(self): """ Remove the node from xenstore. """ th = self.xs.transaction_start() self.xs.rm(th, self.path) self.xs.transaction_end(th) def readWatchThread(self): we = self.xs.read_watch() self.we = we