WARNING - OLD ARCHIVES

This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
   
 
 
Xen 
 
Home Products Support Community News
 
   
 

xen-changelog

[Xen-changelog] xsnode.py:

To: xen-changelog@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-changelog] xsnode.py:
From: BitKeeper Bot <riel@xxxxxxxxxxx>
Date: Fri, 17 Jun 2005 16:23:55 +0000
Delivery-date: Sat, 18 Jun 2005 01:02:31 +0000
Envelope-to: www-data@xxxxxxxxxxxxxxxxxxx
List-help: <mailto:xen-changelog-request@lists.xensource.com?subject=help>
List-id: BK change log <xen-changelog.lists.xensource.com>
List-post: <mailto:xen-changelog@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/cgi-bin/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/cgi-bin/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=unsubscribe>
Reply-to: Xen Development List <xen-devel@xxxxxxxxxxxxxxxxxxx>
Sender: xen-changelog-bounces@xxxxxxxxxxxxxxxxxxx
ChangeSet 1.1713.1.21, 2005/06/17 17:23:55+01:00, cl349@xxxxxxxxxxxxxxxxxxxx

        xsnode.py:
          Updated watches/event code from Mike Wray.
        Signed-off-by: Mike Wray <mike.wray@xxxxxx>
        Signed-off-by: Christian Limpach <Christian.Limpach@xxxxxxxxxxxx>



 xsnode.py |  219 +++++++++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 141 insertions(+), 78 deletions(-)


diff -Nru a/tools/python/xen/xend/xenstore/xsnode.py 
b/tools/python/xen/xend/xenstore/xsnode.py
--- a/tools/python/xen/xend/xenstore/xsnode.py  2005-06-17 21:03:36 -04:00
+++ b/tools/python/xen/xend/xenstore/xsnode.py  2005-06-17 21:03:36 -04:00
@@ -2,7 +2,9 @@
 import os
 import os.path
 import select
+import socket
 import sys
+import threading
 import time
 
 from xen.lowlevel import xs
@@ -12,18 +14,26 @@
 SELECT_TIMEOUT = 2.0
 
 def getEventPath(event):
-    return os.path.join("/_event", event)
+    if event and event.startswith("/"):
+        event = event[1:]
+    return os.path.join("/event", event)
 
 def getEventIdPath(event):
-    return os.path.join(eventPath(event), "@eid")
+    return os.path.join(getEventPath(event), "@eid")
 
 class Subscription:
 
-    def __init__(self, event, fn, id):
-        self.event = event
+    def __init__(self, path, fn, sid):
+        self.path = path
         self.watcher = None
         self.fn = fn
-        self.id = id
+        self.sid = sid
+
+    def getPath(self):
+        return self.path
+
+    def getSid(self):
+        return self.sid
 
     def watch(self, watcher):
         self.watcher = watcher
@@ -34,10 +44,11 @@
         if watcher:
             self.watcher = None
             watcher.delSubs(self)
+        return watcher
 
-    def notify(self, event):
+    def notify(self, path, val):
         try:
-            self.fn(event, id)
+            self.fn(self, path, val)
         except SystemExitException:
             raise
         except:
@@ -45,45 +56,45 @@
 
 class Watcher:
 
-    def __init__(self, store, event):
-        self.path = getEventPath(event)
-        self.eidPath = getEventIdPath(event)
+    def __init__(self, store, path):
+        self.path = path
         store.mkdirs(self.path)
-        if not store.exists(self.eidPath):
-            store.writeInt(self.eidPath, 0)
         self.xs = None
-        self.subs = []
+        self.subscriptions = []
 
-    def __getattr__(self, k, v):
-        if k == "fileno":
-            if self.xs:
-                return self.xs.fileno
-            else:
-                return -1
+    def fileno(self):
+        if self.xs:
+            return self.xs.fileno
         else:
-            return self.__dict__.get(k, v)
+            return -1
+
+    def getPath(self):
+        return self.path
 
     def addSubs(self, subs):
-        self.subs.append(subs)
+        self.subscriptions.append(subs)
         self.watch()
 
     def delSubs(self, subs):
-        self.subs.remove(subs)
-        if len(self.subs) == 0:
+        self.subscriptions.remove(subs)
+        if len(self.subscriptions) == 0:
             self.unwatch()
 
