WARNING - OLD ARCHIVES

This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
   
 
 
Xen 
 
Home Products Support Community News
 
   
 

xen-api

[Xen-API] [PATCH 8 of 8] CA-43021: hook in 'sparse_dd' for improved VM.copy performance

To: xen-api@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-API] [PATCH 8 of 8] CA-43021: hook in 'sparse_dd' for improved VM.copy performance
From: David Scott <dave.scott@xxxxxxxxxxxxx>
Date: Mon, 23 Aug 2010 13:17:48 +0100
Delivery-date: Mon, 23 Aug 2010 05:42:05 -0700
Envelope-to: www-data@xxxxxxxxxxxxxxxxxxx
In-reply-to: <patchbomb.1282565860@ely>
List-help: <mailto:xen-api-request@lists.xensource.com?subject=help>
List-id: Discussion of API issues surrounding Xen <xen-api.lists.xensource.com>
List-post: <mailto:xen-api@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/mailman/listinfo/xen-api>, <mailto:xen-api-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/mailman/listinfo/xen-api>, <mailto:xen-api-request@lists.xensource.com?subject=unsubscribe>
References: <patchbomb.1282565860@ely>
Sender: xen-api-bounces@xxxxxxxxxxxxxxxxxxx
User-agent: Mercurial-patchbomb/1.4.3
# HG changeset patch
# User David Scott <dave.scott@xxxxxxxxxxxxx>
# Date 1282565816 -3600
# Node ID a10ff0cebf46034aa72dc4eb07b903416b8fe2f0
# Parent  2b18e28b785b150c42ff1ffe52eaae906365bc42
CA-43021: hook in 'sparse_dd' for improved VM.copy performance

On local LVHD, VM.copies of freshly installed guests are much quicker:

Guest          Previous VM.copy time    New VM.copy time   Speedup
----------------------------------------------------------------
Debian Lenny   2:11                     1:18               40%
Windows 7      14:18                    7:57               44%

Signed-off-by: David Scott <dave.scott@xxxxxxxxxxxxx>

diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/OMakefile
--- a/ocaml/xapi/OMakefile      Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/OMakefile      Mon Aug 23 13:16:56 2010 +0100
@@ -135,6 +135,7 @@
        config_file_io \
        slave_backup \
        sm_fs_ops \
+       sparse_dd_wrapper \
        vmopshelpers \
        vm_config \
        vmops \
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/import_raw_vdi.ml
--- a/ocaml/xapi/import_raw_vdi.ml      Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/import_raw_vdi.ml      Mon Aug 23 13:16:56 2010 +0100
@@ -61,11 +61,8 @@
            
                Server_helpers.exec_with_new_task "VDI.import" 
                (fun __context -> 
-                Sm_fs_ops.with_block_attached_device __context rpc session_id 
vdi `RW
-                  (fun device ->
-                     let fd = Unix.openfile device  [ Unix.O_WRONLY ] 0 in
-                     finally 
-                       (fun () -> 
+                Sm_fs_ops.with_open_block_attached_device __context rpc 
session_id vdi `RW
+                  (fun fd ->
                           try
                             if chunked
                             then receive_chunks s fd
@@ -73,8 +70,6 @@
                             Unixext.fsync fd;
                           with Unix.Unix_error(Unix.EIO, _, _) ->
                             raise (Api_errors.Server_error 
(Api_errors.vdi_io_error, ["Device I/O errors"]))
-                       )
-                       (fun () -> Unix.close fd)
                   )
            );
            TaskHelper.complete ~__context [];
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/sm_fs_ops.ml
--- a/ocaml/xapi/sm_fs_ops.ml   Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/sm_fs_ops.ml   Mon Aug 23 13:16:56 2010 +0100
@@ -34,6 +34,19 @@
        let vbd = List.hd vbds in
        f ("/dev/" ^ (Db.VBD.get_device ~__context ~self:vbd)))
 
+(** Block-attach a VDI to dom0, open the device and pass the file descriptor 
to [f] *)
+let with_open_block_attached_device __context rpc session_id vdi mode f = 
+       with_block_attached_device __context rpc session_id vdi mode
+       (fun path ->
+               let mode' = match mode with
+               | `RO -> [ Unix.O_RDONLY ]
+               | `RW -> [ Unix.O_RDWR ] in
+               let fd = Unix.openfile path mode' 0 in
+               finally
+               (fun () -> f fd)
+               (fun () -> Unix.close fd)
+       )
+
 (** Execute a function with a list of device paths after attaching a bunch of 
VDIs to dom0 *)
 let with_block_attached_devices (__context: Context.t) rpc (session_id: 
API.ref_session) (vdis: API.ref_VDI list) mode f = 
   let rec loop acc = function
