WARNING - OLD ARCHIVES

This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
   
 
 
Xen 
 
Home Products Support Community News
 
   
 

xen-changelog

[Xen-changelog] [xen-unstable] [XEND] Add Task support in Xen API implem

To: xen-changelog@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-changelog] [xen-unstable] [XEND] Add Task support in Xen API implementation.
From: Xen patchbot-unstable <patchbot-unstable@xxxxxxxxxxxxxxxxxxx>
Date: Thu, 25 Jan 2007 08:55:17 -0800
Delivery-date: Thu, 25 Jan 2007 08:57:36 -0800
Envelope-to: www-data@xxxxxxxxxxxxxxxxxx
List-help: <mailto:xen-changelog-request@lists.xensource.com?subject=help>
List-id: BK change log <xen-changelog.lists.xensource.com>
List-post: <mailto:xen-changelog@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/cgi-bin/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/cgi-bin/mailman/listinfo/xen-changelog>, <mailto:xen-changelog-request@lists.xensource.com?subject=unsubscribe>
Reply-to: xen-devel@xxxxxxxxxxxxxxxxxxx
Sender: xen-changelog-bounces@xxxxxxxxxxxxxxxxxxx
# HG changeset patch
# User Alastair Tse <atse@xxxxxxxxxxxxx>
# Date 1169645186 0
# Node ID 248a9c36d81670735f5b1d89bbf50ba985903fe5
# Parent  259470f0856b4642dd0a4fab6bfba331f8c5454b
[XEND] Add Task support in Xen API implementation.

Added progress tracking to some common methods like VM.start so the
progress during async invocation.

Signed-off-by: Alastair Tse <atse@xxxxxxxxxxxxx>
---
 tools/python/xen/xend/XendAPI.py             |  297 +++++++++++++++++++++------
 tools/python/xen/xend/XendAPIConstants.py    |    1 
 tools/python/xen/xend/XendDomainInfo.py      |   14 -
 tools/python/xen/xend/XendTask.py            |  226 ++++++++++++++++++++
 tools/python/xen/xend/XendTaskManager.py     |  110 ++++++++++
 tools/python/xen/xend/server/XMLRPCServer.py |    2 
 6 files changed, 581 insertions(+), 69 deletions(-)

diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendAPI.py
--- a/tools/python/xen/xend/XendAPI.py  Wed Jan 24 14:36:03 2007 +0000
+++ b/tools/python/xen/xend/XendAPI.py  Wed Jan 24 13:26:26 2007 +0000
@@ -22,12 +22,13 @@ import traceback
 import traceback
 
 from xen.xend import XendDomain, XendDomainInfo, XendNode
-from xen.xend import XendLogging
+from xen.xend import XendLogging, XendTaskManager
 
 from xen.xend.XendAuthSessions import instance as auth_manager
 from xen.xend.XendError import *
 from xen.xend.XendClient import ERROR_INVALID_DOMAIN
 from xen.xend.XendLogging import log
+from xen.xend.XendTask import XendTask
 
 from xen.xend.XendAPIConstants import *
 from xen.util.xmlrpclib2 import stringify
@@ -237,15 +238,26 @@ def valid_sr(func):
                       'SR_HANDLE_INVALID', func, *args, **kwargs)
 
 def valid_pif(func):
-    """Decorator to verify if sr_ref is valid before calling
+    """Decorator to verify if pif_ref is valid before calling
     method.
 
