# HG changeset patch
# User Alastair Tse <atse@xxxxxxxxxxxxx>
# Date 1169645186 0
# Node ID 248a9c36d81670735f5b1d89bbf50ba985903fe5
# Parent 259470f0856b4642dd0a4fab6bfba331f8c5454b
[XEND] Add Task support in Xen API implementation.
Added progress tracking to some common methods like VM.start so that
progress can be reported during async invocation.
Signed-off-by: Alastair Tse <atse@xxxxxxxxxxxxx>
---
tools/python/xen/xend/XendAPI.py | 297 +++++++++++++++++++++------
tools/python/xen/xend/XendAPIConstants.py | 1
tools/python/xen/xend/XendDomainInfo.py | 14 -
tools/python/xen/xend/XendTask.py | 226 ++++++++++++++++++++
tools/python/xen/xend/XendTaskManager.py | 110 ++++++++++
tools/python/xen/xend/server/XMLRPCServer.py | 2
6 files changed, 581 insertions(+), 69 deletions(-)
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendAPI.py
--- a/tools/python/xen/xend/XendAPI.py Wed Jan 24 14:36:03 2007 +0000
+++ b/tools/python/xen/xend/XendAPI.py Wed Jan 24 13:26:26 2007 +0000
@@ -22,12 +22,13 @@ import traceback
import traceback
from xen.xend import XendDomain, XendDomainInfo, XendNode
-from xen.xend import XendLogging
+from xen.xend import XendLogging, XendTaskManager
from xen.xend.XendAuthSessions import instance as auth_manager
from xen.xend.XendError import *
from xen.xend.XendClient import ERROR_INVALID_DOMAIN
from xen.xend.XendLogging import log
+from xen.xend.XendTask import XendTask
from xen.xend.XendAPIConstants import *
from xen.util.xmlrpclib2 import stringify
@@ -237,15 +238,26 @@ def valid_sr(func):
'SR_HANDLE_INVALID', func, *args, **kwargs)
def valid_pif(func):
- """Decorator to verify if sr_ref is valid before calling
+ """Decorator to verify if pif_ref is valid before calling
method.
- @param func: function with params: (self, session, sr_ref)
+ @param func: function with params: (self, session, pif_ref)
@rtype: callable object
"""
return lambda *args, **kwargs: \
_check_ref(lambda r: r in XendNode.instance().pifs,
'PIF_HANDLE_INVALID', func, *args, **kwargs)
+
+def valid_task(func):
+ """Decorator to verify if task_ref is valid before calling
+ method.
+
+ @param func: function with params: (self, session, task_ref)
+ @rtype: callable object
+ """
+ return lambda *args, **kwargs: \
+ _check_ref(XendTaskManager.get_task,
+ 'TASK_HANDLE_INVALID', func, *args, **kwargs)
# -----------------------------
# Bridge to Legacy XM API calls
@@ -288,18 +300,17 @@ class XendAPI:
def __init__(self, auth):
self.auth = auth
-
Base_attr_ro = ['uuid']
Base_attr_rw = []
- Base_methods = ['destroy', 'get_by_uuid', 'get_record']
- Base_funcs = ['create', 'get_all']
+ Base_methods = [('destroy', None), ('get_record', 'Struct')]
+ Base_funcs = [('get_all', 'Set'), ('get_by_uuid', None)]
# Xen API: Class Session
# ----------------------------------------------------------------
# NOTE: Left unwrapped by __init__
session_attr_ro = ['this_host', 'this_user']
- session_methods = ['logout']
+ session_methods = [('logout', None)]
# session_funcs = ['login_with_password']
def session_login_with_password(self, *args):
@@ -346,7 +357,77 @@ class XendAPI:
# Xen API: Class Tasks
# ----------------------------------------------------------------
- # TODO: NOT IMPLEMENTED YET
+
+ task_attr_ro = ['status',
+ 'progress',
+ 'eta',
+ 'type',
+ 'result',
+ 'error_code',
+ 'error_info']
+
+ task_attr_rw = ['name_label',
+ 'name_description']
+
+ task_funcs = [('get_by_name_label', 'Set(task)')]
+
+ def task_get_status(self, session, task_ref):
+ task = XendTaskManager.get_task(task_ref)
+ return xen_api_success(task.get_status())
+
+    def task_get_progress(self, session, task_ref):
+        # task.progress is only refreshed inside get_progress(), so ask for a fresh value.
+        return xen_api_success(XendTaskManager.get_task(task_ref).get_progress())
+
+ def task_get_eta(self, session, task_ref):
+ task = XendTaskManager.get_task(task_ref)
+ return xen_api_success(task.eta)
+
+ def task_get_type(self, session, task_ref):
+ task = XendTaskManager.get_task(task_ref)
+ return xen_api_success(task.type)
+
+ def task_get_result(self, session, task_ref):
+ task = XendTaskManager.get_task(task_ref)
+ return xen_api_success(task.result)
+
+ def task_get_error_code(self, session, task_ref):
+ task = XendTaskManager.get_task(task_ref)
+ return xen_api_success(task.error_code)
+
+ def task_get_error_info(self, session, task_ref):
+ task = XendTaskManager.get_task(task_ref)
+ return xen_api_success(task.error_info)
+
+ def task_get_name_label(self, session, task_ref):
+ task = XendTaskManager.get_task(task_ref)
+ return xen_api_success(task.name_label)
+
+ def task_get_name_description(self, session, task_ref):
+ task = XendTaskManager.get_task(task_ref)
+ return xen_api_success(task.name_description)
+
+ def task_set_name_label(self, session, task_ref, label):
+ task = XendTaskManager.get_task(task_ref)
+ task.name_label = label
+ return xen_api_success_void()
+
+ def task_set_name_description(self, session, task_ref, desc):
+ task = XendTaskManager.get_task(task_ref)
+ task.name_description = desc
+ return xen_api_success_void()
+
+ def task_get_all(self, session):
+ tasks = XendTaskManager.get_all_tasks()
+ return xen_api_success(tasks)
+
+ def task_destroy(self, session, task_uuid):
+ XendTaskManager.destroy_task(task_uuid)
+ return xen_api_success_void()
+
+ def task_get_record(self, session, task_ref):
+ task = XendTaskManager.get_task(task_ref)
+ return xen_api_success(task.get_record())
# Xen API: Class Host
# ----------------------------------------------------------------
@@ -358,12 +439,12 @@ class XendAPI:
host_attr_rw = ['name_label',
'name_description']
- host_methods = ['disable',
- 'enable',
- 'reboot',
- 'shutdown']
-
- host_funcs = ['get_by_name_label']
+ host_methods = [('disable', None),
+ ('enable', None),
+ ('reboot', None),
+ ('shutdown', None)]
+
+ host_funcs = [('get_by_name_label', 'Set(host)')]
# attributes
def host_get_name_label(self, session, host_ref):
@@ -456,8 +537,6 @@ class XendAPI:
# class methods
def host_cpu_get_all(self, session):
return xen_api_success(XendNode.instance().get_host_cpu_refs())
- def host_cpu_create(self, session, struct):
- return xen_api_error(XEND_ERROR_UNSUPPORTED)
# Xen API: Class network
@@ -468,7 +547,9 @@ class XendAPI:
'name_description',
'default_gateway',
'default_netmask']
-
+
+ network_funcs = [('create', 'network')]
+
def network_create(self, _, name_label, name_description,
default_gateway, default_netmask):
return xen_api_success(
@@ -534,7 +615,7 @@ class XendAPI:
PIF_attr_inst = PIF_attr_rw
- PIF_methods = ['create_VLAN']
+ PIF_methods = [('create_VLAN', 'int')]
def _get_PIF(self, ref):
return XendNode.instance().pifs[ref]
@@ -659,18 +740,19 @@ class XendAPI:
'platform_keymap',
'otherConfig']
- VM_methods = ['clone',
- 'start',
- 'pause',
- 'unpause',
- 'clean_shutdown',
- 'clean_reboot',
- 'hard_shutdown',
- 'hard_reboot',
- 'suspend',
- 'resume']
-
- VM_funcs = ['get_by_name_label']
+ VM_methods = [('clone', 'VM'),
+ ('start', None),
+ ('pause', None),
+ ('unpause', None),
+ ('clean_shutdown', None),
+ ('clean_reboot', None),
+ ('hard_shutdown', None),
+ ('hard_reboot', None),
+ ('suspend', None),
+ ('resume', None)]
+
+ VM_funcs = [('create', 'VM'),
+ ('get_by_name_label', 'Set(VM)')]
# parameters required for _create()
VM_attr_inst = [
@@ -991,7 +1073,8 @@ class XendAPI:
def VM_create(self, session, vm_struct):
xendom = XendDomain.instance()
- domuuid = xendom.create_domain(vm_struct)
+ domuuid = XendTask.log_progress(0, 100,
+ xendom.create_domain, vm_struct)
return xen_api_success(domuuid)
# object methods
@@ -1052,31 +1135,49 @@ class XendAPI:
def VM_clean_reboot(self, session, vm_ref):
xendom = XendDomain.instance()
xeninfo = xendom.get_vm_by_uuid(vm_ref)
- xeninfo.shutdown("reboot")
- return xen_api_success_void()
+ XendTask.log_progress(0, 100, xeninfo.shutdown, "reboot")
+ return xen_api_success_void()
+
def VM_clean_shutdown(self, session, vm_ref):
xendom = XendDomain.instance()
xeninfo = xendom.get_vm_by_uuid(vm_ref)
- xeninfo.shutdown("poweroff")
- return xen_api_success_void()
+ XendTask.log_progress(0, 100, xeninfo.shutdown, "poweroff")
+ return xen_api_success_void()
+
def VM_clone(self, session, vm_ref):
return xen_api_error(XEND_ERROR_UNSUPPORTED)
+
def VM_destroy(self, session, vm_ref):
- return do_vm_func("domain_delete", vm_ref)
+ return XendTask.log_progress(0, 100, do_vm_func,
+ "domain_delete", vm_ref)
+
def VM_hard_reboot(self, session, vm_ref):
- return xen_api_error(XEND_ERROR_UNSUPPORTED)
+ return xen_api_error(XEND_ERROR_UNSUPPORTED)
+
def VM_hard_shutdown(self, session, vm_ref):
- return do_vm_func("domain_destroy", vm_ref)
+ return XendTask.log_progress(0, 100, do_vm_func,
+ "domain_destroy", vm_ref)
def VM_pause(self, session, vm_ref):
- return do_vm_func("domain_pause", vm_ref)
+ return XendTask.log_progress(0, 100, do_vm_func,
+ "domain_pause", vm_ref)
+
def VM_resume(self, session, vm_ref, start_paused):
- return do_vm_func("domain_resume", vm_ref, start_paused =
start_paused)
+ return XendTask.log_progress(0, 100, do_vm_func,
+ "domain_resume", vm_ref,
+ start_paused = start_paused)
+
def VM_start(self, session, vm_ref, start_paused):
- return do_vm_func("domain_start", vm_ref, start_paused = start_paused)
+ return XendTask.log_progress(0, 100, do_vm_func,
+ "domain_start", vm_ref,
+ start_paused = start_paused)
+
def VM_suspend(self, session, vm_ref):
- return do_vm_func("domain_suspend", vm_ref)
+ return XendTask.log_progress(0, 100, do_vm_func,
+ "domain_suspend", vm_ref)
+
def VM_unpause(self, session, vm_ref):
- return do_vm_func("domain_unpause", vm_ref)
+ return XendTask.log_progress(0, 100, do_vm_func,
+ "domain_unpause", vm_ref)
# Xen API: Class VBD
# ----------------------------------------------------------------
@@ -1095,8 +1196,9 @@ class XendAPI:
VBD_attr_inst = VBD_attr_rw + ['image']
- VBD_methods = ['media_change']
-
+ VBD_methods = [('media_change', None)]
+ VBD_funcs = [('create', 'VBD')]
+
# object methods
def VBD_get_record(self, session, vbd_ref):
xendom = XendDomain.instance()
@@ -1191,6 +1293,9 @@ class XendAPI:
VIF_attr_inst = VIF_attr_rw
+ VIF_funcs = [('create', 'VIF')]
+
+
# object methods
def VIF_get_record(self, session, vif_ref):
xendom = XendDomain.instance()
@@ -1242,8 +1347,9 @@ class XendAPI:
'read_only']
VDI_attr_inst = VDI_attr_ro + VDI_attr_rw
- VDI_methods = ['snapshot']
- VDI_funcs = ['get_by_name_label']
+ VDI_methods = [('snapshot', 'VDI')]
+ VDI_funcs = [('create', 'VDI'),
+ ('get_by_name_label', 'Set(VDI)')]
def _get_VDI(self, ref):
return XendNode.instance().get_sr().xen_api_get_by_uuid(ref)
@@ -1369,6 +1475,8 @@ class XendAPI:
VTPM_attr_inst = VTPM_attr_rw
+ VTPM_funcs = [('create', 'VTPM')]
+
# object methods
def VTPM_get_record(self, session, vtpm_ref):
xendom = XendDomain.instance()
@@ -1467,8 +1575,9 @@ class XendAPI:
'name_label',
'name_description']
- SR_methods = ['clone']
- SR_funcs = ['get_by_name_label']
+ SR_methods = [('clone', 'SR')]
+ SR_funcs = [('get_by_name_label', 'Set(SR)'),
+ ('get_by_uuid', 'SR')]
# Class Functions
def SR_get_all(self, session):
@@ -1542,6 +1651,67 @@ class XendAPI:
XendNode.instance().save()
return xen_api_success_void()
+
+class XendAPIAsyncProxy:
+ """ A redirector for Async.Class.function calls to XendAPI
+ but wraps the call for use with the XendTaskManager.
+
+ @ivar xenapi: Xen API instance
+ @ivar method_map: Mapping from XMLRPC method name to callable objects.
+ """
+
+ method_prefix = 'Async.'
+
+ def __init__(self, xenapi):
+ """Initialises the Async Proxy by making a map of all
+ implemented Xen API methods for use with XendTaskManager.
+
+ @param xenapi: XendAPI instance
+ """
+ self.xenapi = xenapi
+ self.method_map = {}
+ for method_name in dir(self.xenapi):
+ method = getattr(self.xenapi, method_name)
+ if method_name[0] != '_' and hasattr(method, 'async') \
+ and method.async == True:
+ self.method_map[method.api] = method
+
+ def _dispatch(self, method, args):
+ """Overridden method so that SimpleXMLRPCServer will
+ resolve methods through this method rather than through
+ inspection.
+
+ @param method: marshalled method name from XMLRPC.
+ @param args: marshalled arguments from XMLRPC.
+ """
+
+ # Only deal with method names that start with "Async."
+ if not method.startswith(self.method_prefix):
+ raise Exception('Method %s not supported' % method)
+
+ # Require 'session' argument to be present.
+ if len(args) < 1:
+ raise Exception('Not enough arguments')
+
+ # Lookup synchronous version of the method
+ synchronous_method_name = method[len(self.method_prefix):]
+ if synchronous_method_name not in self.method_map:
+ raise Exception('Method %s not supported' % method)
+
+ method = self.method_map[synchronous_method_name]
+
+ # Validate the session before proceeding
+ session = args[0]
+ if not auth_manager().is_session_valid(session):
+ return xen_api_error(['SESSION_INVALID', session])
+
+ # create and execute the task, and return task_uuid
+ return_type = getattr(method, 'return_type', None)
+ task_uuid = XendTaskManager.create_task(method, args,
+ synchronous_method_name,
+ return_type,
+ synchronous_method_name)
+ return xen_api_success(task_uuid)
def _decorate():
"""Initialise Xen API wrapper by making sure all functions
@@ -1561,7 +1731,8 @@ def _decorate():
'VDI' : valid_vdi,
'VTPM' : valid_vtpm,
'SR' : valid_sr,
- 'PIF' : valid_pif
+ 'PIF' : valid_pif,
+ 'task' : valid_task,
}
# Cheat methods
@@ -1582,17 +1753,11 @@ def _decorate():
setattr(XendAPI, get_by_uuid, _get_by_uuid)
setattr(XendAPI, get_uuid, _get_uuid)
- # 2. get_record is just getting all the attributes, so provide
- # a fake template implementation.
- #
- # TODO: ...
-
-
# Wrapping validators around XMLRPC calls
# ---------------------------------------
for cls, validator in classes.items():
- def doit(n, takes_instance):
+ def doit(n, takes_instance, async_support = False, return_type = None):
n_ = n.replace('.', '_')
try:
f = getattr(XendAPI, n_)
@@ -1604,6 +1769,10 @@ def _decorate():
for v in validators:
f = v(f)
f.api = n
+ f.async = async_support
+ if return_type:
+ f.return_type = return_type
+
setattr(XendAPI, n_, f)
except AttributeError:
log.warn("API call: %s not found" % n)
@@ -1616,19 +1785,21 @@ def _decorate():
# wrap validators around readable class attributes
for attr_name in ro_attrs + rw_attrs + XendAPI.Base_attr_ro:
- doit('%s.get_%s' % (cls, attr_name), True)
+ doit('%s.get_%s' % (cls, attr_name), True, async_support = False)
# wrap validators around writable class attrributes
for attr_name in rw_attrs + XendAPI.Base_attr_rw:
- doit('%s.set_%s' % (cls, attr_name), True)
+ doit('%s.set_%s' % (cls, attr_name), True, async_support = False)
# wrap validators around methods
- for method_name in methods + XendAPI.Base_methods:
- doit('%s.%s' % (cls, method_name), True)
+ for method_name, return_type in methods + XendAPI.Base_methods:
+ doit('%s.%s' % (cls, method_name), True, async_support = True)
# wrap validators around class functions
- for func_name in funcs + XendAPI.Base_funcs:
- doit('%s.%s' % (cls, func_name), False)
+ for func_name, return_type in funcs + XendAPI.Base_funcs:
+ doit('%s.%s' % (cls, func_name), False, async_support = True,
+ return_type = return_type)
+
_decorate()
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendAPIConstants.py
--- a/tools/python/xen/xend/XendAPIConstants.py Wed Jan 24 14:36:03 2007 +0000
+++ b/tools/python/xen/xend/XendAPIConstants.py Wed Jan 24 13:26:26 2007 +0000
@@ -74,3 +74,4 @@ XEN_API_VDI_TYPE = ['system', 'user', 'e
XEN_API_VDI_TYPE = ['system', 'user', 'ephemeral']
XEN_API_DRIVER_TYPE = ['ioemu', 'paravirtualised']
XEN_API_VBD_TYPE = ['CD', 'Disk']
+XEN_API_TASK_STATUS_TYPE = ['pending', 'success', 'failure']
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendDomainInfo.py
--- a/tools/python/xen/xend/XendDomainInfo.py Wed Jan 24 14:36:03 2007 +0000
+++ b/tools/python/xen/xend/XendDomainInfo.py Wed Jan 24 13:26:26 2007 +0000
@@ -44,6 +44,7 @@ from xen.xend.XendBootloader import boot
from xen.xend.XendBootloader import bootloader, bootloader_tidy
from xen.xend.XendError import XendError, VmError
from xen.xend.XendDevices import XendDevices
+from xen.xend.XendTask import XendTask
from xen.xend.xenstore.xstransact import xstransact, complete
from xen.xend.xenstore.xsutil import GetDomainPath, IntroduceDomain,
ResumeDomain
from xen.xend.xenstore.xswatch import xswatch
@@ -387,12 +388,13 @@ class XendDomainInfo:
if self.state == DOM_STATE_HALTED:
try:
- self._constructDomain()
- self._initDomain()
- self._storeVmDetails()
- self._storeDomDetails()
- self._registerWatches()
- self.refreshShutdown()
+ XendTask.log_progress(0, 30, self._constructDomain)
+ XendTask.log_progress(31, 60, self._initDomain)
+
+ XendTask.log_progress(61, 70, self._storeVmDetails)
+ XendTask.log_progress(71, 80, self._storeDomDetails)
+ XendTask.log_progress(81, 90, self._registerWatches)
+ XendTask.log_progress(91, 100, self.refreshShutdown)
# save running configuration if XendDomains believe domain is
# persistent
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendTask.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/xend/XendTask.py Wed Jan 24 13:26:26 2007 +0000
@@ -0,0 +1,226 @@
+#===========================================================================
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#============================================================================
+# Copyright (C) 2007 XenSource Ltd
+#============================================================================
+
+from xen.xend.XendAPIConstants import XEN_API_TASK_STATUS_TYPE
+from xen.xend.XendLogging import log
+import thread
+import threading
+
+class XendTask(threading.Thread):
+ """Represents a Asynchronous Task used by Xen API.
+
+ Basically proxies the callable object in a thread and returns the
+ results via self.{type,result,error_code,error_info}.
+
+ @cvar task_progress: Thread local storage for progress tracking.
+ It is a dict indexed by thread_id. Note that the
+ thread_id may be reused when the previous
+ thread with the thread_id ends.
+
+ @cvar task_progress_lock: lock on thread access to task_progress
+
+ """
+
+ # progress stack:
+ # thread_id : [(start_task, end_task),
+ # (start_sub_task, end_sub_task)..]
+ # example : (0, 100), (50, 100) (50, 100) ...
+ # That would mean that the task is 75% complete.
+ # as it is 50% of the last 50% of the task.
+
+ task_progress = {}
+ task_progress_lock = threading.Lock()
+
+ def __init__(self, uuid, func, args, func_name, return_type = None,
+ label = None, desc = None):
+ """Set up the initial state of the task; func is not invoked here.
+ @param uuid: UUID of the task
+ @type uuid: string
+ @param func: Method to call (from XendAPI)
+ @type func: callable object
+ @param args: arguments to pass to function
+ @type args: list or tuple
+ @param label: name label of the task.
+ @type label: string
+ @param desc: name description of the task.
+ @type desc: string
+ @param func_name: function name, eg ('VM.start')
+ @type func_name: string
+ """
+
+ threading.Thread.__init__(self)
+ self.status_lock = threading.Lock() # guards self.status (see set_status/get_status)
+ self.status = XEN_API_TASK_STATUS_TYPE[0] # first entry of the status list
+
+ self.progress = 0
+ self.eta = None # TODO: we have no time estimates
+ self.type = return_type # reported as 'type' by get_record()
+ self.uuid = uuid
+
+ self.result = None
+ self.error_code = ''
+ self.error_info = []
+
+ self.name_label = label or func.__name__ # falls back to the callable's name
+ self.name_description = desc
+ self.thread_id = 0 # replaced by thread.get_ident() in run()
+
+ self.func_name = func_name
+ self.func = func
+ self.args = args
+
+ def set_status(self, new_status):
+ self.status_lock.acquire()
+ try:
+ self.status = new_status
+ finally:
+ self.status_lock.release()
+
+ def get_status(self):
+ self.status_lock.acquire()
+ try:
+ return self.status
+ finally:
+ self.status_lock.release()
+
+    def run(self):
+        """Runs the method and stores the result for later access.
+        Failures (error results or exceptions) go to error_code/error_info.
+        Is invoked by threading.Thread.start().
+        """
+
+        self.thread_id = thread.get_ident()
+        self.task_progress_lock.acquire()
+        try:
+            # Must be a list, not a dict: get_progress() copies it with [:].
+            self.task_progress[self.thread_id] = []
+            self.progress = 0
+        finally:
+            self.task_progress_lock.release()
+
+        try:
+            result = self.func(*self.args)  # indexed below as a result dict
+            if result['Status'] == 'Success':
+                self.result = result['Value']
+                self.set_status(XEN_API_TASK_STATUS_TYPE[1])
+            else:
+                self.error_code = result['ErrorDescription'][0]
+                self.error_info = result['ErrorDescription'][1:]
+                self.set_status(XEN_API_TASK_STATUS_TYPE[2])
+        except Exception, e:
+            log.exception('Error running Async Task')
+            self.error_code = 'INTERNAL ERROR'
+            self.error_info = [str(e)]
+            self.set_status(XEN_API_TASK_STATUS_TYPE[2])
+        # The task is finished: drop this thread's progress stack.
+        self.task_progress_lock.acquire()
+        try:
+            del self.task_progress[self.thread_id]
+            self.progress = 100
+        finally:
+            self.task_progress_lock.release()
+
+ def get_record(self):
+ """Returns a Xen API compatible record."""
+ return {
+ 'uuid': self.uuid,
+ 'name_label': self.name_label,
+ 'name_description': self.name_description,
+ 'status': self.status,
+ 'progress': self.get_progress(),
+ 'eta': self.eta,
+ 'type': self.type,
+ 'result': self.result,
+ 'error_code': self.error_code,
+ 'error_info': self.error_info,
+ }
+
+ def get_progress(self):
+ """ Checks the thread local progress storage. """
+ if self.status != XEN_API_TASK_STATUS_TYPE[0]:
+ return 100
+
+ self.task_progress_lock.acquire()
+ try:
+ # Pop each progress range in the stack and map it on to
+ # the next progress range until we find out cumulative
+ # progress based on the (start, end) range of each level
+ start = 0
+ prog_stack = self.task_progress.get(self.thread_id, [])[:]
+ if len(prog_stack) > 0:
+ start, stop = prog_stack.pop()
+ while prog_stack:
+ new_start, new_stop = prog_stack.pop()
+ start = new_start + ((new_stop - new_start)/100.0 * start)
+
+ # only update progress if it increases, this will prevent
+ # progress from going backwards when tasks are popped off
+ # the stack
+ if start > self.progress:
+ self.progress = int(start)
+ finally:
+ self.task_progress_lock.release()
+
+ return self.progress
+
+
+    def log_progress(cls, progress_min, progress_max,
+                     func, *args, **kwds):
+        """ Callable function wrapper that logs the progress of the
+        function to thread local storage for task progress calculation.
+
+        This is a class method so other parts of Xend will update
+        the task progress by calling:
+
+            XendTask.log_progress(progress_min, progress_max,
+                                  func, *args, **kwds)
+
+        The (progress_min, progress_max) range is pushed on the calling
+        thread's progress stack while func(*args, **kwds) runs and is
+        popped again afterwards, even if func raises. The result of
+        func(*args, **kwds) is returned back to the caller.
+
+        """
+        thread_id = thread.get_ident()
+
+        # Log the start of the method
+        cls.task_progress_lock.acquire()
+        try:
+            if type(cls.task_progress.get(thread_id)) != list:
+                cls.task_progress[thread_id] = []
+
+            cls.task_progress[thread_id].append((progress_min,
+                                                 progress_max))
+        finally:
+            cls.task_progress_lock.release()
+
+        try:
+            # Execute the method
+            return func(*args, **kwds)
+        finally:
+            # Log the end of the method by popping the progress range
+            # off the stack; done in a finally block so that a failing
+            # func cannot leave a stale range behind for this thread.
+            cls.task_progress_lock.acquire()
+            try:
+                cls.task_progress[thread_id].pop()
+            finally:
+                cls.task_progress_lock.release()
+
+    log_progress = classmethod(log_progress)
+
+
+
diff -r 259470f0856b -r 248a9c36d816 tools/python/xen/xend/XendTaskManager.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/xend/XendTaskManager.py Wed Jan 24 13:26:26 2007 +0000
@@ -0,0 +1,110 @@
+#===========================================================================
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of version 2.1 of the GNU Lesser General Public
+# License as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#============================================================================
+# Copyright (C) 2007 XenSource Ltd
+#============================================================================
+
+"""
+Task Manager for Xen API asynchronous tasks.
+
+Stores all tasks in a simple dictionary in module's own local storage to
+avoid the 'instance()' methods.
+
+Tasks are indexed by UUID.
+
+"""
+
+from xen.xend.XendTask import XendTask
+from xen.xend import uuid
+import threading
+
+tasks = {}
+tasks_lock = threading.Lock()
+
+def create_task(func, args, func_name, return_type = None, label = ''):
+ """Creates a new Task and registers it with the XendTaskManager.
+
+ @param func: callable object XMLRPC method
+ @type func: callable object
+ @param args: tuple or list of arguments
+ @type args: tuple or list
+ @param func_name: XMLRPC method name, so we can estimate the progress
+ @type func_name: string
+
+ @return: Task UUID
+ @rtype: string.
+ """
+ task_uuid = uuid.createString()
+ try:
+ tasks_lock.acquire()
+ task = XendTask(task_uuid, func, args, func_name,
+ return_type = return_type, label = label)
+ tasks[task_uuid] = task
+ finally:
+ tasks_lock.release()
+
+ task.start()
+
+ return task_uuid
+
+def destroy_task(task_uuid):
+ """Destroys a task.
+
+ @param task_uuid: Task UUID
+ @type task_uuid: string.
+ """
+ try:
+ tasks_lock.acquire()
+ if task_uuid in tasks:
+ del tasks[task_uuid]
+ finally:
+ tasks_lock.release()
+
+def get_all_tasks():
+ """ Returns all the UUID of tracked tasks, completed or pending.
+
+ @returns: list of UUIDs
+ @rtype: list of strings
+ """
+ try:
+ tasks_lock.acquire()
+ return tasks.keys()
+ finally:
+ tasks_lock.release()
+
+def get_task(task_uuid):
+ """ Retrieves a task by UUID.
+
+ @rtype: XendTask or None
+ @return: Task denoted by UUID.
+ """
+ try:
+ tasks_lock.acquire()
+ return tasks.get(task_uuid)
+ finally:
+ tasks_lock.release()
+
+def get_tasks_by_name(task_name):
+    """ Retrieves the UUIDs of all tasks with the given name label.
+
+    @rtype: list of strings
+    @return: UUIDs of the tasks whose name_label equals task_name.
+    """
+    try:
+        tasks_lock.acquire()
+        return [t.uuid for t in tasks.values() if t.name_label == task_name]
+    finally:
+        tasks_lock.release()
+
+
diff -r 259470f0856b -r 248a9c36d816
tools/python/xen/xend/server/XMLRPCServer.py
--- a/tools/python/xen/xend/server/XMLRPCServer.py Wed Jan 24 14:36:03
2007 +0000
+++ b/tools/python/xen/xend/server/XMLRPCServer.py Wed Jan 24 13:26:26
2007 +0000
@@ -139,6 +139,8 @@ class XMLRPCServer:
meth = getattr(self.xenapi, meth_name)
if callable(meth) and hasattr(meth, 'api'):
self.server.register_function(meth, getattr(meth, 'api'))
+
+ self.server.register_instance(XendAPI.XendAPIAsyncProxy(self.xenapi))
# Legacy deprecated xm xmlrpc api
# --------------------------------------------------------------------
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog
|