@@ -41,35 +54,13 @@
     | vdi :: vdis -> with_block_attached_device __context rpc session_id vdi 
mode (fun path -> loop (path :: acc) vdis) in
   loop [] vdis
 
-(** Open an import_raw_vdi HTTP connection and run [f] with the socket *)
-let with_import_vdi __context rpc session_id vdi f = 
-       let subtask_of = Context.get_task_id __context in
-       Server_helpers.exec_with_new_task "VDI.import" 
-       (fun __context -> 
-               (* Find a suitable host for the SR containing the VDI *)
-               let sr = Db.VDI.get_SR ~__context ~self:vdi in
-               let host = Importexport.find_host_for_sr ~__context sr in
-               let address = Db.Host.get_address ~__context ~self:host in
-               let importtask = Client.Task.create rpc session_id "VDI.import" 
"" in
-
-                       let headers = Xapi_http.http_request 
-                               ~cookie:(["session_id", Ref.string_of 
session_id;
-                                         "task_id", Ref.string_of importtask;
-                                         "vdi", Ref.string_of vdi;
-                                         "chunked", "true"])
-                               Http.Put address Constants.import_raw_vdi_uri in
-                       let writer _ _ sock = f sock; true in
-                       if not (Xmlrpcclient.do_secure_http_rpc 
~use_stunnel_cache:false 
-                               ~task_id:(Ref.string_of (Context.get_task_id 
__context))
-                               ~host:address ~port:Xapi_globs.default_ssl_port 
~headers ~body:"" writer)
-                       then failwith "with_import_vdi";
-                       debug "Waiting for import task (%s) to complete" 
(Ref.string_of importtask);
-                       (* wait for the task to complete before cleaning 
anything up *)
-                       while Client.Task.get_status rpc session_id importtask 
= `pending do
-                               Thread.delay 1.;
-                       done;
-                       Client.Task.destroy rpc session_id importtask;
-       )
+(** Return a URL suitable for passing to the sparse_dd process *)
+let import_vdi_url ~__context rpc session_id vdi = 
+        (* Find a suitable host for the SR containing the VDI *)
+        let sr = Db.VDI.get_SR ~__context ~self:vdi in
+        let host = Importexport.find_host_for_sr ~__context sr in
+        let address = Db.Host.get_address ~__context ~self:host in
+        Printf.sprintf "https://%s%s?vdi=%s&session_id=%s" address 
Constants.import_raw_vdi_uri (Ref.string_of vdi) (Ref.string_of session_id)
 
 (** Catch those smint exceptions and convert into meaningful internal errors *)
 let with_api_errors f x = 
@@ -167,91 +158,6 @@
              raise e
          )
          
-exception Cancelled
-exception NonZero
-
-(** The copying routine can operate on anything which looks like a 
file-descriptor/Stream *)
-module type Stream = sig
-       type stream
-       val write: stream -> int64 -> string -> int -> int -> unit
-end
-
-(** Writes directly to a file *)
-module FileStream = struct
-       type stream = Unix.file_descr
-       let write stream stream_offset buf off len =
-               let newoff = Unix.LargeFile.lseek stream stream_offset 
Unix.SEEK_SET in
-               (* Printf.printf "Unix.write buf len %d; offset %d; len %d\n" 
(String.length buf) offset len; *)
-               let n = Unix.write stream buf off len in
-               if n < len then failwith "Short write"
-end
-
-(** Marshals data across the network in chunks *)
-module NetworkStream = struct
-       open Sparse_encoding
-       type stream = Unix.file_descr
-       let write stream stream_offset buf off len =
-               let copy = String.create len in
-               String.blit buf off copy 0 len;
-               let x = { Chunk.start = stream_offset; data = copy } in
-               Chunk.marshal stream x
-end
-
-module DD(Output: Stream) = struct
-
-(* dd with sparseness check *)
-let sparse_dd refresh_session ~__context sparse ifd stream size bs : unit =
-  let round v = int_of_float (v *. 50.0) in
-  let update = 
-    let oldvalue = ref (-1.0) in
-    fun value ->  
-      if round value <> round !oldvalue then begin
-       TaskHelper.exn_if_cancelling ~__context;
-       TaskHelper.operate_on_db_task ~__context 
-         (fun self -> 
-           Db.Task.set_progress ~__context ~self ~value);
-      end;
-      oldvalue := value
-  in
-
-  let buf = String.create bs in
-  
-  let allzero s n =
-    try
-      for i=0 to n-1 do
-        if s.[i] <> '\000' then raise NonZero
-      done;
-      true
-    with NonZero -> false
-  in
-
-  let rec do_block offset =
-    refresh_session ();
-
-    update ((Int64.to_float offset) /. (Int64.to_float size));   
-    let remaining = Int64.sub size offset in
-    if remaining=0L 
-    then ()  (* EOF *)
-    else
-      begin
-       let this_chunk = Int64.to_int (min remaining (Int64.of_int bs)) in
-       Unixext.really_read ifd buf 0 this_chunk;
-
-       if not sparse || (not (allzero buf this_chunk))
-       then Output.write stream offset buf 0 this_chunk;
-
-       do_block (Int64.add offset (Int64.of_int this_chunk))
-      end
-  in
-  do_block 0L;
-  Output.write stream 0L "" 0 0; (* end of stream is a zero-sized chunk *)
-  update 1.0
-
-end
-
-module LocalDD = DD(FileStream)
-module RemoteDD = DD(NetworkStream)
-
 (* SCTX-286: thin provisioning is thrown away over VDI.copy, 
VM.import(VM.export).
    Return true if the newly created vdi must have zeroes written into it; 
default to false
    under the assumption that "proper" storage devices (ie not our legacy LVM 
stuff) always 
@@ -286,47 +192,30 @@
 let copy_vdi ~__context vdi_src vdi_dst = 
   TaskHelper.set_cancellable ~__context;
   Helpers.call_api_functions ~__context (fun rpc session_id ->
-  let refresh_session = Xapi_session.consider_touching_session rpc session_id 
in
-
 
   (* Use the sparse copy unless we must write zeroes into the new VDI *)
   let sparse = not (must_write_zeroes_into_new_vdi ~__context vdi_dst) in
 
   (* Copy locally unless this host can't see the destination SR *)
-  let sr_dst = Db.VDI.get_SR ~__context ~self:vdi_dst in
-  let local_copy = Importexport.check_sr_availability ~__context sr_dst in
+  let can_local_copy = Importexport.check_sr_availability ~__context 
(Db.VDI.get_SR ~__context ~self:vdi_dst) in
 
   let size = Db.VDI.get_virtual_size ~__context ~self:vdi_src in
-  let blocksize = 1024*1024 in
 
-  debug "Sm_fs_ops.copy_vdi: %s-copying %Ld in blocks of %d%s preserving 
sparseness" (if local_copy then "locally" else "remotely") size blocksize (if 
sparse then "" else " NOT");
+  let local_copy = can_local_copy && not (Xapi_fist.force_remote_vdi_copy ()) 
in
 
-  let local_dd = LocalDD.sparse_dd refresh_session ~__context sparse in
-  let remote_dd = RemoteDD.sparse_dd refresh_session ~__context sparse in
+  debug "Sm_fs_ops.copy_vdi: %s-copying %Ld%s preserving sparseness" (if 
local_copy then "locally" else "remotely") size (if sparse then "" else " NOT");
 
 try
+       let remote_uri = import_vdi_url ~__context rpc session_id vdi_dst in
+       debug "remote_uri = %s" remote_uri;
        with_block_attached_device __context rpc session_id vdi_src `RO
        (fun device_src ->
-               let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 in
-               finally
-               (fun () ->
-                       if local_copy && not (Xapi_fist.force_remote_vdi_copy 
())
-                       then with_block_attached_device __context rpc 
session_id vdi_dst `RW
-                               (fun device_dst ->
-                                       let ofd=Unix.openfile device_dst 
[Unix.O_WRONLY; Unix.O_SYNC] 0o600 in
-                                       finally
-                                       (fun () ->
-                                               local_dd ifd ofd size blocksize
-                                       )
-                                       (fun () -> Unix.close ofd)
-
-                               )
-                       else with_import_vdi __context rpc session_id vdi_dst
-                               (fun ofd ->
-                                       remote_dd ifd ofd size blocksize
-                               )
+                       if local_copy
+               then with_block_attached_device __context rpc session_id 
vdi_dst `RW
+               (fun device_dst ->
+                       Sparse_dd_wrapper.dd ~__context sparse device_src 
device_dst size
                )
-               (fun () -> Unix.close ifd)
+               else Sparse_dd_wrapper.dd ~__context sparse device_src 
remote_uri size
        )
 with
 | Unix.Unix_error(Unix.EIO, _, _) ->
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/sm_fs_ops.mli
--- a/ocaml/xapi/sm_fs_ops.mli  Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/sm_fs_ops.mli  Mon Aug 23 13:16:56 2010 +0100
@@ -17,6 +17,7 @@
 
 val with_block_attached_devices :    Context.t -> (XMLRPC.xmlrpc -> 
XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI list -> API.vbd_mode -> 
(string list -> 'a) -> 'a
 val with_block_attached_device  :    Context.t -> (XMLRPC.xmlrpc -> 
XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI -> API.vbd_mode -> (string -> 
'a) -> 'a
+val with_open_block_attached_device  :    Context.t -> (XMLRPC.xmlrpc -> 
XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI -> API.vbd_mode -> 
(Unix.file_descr -> 'a) -> 'a
 val with_new_fs_vdi : Context.t -> name_label:string -> 
name_description:string -> sR:API.ref_SR -> required_space:int64 -> 
_type:API.vdi_type ->
   sm_config:API.string_to_string_map -> (API.ref_VDI -> string -> 'a) -> 'a
 val with_fs_vdi :   Context.t -> API.ref_VDI -> (string -> 'a) -> 'a
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/sparse_dd_wrapper.ml
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/ocaml/xapi/sparse_dd_wrapper.ml   Mon Aug 23 13:16:56 2010 +0100
@@ -0,0 +1,65 @@
+(*
+ * Copyright (C) 2006-2009 Citrix Systems Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ *)
+(* Allows xapi to drive the sparse_dd program *)
+
+open Vmopshelpers
+open Pervasiveext
+open Client
+open Printf
+
+module D=Debug.Debugger(struct let name="xapi" end)
+open D
+
+(** Use the new external sparse_dd program *)
+let dd ~__context prezeroed infile outfile size = 
+       let pipe_read, pipe_write = Unix.pipe () in
+       let to_close = ref [ pipe_read; pipe_write ] in
+       let close x = if List.mem x !to_close then (Unix.close x; to_close := 
List.filter (fun y -> y <> x) !to_close) in
+       finally
+       (fun () ->
+               match Forkhelpers.with_logfile_fd "sparse_dd"
+                       (fun log_fd ->
+                               let pid = Forkhelpers.safe_close_and_exec None 
(Some pipe_write) (Some log_fd) []
+                               "/opt/xensource/libexec/sparse_dd"
+                               ([ "-machine"; "-src"; infile; "-dest"; 
outfile; "-size"; Int64.to_string size ] @
+                               (if prezeroed then [ "-prezeroed" ] else [])) in
+                               close pipe_write;
+                               (* Read Progress: output from the binary *)
+                               let buf = String.create 128 in
+                               let finished = ref false in
+                               while not (!finished) do
+                                       let n = Unix.read pipe_read buf 0 
(String.length buf) in
+                                       if n = 0 then finished := true else 
debug "sparse_dd: %s" (String.sub buf 0 n);
+                                       try 
+                                               Scanf.sscanf (String.sub buf 0 
n) "Progress: %d"
+                                               (fun progress ->
+                                                       
TaskHelper.exn_if_cancelling ~__context;
+                                                       
TaskHelper.operate_on_db_task ~__context 
+                                                       (fun self -> 
Db.Task.set_progress ~__context ~self ~value:(float_of_int progress /. 100.))
+                                               )
+                                       with _ -> ()
+                               done;
+                               match Forkhelpers.waitpid pid with
+                               | (_, Unix.WEXITED 0) -> ()
+                               | (_, Unix.WEXITED n) -> error "sparse_dd exit: 
%d" n; failwith "sparse_dd"
+                       ) with
+               | Forkhelpers.Success _ -> ()
+               | Forkhelpers.Failure (log, exn) ->
+                       error "Failure from sparse_dd: %s" log;
+                       raise exn       
+       )
+       (fun () ->
+               close pipe_read;
+               close pipe_write)
+       
\ No newline at end of file
 ocaml/xapi/OMakefile            |    1 +
 ocaml/xapi/import_raw_vdi.ml    |    9 +-
 ocaml/xapi/sm_fs_ops.ml         |  171 +++++++--------------------------------
 ocaml/xapi/sm_fs_ops.mli        |    1 +
 ocaml/xapi/sparse_dd_wrapper.ml |   65 +++++++++++++++
 5 files changed, 99 insertions(+), 148 deletions(-)


Attachment: sm_fs_ops.patch
Description: Text Data

_______________________________________________
xen-api mailing list
xen-api@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/mailman/listinfo/xen-api