-    @param func: function with params: (self, session, sr_ref)
+    @param func: function with params: (self, session, pif_ref)
     @rtype: callable object
     """
     return lambda *args, **kwargs: \
            _check_ref(lambda r: r in XendNode.instance().pifs,
                       'PIF_HANDLE_INVALID', func, *args, **kwargs)
+
+def valid_task(func):
+    """Decorator to verify if task_ref is valid before calling
+    method.
+
+    @param func: function with params: (self, session, task_ref)
+    @rtype: callable object
+    """
+    return lambda *args, **kwargs: \
+           _check_ref(XendTaskManager.get_task,
+                      'TASK_HANDLE_INVALID', func, *args, **kwargs)
 
 # -----------------------------
 # Bridge to Legacy XM API calls
@@ -288,18 +300,17 @@ class XendAPI:
     def __init__(self, auth):
         self.auth = auth
 
-
     Base_attr_ro = ['uuid']
     Base_attr_rw = []
-    Base_methods = ['destroy', 'get_by_uuid', 'get_record']
-    Base_funcs   = ['create', 'get_all']
+    Base_methods = [('destroy', None), ('get_record', 'Struct')]
+    Base_funcs   = [('get_all', 'Set'), ('get_by_uuid', None)]
 
     # Xen API: Class Session
     # ----------------------------------------------------------------
     # NOTE: Left unwrapped by __init__
 
     session_attr_ro = ['this_host', 'this_user']
-    session_methods = ['logout']
+    session_methods = [('logout', None)]
     # session_funcs = ['login_with_password']    
 
     def session_login_with_password(self, *args):
@@ -346,7 +357,77 @@ class XendAPI:
 
     # Xen API: Class Tasks
     # ----------------------------------------------------------------
-    # TODO: NOT IMPLEMENTED YET    
+
+    task_attr_ro = ['status',
+                    'progress',
+                    'eta',                    
+                    'type',
+                    'result',
+                    'error_code',
+                    'error_info']
+
+    task_attr_rw = ['name_label',
+                    'name_description']
+
+    task_funcs = [('get_by_name_label', 'Set(task)')]
+
+    def task_get_status(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.get_status())
+
+    def task_get_progress(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.progress)
+
+    def task_get_eta(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.eta)
+
+    def task_get_type(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.type)
+
+    def task_get_result(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.result)
+
+    def task_get_error_code(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.error_code)
+
+    def task_get_error_info(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.error_info)
+
+    def task_get_name_label(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.name_label)
+
+    def task_get_name_description(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.name_description)
+
+    def task_set_name_label(self, session, task_ref, label):
+        task = XendTaskManager.get_task(task_ref)
+        task.name_label = label
+        return xen_api_success_void()
+
+    def task_set_name_description(self, session, task_ref, desc):
+        task = XendTaskManager.get_task(task_ref)
+        task.name_description = desc
+        return xen_api_success_void()    
+
+    def task_get_all(self, session):
+        tasks = XendTaskManager.get_all_tasks()
+        return xen_api_success(tasks)
+
+    def task_destroy(self, session, task_uuid):
+        XendTaskManager.destroy_task(task_uuid)
+        return xen_api_success_void()
+
+    def task_get_record(self, session, task_ref):
+        task = XendTaskManager.get_task(task_ref)
+        return xen_api_success(task.get_record())
 
     # Xen API: Class Host
     # ----------------------------------------------------------------    
@@ -358,12 +439,12 @@ class XendAPI:
     host_attr_rw = ['name_label',
                     'name_description']
 
-    host_methods = ['disable',
-                    'enable',
-                    'reboot',
-                    'shutdown']
-    
-    host_funcs = ['get_by_name_label']
+    host_methods = [('disable', None),
+                    ('enable', None),
+                    ('reboot', None),
+                    ('shutdown', None)]
+    
+    host_funcs = [('get_by_name_label', 'Set(host)')]
 
     # attributes
     def host_get_name_label(self, session, host_ref):
@@ -456,8 +537,6 @@ class XendAPI:
     # class methods
     def host_cpu_get_all(self, session):
         return xen_api_success(XendNode.instance().get_host_cpu_refs())
-    def host_cpu_create(self, session, struct):
-        return xen_api_error(XEND_ERROR_UNSUPPORTED)
 
 
     # Xen API: Class network
@@ -468,7 +547,9 @@ class XendAPI:
                        'name_description',
                        'default_gateway',
                        'default_netmask']
-
+    
+    network_funcs = [('create', 'network')]
+    
     def network_create(self, _, name_label, name_description,
                        default_gateway, default_netmask):
         return xen_api_success(
@@ -534,7 +615,7 @@ class XendAPI:
 
     PIF_attr_inst = PIF_attr_rw
 
-    PIF_methods = ['create_VLAN']
+    PIF_methods = [('create_VLAN', 'int')]
 
     def _get_PIF(self, ref):
         return XendNode.instance().pifs[ref]
@@ -659,18 +740,19 @@ class XendAPI:
                   'platform_keymap',
                   'otherConfig']
 
-    VM_methods = ['clone',
-                  'start',
-                  'pause',
-                  'unpause',
-                  'clean_shutdown',
-                  'clean_reboot',
-                  'hard_shutdown',
-                  'hard_reboot',
-                  'suspend',
-                  'resume']
-    
-    VM_funcs  = ['get_by_name_label']
+    VM_methods = [('clone', 'VM'),
+                  ('start', None),
+                  ('pause', None),
+                  ('unpause', None),
+                  ('clean_shutdown', None),
+                  ('clean_reboot', None),
+                  ('hard_shutdown', None),
+                  ('hard_reboot', None),
+                  ('suspend', None),
+                  ('resume', None)]
+    
+    VM_funcs  = [('create', 'VM'),
+                 ('get_by_name_label', 'Set(VM)')]
 
     # parameters required for _create()
     VM_attr_inst = [
@@ -991,7 +1073,8 @@ class XendAPI:
     
     def VM_create(self, session, vm_struct):
         xendom = XendDomain.instance()
-        domuuid = xendom.create_domain(vm_struct)
+        domuuid = XendTask.log_progress(0, 100,
+                                        xendom.create_domain, vm_struct)
         return xen_api_success(domuuid)
     
     # object methods
@@ -1052,31 +1135,49 @@ class XendAPI:
     def VM_clean_reboot(self, session, vm_ref):
         xendom = XendDomain.instance()
         xeninfo = xendom.get_vm_by_uuid(vm_ref)
-        xeninfo.shutdown("reboot")
-        return xen_api_success_void()
+        XendTask.log_progress(0, 100, xeninfo.shutdown, "reboot")
+        return xen_api_success_void()
+    
     def VM_clean_shutdown(self, session, vm_ref):
         xendom = XendDomain.instance()
         xeninfo = xendom.get_vm_by_uuid(vm_ref)
-        xeninfo.shutdown("poweroff")
-        return xen_api_success_void()
+        XendTask.log_progress(0, 100, xeninfo.shutdown, "poweroff")        
+        return xen_api_success_void()
+    
     def VM_clone(self, session, vm_ref):
         return xen_api_error(XEND_ERROR_UNSUPPORTED)
+    
     def VM_destroy(self, session, vm_ref):
-        return do_vm_func("domain_delete", vm_ref)
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_delete", vm_ref)
+    
     def VM_hard_reboot(self, session, vm_ref):
-        return xen_api_error(XEND_ERROR_UNSUPPORTED)    
+        return xen_api_error(XEND_ERROR_UNSUPPORTED)
+    
     def VM_hard_shutdown(self, session, vm_ref):
-        return do_vm_func("domain_destroy", vm_ref)    
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_destroy", vm_ref)    
     def VM_pause(self, session, vm_ref):
-        return do_vm_func("domain_pause", vm_ref)
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_pause", vm_ref)
+    
     def VM_resume(self, session, vm_ref, start_paused):
-        return do_vm_func("domain_resume", vm_ref, start_paused = 
start_paused)    
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_resume", vm_ref,
+                                     start_paused = start_paused)
+    
     def VM_start(self, session, vm_ref, start_paused):
-        return do_vm_func("domain_start", vm_ref, start_paused = start_paused)
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_start", vm_ref,
+                                     start_paused = start_paused)
+    
     def VM_suspend(self, session, vm_ref):
-        return do_vm_func("domain_suspend", vm_ref)    
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_suspend", vm_ref)
+    
     def VM_unpause(self, session, vm_ref):
-        return do_vm_func("domain_unpause", vm_ref)
+        return XendTask.log_progress(0, 100, do_vm_func,
+                                     "domain_unpause", vm_ref)
 
     # Xen API: Class VBD
     # ----------------------------------------------------------------
@@ -1095,8 +1196,9 @@ class XendAPI:
 
     VBD_attr_inst = VBD_attr_rw + ['image']
 
-    VBD_methods = ['media_change']
-
+    VBD_methods = [('media_change', None)]
+    VBD_funcs = [('create', 'VBD')]
+    
     # object methods
     def VBD_get_record(self, session, vbd_ref):
         xendom = XendDomain.instance()
@@ -1191,6 +1293,9 @@ class XendAPI:
 
     VIF_attr_inst = VIF_attr_rw
 
+    VIF_funcs = [('create', 'VIF')]
+
+                 
     # object methods
     def VIF_get_record(self, session, vif_ref):
         xendom = XendDomain.instance()
@@ -1242,8 +1347,9 @@ class XendAPI:
                    'read_only']
     VDI_attr_inst = VDI_attr_ro + VDI_attr_rw
 
-    VDI_methods = ['snapshot']
-    VDI_funcs = ['get_by_name_label']
+    VDI_methods = [('snapshot', 'VDI')]
+    VDI_funcs = [('create', 'VDI'),
+                  ('get_by_name_label', 'Set(VDI)')]
 
     def _get_VDI(self, ref):
         return XendNode.instance().get_sr().xen_api_get_by_uuid(ref)
@@ -1369,6 +1475,8 @@ class XendAPI:
 
     VTPM_attr_inst = VTPM_attr_rw
 
+    VTPM_funcs = [('create', 'VTPM')]
+    
     # object methods
     def VTPM_get_record(self, session, vtpm_ref):
         xendom = XendDomain.instance()
@@ -1467,8 +1575,9 @@ class XendAPI:
                     'name_label',
                     'name_description']
     
-    SR_methods = ['clone']
-    SR_funcs = ['get_by_name_label']
+    SR_methods = [('clone', 'SR')]
+    SR_funcs = [('get_by_name_label', 'Set(SR)'),
+                ('get_by_uuid', 'SR')]
 
     # Class Functions
     def SR_get_all(self, session):
@@ -1542,6 +1651,67 @@ class XendAPI:
         XendNode.instance().save()        
         return xen_api_success_void()
 
+
+class XendAPIAsyncProxy:
+    """ A redirector for Async.Class.function calls to XendAPI
+    but wraps the call for use with the XendTaskManager.
+
+    @ivar xenapi: Xen API instance
+    @ivar method_map: Mapping from XMLRPC method name to callable objects.
+    """
+
+    method_prefix = 'Async.'
+
+    def __init__(self, xenapi):
+        """Initialises the Async Proxy by making a map of all
+        implemented Xen API methods for use with XendTaskManager.
+
+        @param xenapi: XendAPI instance
+        """
+        self.xenapi = xenapi
+        self.method_map = {}
+        for method_name in dir(self.xenapi):
+            method = getattr(self.xenapi, method_name)            
+            if method_name[0] != '_' and hasattr(method, 'async') \
+                   and method.async == True:
+                self.method_map[method.api] = method
+
+    def _dispatch(self, method, args):
+        """Overridden method so that SimpleXMLRPCServer will
+        resolve methods through this method rather than through
+        inspection.
+
+        @param method: marshalled method name from XMLRPC.
+        @param args: marshalled arguments from XMLRPC.
+        """
+
+        # Only deal with method names that start with "Async."
+        if not method.startswith(self.method_prefix):
+            raise Exception('Method %s not supported' % method)
+
+        # Require 'session' argument to be present.
+        if len(args) < 1:
+            raise Exception('Not enough arguments')
+
+        # Lookup synchronous version of the method
+        synchronous_method_name = method[len(self.method_prefix):]
+        if synchronous_method_name not in self.method_map:
+            raise Exception('Method %s not supported' % method)
+        
+        method = self.method_map[synchronous_method_name]
+
+        # Validate the session before proceeding
+        session = args[0]
+        if not auth_manager().is_session_valid(session):
+            return xen_api_error(['SESSION_INVALID', session])
+
+        # create and execute the task, and return task_uuid
+        return_type = getattr(method, 'return_type', None)
+        task_uuid = XendTaskManager.create_task(method, args,
+                                                synchronous_method_name,
+                                                return_type,
+                                                synchronous_method_name)
+        return xen_api_success(task_uuid)
 
 def _decorate():
     """Initialise Xen API wrapper by making sure all functions
