ChangeSet 1.1713.1.21, 2005/06/17 17:23:55+01:00, cl349@xxxxxxxxxxxxxxxxxxxx
xsnode.py:
Updated watches/event code from Mike Wray.
Signed-off-by: Mike Wray <mike.wray@xxxxxx>
Signed-off-by: Christian Limpach <Christian.Limpach@xxxxxxxxxxxx>
xsnode.py | 219 +++++++++++++++++++++++++++++++++++++++-----------------------
1 files changed, 141 insertions(+), 78 deletions(-)
diff -Nru a/tools/python/xen/xend/xenstore/xsnode.py b/tools/python/xen/xend/xenstore/xsnode.py
--- a/tools/python/xen/xend/xenstore/xsnode.py 2005-06-17 21:03:36 -04:00
+++ b/tools/python/xen/xend/xenstore/xsnode.py 2005-06-17 21:03:36 -04:00
@@ -2,7 +2,9 @@
import os
import os.path
import select
+import socket
import sys
+import threading
import time
from xen.lowlevel import xs
@@ -12,18 +14,26 @@
SELECT_TIMEOUT = 2.0
def getEventPath(event):
- return os.path.join("/_event", event)
+ if event and event.startswith("/"):
+ event = event[1:]
+ return os.path.join("/event", event)
def getEventIdPath(event):
- return os.path.join(eventPath(event), "@eid")
+ return os.path.join(getEventPath(event), "@eid")
class Subscription:
- def __init__(self, event, fn, id):
- self.event = event
+ def __init__(self, path, fn, sid):
+ self.path = path
self.watcher = None
self.fn = fn
- self.id = id
+ self.sid = sid
+
+ def getPath(self):
+ return self.path
+
+ def getSid(self):
+ return self.sid
def watch(self, watcher):
self.watcher = watcher
@@ -34,10 +44,11 @@
if watcher:
self.watcher = None
watcher.delSubs(self)
+ return watcher
- def notify(self, event):
+ def notify(self, path, val):
try:
- self.fn(event, id)
+ self.fn(self, path, val)
except SystemExitException:
raise
except:
@@ -45,45 +56,45 @@
class Watcher:
- def __init__(self, store, event):
- self.path = getEventPath(event)
- self.eidPath = getEventIdPath(event)
+ def __init__(self, store, path):
+ self.path = path
store.mkdirs(self.path)
- if not store.exists(self.eidPath):
- store.writeInt(self.eidPath, 0)
self.xs = None
- self.subs = []
+ self.subscriptions = []
- def __getattr__(self, k, v):
- if k == "fileno":
- if self.xs:
- return self.xs.fileno
- else:
- return -1
+ def fileno(self):
+ if self.xs:
+ return self.xs.fileno
else:
- return self.__dict__.get(k, v)
+ return -1
+
+ def getPath(self):
+ return self.path
def addSubs(self, subs):
- self.subs.append(subs)
+ self.subscriptions.append(subs)
self.watch()
def delSubs(self, subs):
- self.subs.remove(subs)
- if len(self.subs) == 0:
+ self.subscriptions.remove(subs)
+ if len(self.subscriptions) == 0:
self.unwatch()
- def getEvent(self):
- return self.event
-
def watch(self):
if self.xs: return
self.xs = xs.open()
- self.xs.watch(path)
+ self.xs.watch(self.path)
def unwatch(self):
if self.xs:
- self.xs.unwatch(self.path)
- self.xs.close()
+ try:
+ self.xs.unwatch(self.path)
+ except Exception, ex:
+ print 'Watcher>unwatch>', ex
+ try:
+ self.xs.close()
+ except Exception, ex:
+ pass
self.xs = None
def watching(self):
@@ -92,22 +103,38 @@
def getNotification(self):
p = self.xs.read_watch()
self.xs.acknowledge_watch()
- eid = self.xs.readInt(self.eidPath)
return p
- def notify(self, subs):
- p = self.getNotification()
- for s in subs:
- s.notify(p)
-
+ def notify(self):
+ try:
+ p = self.getNotification()
+ v = self.xs.read(p)
+ for s in subscriptions:
+ s.notify(p, v)
+ except Exception, ex:
+ print 'Notify exception:', ex
+
+class EventWatcher(Watcher):
+
+ def __init__(self, store, path, event):
+ Watcher.__init__(self, store, path)
+ self.event = event
+ self.eidPath = getEventIdPath(event)
+ if not store.exists(self.eidPath):
+ store.write(self.eidPath, str(0))
+
+ def getEvent(self):
+ return self.event
+
class XenStore:
+ xs = None
+ watchThread = None
+ subscription_id = 1
+
def __init__(self):
- self.xs = None
- #self.xs = xs.open()
- self.subscription = {}
- self.subscription_id = 0
- self.events = {}
+ self.subscriptions = {}
+ self.watchers = {}
self.write("/", "")
def getxs(self):
@@ -119,8 +146,8 @@
ex = None
break
except Exception, ex:
- print >>stderr, "Exception connecting to xsdaemon:", ex
- print >>stderr, "Trying again..."
+ print >>sys.stderr, "Exception connecting to xsdaemon:", ex
+ print >>sys.stderr, "Trying again..."
time.sleep(1)
else:
raise ex
@@ -217,70 +244,85 @@
self.getxs().write(path, data, create=create, excl=excl)
def begin(self, path):
- self.getxs().begin_transaction(path)
+ self.getxs().transaction_start(path)
def commit(self, abandon=False):
- self.getxs().end_transaction(abort=abandon)
+ self.getxs().transaction_end(abort=abandon)
+
+ def watch(self, path, fn):
+ watcher = self.watchers.get(path)
+ if not watcher:
+ watcher = self.addWatcher(Watcher(self, path))
+ return self.addSubscription(watcher, fn)
+
+ def unwatch(self, sid):
+ s = self.subscriptions.get(sid)
+ if not s: return
+ del self.subscriptions[s.sid]
+ watcher = s.unwatch()
+ if watcher and not watcher.watching():
+ del self.watchers[path]
def subscribe(self, event, fn):
- watcher = self.watchEvent(event)
- self.subscription_id += 1
- subs = Subscription(event, fn, self.subscription_id)
- self.subscription[subs.id] = subs
- subs.watch(watcher)
- return subs.id
+ path = getEventPath(event)
+ watcher = self.watchers.get(path)
+ if not watcher:
+ watcher = self.addWatcher(EventWatcher(self, path, event))
+ return self.addSubscription(watcher, fn)
- def unsubscribe(self, sid):
- s = self.subscription.get(sid)
- if not s: return
- del self.subscription[s.id]
- s.unwatch()
- unwatchEvent(s.event)
+ unsubscribe = unwatch
def sendEvent(self, event, data):
eventPath = getEventPath(event)
eidPath = getEventIdPath(event)
try:
- self.begin(eventPath)
+ #self.begin(eventPath)
self.mkdirs(eventPath)
+ eid = 1
if self.exists(eidPath):
- eid = self.readInt(eidPath)
- eid += 1
- else:
- eid = 1
- self.writeInt(eidPath, eid)
+ data = self.read(eidPath)
+ print 'sendEvent>', 'data=', data, type(data)
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog
|