-    def getEvent(self):
-        return self.event
-
     def watch(self):
         if self.xs: return
         self.xs = xs.open()
-        self.xs.watch(path)
+        self.xs.watch(self.path)
 
     def unwatch(self):
         if self.xs:
-            self.xs.unwatch(self.path)
-            self.xs.close()
+            try:
+                self.xs.unwatch(self.path)
+            except Exception, ex:
+                print 'Watcher>unwatch>', ex
+            try:
+                self.xs.close()
+            except Exception, ex:
+                pass
             self.xs = None
             
     def watching(self):
@@ -92,22 +103,38 @@
     def getNotification(self):
         p = self.xs.read_watch()
         self.xs.acknowledge_watch()
-        eid = self.xs.readInt(self.eidPath)
         return p
 
-    def notify(self, subs):
-        p = self.getNotification()
-        for s in subs:
-            s.notify(p)
-            
+    def notify(self):
+        try:
+            p = self.getNotification()
+            v = self.xs.read(p)
+            for s in subscriptions:
+                s.notify(p, v)
+        except Exception, ex:
+            print 'Notify exception:', ex
+
+class EventWatcher(Watcher):
+
+    def __init__(self, store, path, event):
+        Watcher.__init__(self, store, path)
+        self.event = event
+        self.eidPath = getEventIdPath(event)
+        if not store.exists(self.eidPath):
+            store.write(self.eidPath, str(0))
+
+    def getEvent(self):
+        return self.event
+
 class XenStore:
 
+    xs = None
+    watchThread = None
+    subscription_id = 1
+    
     def __init__(self):
-        self.xs = None
-        #self.xs = xs.open()
-        self.subscription = {}
-        self.subscription_id = 0
-        self.events = {}
+        self.subscriptions = {}
+        self.watchers = {}
         self.write("/", "")
 
     def getxs(self):
@@ -119,8 +146,8 @@
                     ex = None
                     break
                 except Exception, ex:
-                    print >>stderr, "Exception connecting to xsdaemon:", ex
-                    print >>stderr, "Trying again..."
+                    print >>sys.stderr, "Exception connecting to xsdaemon:", ex
+                    print >>sys.stderr, "Trying again..."
                     time.sleep(1)
             else:
                 raise ex
@@ -217,70 +244,85 @@
         self.getxs().write(path, data, create=create, excl=excl)
 
     def begin(self, path):
-        self.getxs().begin_transaction(path)
+        self.getxs().transaction_start(path)
 
     def commit(self, abandon=False):
-        self.getxs().end_transaction(abort=abandon)
+        self.getxs().transaction_end(abort=abandon)
+
+    def watch(self, path, fn):
+        watcher = self.watchers.get(path)
+        if not watcher:
+            watcher = self.addWatcher(Watcher(self, path))
+        return self.addSubscription(watcher, fn)
+        
+    def unwatch(self, sid):
+        s = self.subscriptions.get(sid)
+        if not s: return
+        del self.subscriptions[s.sid]
+        watcher = s.unwatch()
+        if watcher and not watcher.watching():
+            del self.watchers[path]
 
     def subscribe(self, event, fn):
-        watcher = self.watchEvent(event)
-        self.subscription_id += 1
-        subs = Subscription(event, fn, self.subscription_id)
-        self.subscription[subs.id] = subs
-        subs.watch(watcher)
-        return subs.id
+        path = getEventPath(event)
+        watcher = self.watchers.get(path)
+        if not watcher:
+            watcher = self.addWatcher(EventWatcher(self, path, event))
+        return self.addSubscription(watcher, fn)
 
-    def unsubscribe(self, sid):
-        s = self.subscription.get(sid)
-        if not s: return
-        del self.subscription[s.id]
-        s.unwatch()
-        unwatchEvent(s.event)
+    unsubscribe = unwatch
 
     def sendEvent(self, event, data):
         eventPath = getEventPath(event)
         eidPath = getEventIdPath(event)
         try:
-            self.begin(eventPath)
+            #self.begin(eventPath)
             self.mkdirs(eventPath)
+            eid = 1
             if self.exists(eidPath):
-                eid = self.readInt(eidPath)
-                eid += 1
-            else:
-                eid = 1
-            self.writeInt(eidPath, eid)
+                data = self.read(eidPath)
+                print 'sendEvent>', 'data=', data, type(data)

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog

<Prev in Thread] Current Thread [Next in Thread>