@@ -1561,7 +1731,8 @@ def _decorate():
         'VDI'     : valid_vdi,
         'VTPM'    : valid_vtpm,
         'SR'      : valid_sr,
-        'PIF'     : valid_pif
+        'PIF'     : valid_pif,
+        'task'    : valid_task,
         }
 
     # Cheat methods
@@ -1582,17 +1753,11 @@ def _decorate():
         setattr(XendAPI, get_by_uuid, _get_by_uuid)
         setattr(XendAPI, get_uuid,    _get_uuid)
 
-    # 2. get_record is just getting all the attributes, so provide
-    #    a fake template implementation.
-    # 
-    # TODO: ...
-
-
     # Wrapping validators around XMLRPC calls
     # ---------------------------------------
 
     for cls, validator in classes.items():
-        def doit(n, takes_instance):
+        def doit(n, takes_instance, async_support = False, return_type = None):
             n_ = n.replace('.', '_')
             try:
                 f = getattr(XendAPI, n_)
@@ -1604,6 +1769,10 @@ def _decorate():
                 for v in validators:
                     f = v(f)
                     f.api = n
+                    f.async = async_support
+                    if return_type:
+                        f.return_type = return_type
+                    
                 setattr(XendAPI, n_, f)
             except AttributeError:
                 log.warn("API call: %s not found" % n)
@@ -1616,19 +1785,21 @@ def _decorate():
 
         # wrap validators around readable class attributes
         for attr_name in ro_attrs + rw_attrs + XendAPI.Base_attr_ro:
-            doit('%s.get_%s' % (cls, attr_name), True)
+            doit('%s.get_%s' % (cls, attr_name), True, async_support = False)
 
         # wrap validators around writable class attrributes
         for attr_name in rw_attrs + XendAPI.Base_attr_rw:
-            doit('%s.set_%s' % (cls, attr_name), True)
+            doit('%s.set_%s' % (cls, attr_name), True, async_support = False)
 
         # wrap validators around methods
-        for method_name in methods + XendAPI.Base_methods:
-            doit('%s.%s' % (cls, method_name), True)
+        for method_name, return_type in methods + XendAPI.Base_methods:
+            doit('%s.%s' % (cls, method_name), True, async_support = True)
 
         # wrap validators around class functions
-        for func_name in funcs + XendAPI.Base_funcs:
-            doit('%s.%s' % (cls, func_name), False)
+        for func_name, return_type in funcs + XendAPI.Base_funcs:
+            doit('%s.%s' % (cls, func_name), False, async_support = True,
+                 return_type = return_type)
+
 
 _decorate()
 
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendAPIConstants.py
--- a/tools/python/xen/xend/XendAPIConstants.py Wed Jan 24 14:36:03 2007 +0000
+++ b/tools/python/xen/xend/XendAPIConstants.py Wed Jan 24 13:26:26 2007 +0000
@@ -74,3 +74,4 @@ XEN_API_VDI_TYPE = ['system', 'user', 'e
 XEN_API_VDI_TYPE = ['system', 'user', 'ephemeral']
 XEN_API_DRIVER_TYPE = ['ioemu', 'paravirtualised']
 XEN_API_VBD_TYPE = ['CD', 'Disk']
+XEN_API_TASK_STATUS_TYPE = ['pending', 'success', 'failure']
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendDomainInfo.py
--- a/tools/python/xen/xend/XendDomainInfo.py   Wed Jan 24 14:36:03 2007 +0000
+++ b/tools/python/xen/xend/XendDomainInfo.py   Wed Jan 24 13:26:26 2007 +0000
@@ -44,6 +44,7 @@ from xen.xend.XendBootloader import boot
 from xen.xend.XendBootloader import bootloader, bootloader_tidy
 from xen.xend.XendError import XendError, VmError
 from xen.xend.XendDevices import XendDevices
+from xen.xend.XendTask import XendTask
 from xen.xend.xenstore.xstransact import xstransact, complete
 from xen.xend.xenstore.xsutil import GetDomainPath, IntroduceDomain, 
ResumeDomain
 from xen.xend.xenstore.xswatch import xswatch
@@ -387,12 +388,13 @@ class XendDomainInfo:
 
         if self.state == DOM_STATE_HALTED:
             try:
-                self._constructDomain()
-                self._initDomain()
-                self._storeVmDetails()
-                self._storeDomDetails()
-                self._registerWatches()
-                self.refreshShutdown()
+                XendTask.log_progress(0, 30, self._constructDomain)
+                XendTask.log_progress(31, 60, self._initDomain)
+                
+                XendTask.log_progress(61, 70, self._storeVmDetails)
+                XendTask.log_progress(71, 80, self._storeDomDetails)
+                XendTask.log_progress(81, 90, self._registerWatches)
+                XendTask.log_progress(91, 100, self.refreshShutdown)
 
                 # save running configuration if XendDomains believe domain is
                 # persistent
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendTask.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/xend/XendTask.py Wed Jan 24 13:26:26 2007 +0000
@@ -0,0 +1,226 @@
+#===========================================================================
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+#============================================================================
+# Copyright (C) 2007 XenSource Ltd
+#============================================================================
+
+from xen.xend.XendAPIConstants import XEN_API_TASK_STATUS_TYPE
+from xen.xend.XendLogging import log
+import thread
+import threading
+
+class XendTask(threading.Thread):
+    """Represents a Asynchronous Task used by Xen API.
+
+    Basically proxies the callable object in a thread and returns the
+    results via self.{type,result,error_code,error_info}.
+
+    @cvar task_progress: Thread local storage for progress tracking.
+                         It is a dict indexed by thread_id. Note that the
+                         thread_id may be reused when the previous
+                         thread with the thread_id ends.
+                         
+    @cvar task_progress_lock: lock on thread access to task_progress
+    
+    """
+
+    # progress stack:
+    # thread_id : [(start_task, end_task),
+    #              (start_sub_task, end_sub_task)..]
+    # example : (0, 100), (50, 100) (50, 100) ...
+    #           That would mean that the task is 75% complete.
+    #           as it is 50% of the last 50% of the task.
+    
+    task_progress = {}
+    task_progress_lock = threading.Lock()
+
+    def __init__(self, uuid, func, args, func_name, return_type = None,
+                 label = None, desc = None):
+        """
+        @param uuid: UUID of the task
+        @type uuid: string
+        @param func: Method to call (from XendAPI)
+        @type func: callable object
+        @param args: arguments to pass to function
+        @type args: list or tuple
+        @param label: name label of the task.
+        @type label: string
+        @param desc: name description of the task.
+        @type desc: string
+        @param func_name: function name, eg ('VM.start')
+        @type desc: string
+        """
+        
+        threading.Thread.__init__(self)
+        self.status_lock = threading.Lock()
+        self.status = XEN_API_TASK_STATUS_TYPE[0]
+
+        self.progress = 0
+        self.eta = None  # TODO: we have no time estimates
+        self.type = return_type
+        self.uuid = uuid
+        
+        self.result = None
+        self.error_code = ''
+        self.error_info = []
+        
+        self.name_label = label or func.__name__
+        self.name_description = desc
+        self.thread_id = 0
+
+        self.func_name = func_name 
+        self.func = func
+        self.args = args
+
+    def set_status(self, new_status):
+        self.status_lock.acquire()
+        try:
+            self.status = new_status
+        finally:
+            self.status_lock.release()
+
+    def get_status(self):
+        self.status_lock.acquire()
+        try:
+            return self.status
+        finally:
+            self.status_lock.release()        
+
+    def run(self):
+        """Runs the method and stores the result for later access.
+
+        Is invoked by threading.Thread.start().
+        """
+
+        self.thread_id = thread.get_ident()
+        self.task_progress_lock.acquire()
+        try:
+            self.task_progress[self.thread_id] = {}
+            self.progress = 0            
+        finally:
+            self.task_progress_lock.release()
+
+        try:
+            result = self.func(*self.args)
+            if result['Status'] == 'Success':
+                self.result = result['Value']
+                self.set_status(XEN_API_TASK_STATUS_TYPE[1])
+            else:
+                self.error_code = result['ErrorDescription'][0]
+                self.error_info = result['ErrorDescription'][1:]
+                self.set_status(XEN_API_TASK_STATUS_TYPE[2])                
+        except Exception, e:
+            log.exception('Error running Async Task')
+            self.error_code = 'INTERNAL ERROR'
+            self.error_info = [str(e)]
+            self.set_status(XEN_API_TASK_STATUS_TYPE[2])
+
+        self.task_progress_lock.acquire()
+        try:
+            del self.task_progress[self.thread_id]
+            self.progress = 100
+        finally:
+            self.task_progress_lock.release()
+    
+    def get_record(self):
+        """Returns a Xen API compatible record."""
+        return {
+            'uuid': self.uuid,            
+            'name_label': self.name_label,
+            'name_description': self.name_description,
+            'status': self.status,
+            'progress': self.get_progress(),
+            'eta': self.eta,
+            'type': self.type,
+            'result': self.result,
+            'error_code': self.error_code,
+            'error_info': self.error_info,
+        }
+
+    def get_progress(self):
+        """ Checks the thread local progress storage. """
+        if self.status != XEN_API_TASK_STATUS_TYPE[0]:
+            return 100
+        
+        self.task_progress_lock.acquire()
+        try:
+            # Pop each progress range in the stack and map it on to
+            # the next progress range until we find out cumulative
+            # progress based on the (start, end) range of each level
+            start = 0
+            prog_stack = self.task_progress.get(self.thread_id, [])[:]
+            if len(prog_stack) > 0:
+                start, stop = prog_stack.pop()
+                while prog_stack:
+                    new_start, new_stop = prog_stack.pop()
+                    start = new_start + ((new_stop - new_start)/100.0 * start)
+
+            # only update progress if it increases, this will prevent
+            # progress from going backwards when tasks are popped off
+            # the stack
+            if start > self.progress:
+                self.progress = int(start)
+        finally:
+            self.task_progress_lock.release()
+
+        return self.progress
+
+
+    def log_progress(cls, progress_min, progress_max,
+                     func, *args, **kwds):
+        """ Callable function wrapper that logs the progress of the
+        function to thread local storage for task progress calculation.
+
+        This is a class method so other parts of Xend will update
+        the task progress by calling:
+
+        XendTask.push_progress(progress_min, progress_max,
+                               func, *args, **kwds)
+
+        The results of the progress is stored in thread local storage
+        and the result of the func(*args, **kwds) is returned back
+        to the caller.
+
+        """
+        thread_id = thread.get_ident()
+        retval = None
+
+        # Log the start of the method
+        cls.task_progress_lock.acquire()
+        try:
+            if type(cls.task_progress.get(thread_id)) != list:
+                cls.task_progress[thread_id] = []
+                    
+            cls.task_progress[thread_id].append((progress_min,
+                                                 progress_max))
+        finally:
+            cls.task_progress_lock.release()
+
+        # Execute the method
+        retval = func(*args, **kwds)
+
+        # Log the end of the method by popping the progress range
+        # off the stack.
+        cls.task_progress_lock.acquire()
+        try:
+            cls.task_progress[thread_id].pop()
+        finally:
+            cls.task_progress_lock.release()
+
+        return retval
+
+    log_progress = classmethod(log_progress)
+
+    
+
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendTaskManager.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/xend/XendTaskManager.py  Wed Jan 24 13:26:26 2007 +0000
@@ -0,0 +1,110 @@
+#===========================================================================
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+#============================================================================
+# Copyright (C) 2007 XenSource Ltd
+#============================================================================
+
+"""
+Task Manager for Xen API asynchronous tasks.
+
+Stores all tasks in a simple dictionary in module's own local storage to
+avoid the 'instance()' methods.
+
+Tasks are indexed by UUID.
+
+"""
+
+from xen.xend.XendTask import XendTask
+from xen.xend import uuid
+import threading
+
+tasks = {}
+tasks_lock = threading.Lock()
+
+def create_task(func, args, func_name, return_type = None, label = ''):
+    """Creates a new Task and registers it with the XendTaskManager.
+
+    @param func: callable object XMLRPC method
+    @type func: callable object
+    @param args: tuple or list of arguments
+    @type args: tuple or list
+    @param func_name: XMLRPC method name, so we can estimate the progress
+    @type func_name: string
+    
+    @return: Task UUID
+    @rtype: string.
+    """
+    task_uuid = uuid.createString()
+    try:
+        tasks_lock.acquire()
+        task = XendTask(task_uuid, func, args, func_name,
+                        return_type = return_type, label = label)
+        tasks[task_uuid] = task
+    finally:
+        tasks_lock.release()
+
+    task.start()
+
+    return task_uuid
+
+def destroy_task(task_uuid):
+    """Destroys a task.
+
+    @param task_uuid: Task UUID
+    @type task_uuid: string.
+    """
+    try:
+        tasks_lock.acquire()
+        if task_uuid in tasks:
+            del tasks[task_uuid]
+    finally:
+        tasks_lock.release()
+
+def get_all_tasks():
+    """ Returns all the UUID of tracked tasks, completed or pending.
+
+    @returns: list of UUIDs
+    @rtype: list of strings
+    """
+    try:
+        tasks_lock.acquire()
+        return tasks.keys()
+    finally:
+        tasks_lock.release()
+
+def get_task(task_uuid):
+    """ Retrieves a task by UUID.
+
+    @rtype: XendTask or None
+    @return: Task denoted by UUID.
+    """
+    try:
+        tasks_lock.acquire()
+        return tasks.get(task_uuid)
+    finally:
+        tasks_lock.release()
+
+def get_tasks_by_name(task_name):
+    """ Retrieves a task by UUID.
+
+    @rtype: XendTask or None
+    @return: Task denoted by UUID.
+    """
+    try:
+        tasks_lock.acquire()
+        return [t.uuid for t in tasks if t.name_label == name]
+    finally:
+        tasks_lock.release()        
+
+
diff -r 259470f0856b -r 248a9c36d816 
tools/python/xen/xend/server/XMLRPCServer.py
--- a/tools/python/xen/xend/server/XMLRPCServer.py      Wed Jan 24 14:36:03 
2007 +0000
+++ b/tools/python/xen/xend/server/XMLRPCServer.py      Wed Jan 24 13:26:26 
2007 +0000
@@ -139,6 +139,8 @@ class XMLRPCServer:
                 meth = getattr(self.xenapi, meth_name)
                 if callable(meth) and hasattr(meth, 'api'):
                     self.server.register_function(meth, getattr(meth, 'api'))
+
+        self.server.register_instance(XendAPI.XendAPIAsyncProxy(self.xenapi))
                 
         # Legacy deprecated xm xmlrpc api
         # --------------------------------------------------------------------

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog

<Prev in Thread] Current Thread [Next in Thread>
  • [Xen-changelog] [xen-unstable] [XEND] Add Task support in Xen API implementation., Xen patchbot-unstable <=