# HG changeset patch
# User David Scott
# Date 1282565816 -3600
# Node ID a10ff0cebf46034aa72dc4eb07b903416b8fe2f0
# Parent 2b18e28b785b150c42ff1ffe52eaae906365bc42
CA-43021: hook in 'sparse_dd' for improved VM.copy performance

On local LVHD, VM.copies of freshly installed guests are much quicker:

Guest          Previous VM.copy time   New VM.copy time   Time saved
----------------------------------------------------------------
Debian Lenny   2:11                    1:18               40%
Windows 7      14:18                   7:57               44%

The sparse_dd wrapper honours task cancellation, always reaps the child
process and reports termination by signal. VM.copy only builds the remote
import URL (which carries the session reference) for remote copies and
never writes it to the log.

Signed-off-by: David Scott

diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/OMakefile
--- a/ocaml/xapi/OMakefile	Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/OMakefile	Mon Aug 23 13:16:56 2010 +0100
@@ -135,6 +135,7 @@
 	config_file_io \
 	slave_backup \
 	sm_fs_ops \
+	sparse_dd_wrapper \
 	vmopshelpers \
 	vm_config \
 	vmops \
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/import_raw_vdi.ml
--- a/ocaml/xapi/import_raw_vdi.ml	Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/import_raw_vdi.ml	Mon Aug 23 13:16:56 2010 +0100
@@ -61,11 +61,8 @@
 
       Server_helpers.exec_with_new_task "VDI.import"
         (fun __context ->
-           Sm_fs_ops.with_block_attached_device __context rpc session_id vdi `RW
-             (fun device ->
-                let fd = Unix.openfile device [ Unix.O_WRONLY ] 0 in
-                finally
-                  (fun () ->
+           Sm_fs_ops.with_open_block_attached_device __context rpc session_id vdi `RW
+             (fun fd ->
                      try
                        if chunked
                        then receive_chunks s fd
@@ -73,8 +70,6 @@
                        Unixext.fsync fd;
                      with Unix.Unix_error(Unix.EIO, _, _) ->
                        raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O errors"]))
-                  )
-                  (fun () -> Unix.close fd)
              )
         );
       TaskHelper.complete ~__context [];
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/sm_fs_ops.ml
--- a/ocaml/xapi/sm_fs_ops.ml	Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/sm_fs_ops.ml	Mon Aug 23 13:16:56 2010 +0100
@@ -34,6 +34,19 @@
        let vbd = List.hd vbds in
        f ("/dev/" ^ (Db.VBD.get_device ~__context ~self:vbd)))
 
+(** Block-attach a VDI to dom0, open the device and pass the file descriptor to [f] *)
+let with_open_block_attached_device __context rpc session_id vdi mode f =
+  with_block_attached_device __context rpc session_id vdi mode
+    (fun path ->
+       let mode' = match mode with
+         | `RO -> [ Unix.O_RDONLY ]
+         | `RW -> [ Unix.O_RDWR ] in
+       let fd = Unix.openfile path mode' 0 in
+       finally
+         (fun () -> f fd)
+         (fun () -> Unix.close fd)
+    )
+
 (** Execute a function with a list of device paths after attaching a bunch of VDIs to dom0 *)
 let with_block_attached_devices (__context: Context.t) rpc (session_id: API.ref_session) (vdis: API.ref_VDI list) mode f =
   let rec loop acc = function
@@ -41,35 +54,13 @@
     | vdi :: vdis -> with_block_attached_device __context rpc session_id vdi mode (fun path -> loop (path :: acc) vdis) in
   loop [] vdis
 
-(** Open an import_raw_vdi HTTP connection and run [f] with the socket *)
-let with_import_vdi __context rpc session_id vdi f =
-  let subtask_of = Context.get_task_id __context in
-  Server_helpers.exec_with_new_task "VDI.import"
-    (fun __context ->
-       (* Find a suitable host for the SR containing the VDI *)
-       let sr = Db.VDI.get_SR ~__context ~self:vdi in
-       let host = Importexport.find_host_for_sr ~__context sr in
-       let address = Db.Host.get_address ~__context ~self:host in
-       let importtask = Client.Task.create rpc session_id "VDI.import" "" in
-
-       let headers = Xapi_http.http_request
-         ~cookie:(["session_id", Ref.string_of session_id;
-                   "task_id", Ref.string_of importtask;
-                   "vdi", Ref.string_of vdi;
-                   "chunked", "true"])
-         Http.Put address Constants.import_raw_vdi_uri in
-       let writer _ _ sock = f sock; true in
-       if not (Xmlrpcclient.do_secure_http_rpc ~use_stunnel_cache:false
-                 ~task_id:(Ref.string_of (Context.get_task_id __context))
-                 ~host:address ~port:Xapi_globs.default_ssl_port ~headers ~body:"" writer)
-       then failwith "with_import_vdi";
-       debug "Waiting for import task (%s) to complete" (Ref.string_of importtask);
-       (* wait for the task to complete before cleaning anything up *)
-       while Client.Task.get_status rpc session_id importtask = `pending do
-         Thread.delay 1.;
-       done;
-       Client.Task.destroy rpc session_id importtask;
-    )
+(** Return a URL suitable for passing to the sparse_dd process *)
+let import_vdi_url ~__context rpc session_id vdi =
+  (* Find a suitable host for the SR containing the VDI *)
+  let sr = Db.VDI.get_SR ~__context ~self:vdi in
+  let host = Importexport.find_host_for_sr ~__context sr in
+  let address = Db.Host.get_address ~__context ~self:host in
+  Printf.sprintf "https://%s%s?vdi=%s&session_id=%s" address Constants.import_raw_vdi_uri (Ref.string_of vdi) (Ref.string_of session_id)
 
 (** Catch those smint exceptions and convert into meaningful internal errors *)
 let with_api_errors f x =
@@ -167,91 +158,6 @@
       raise e
     )
 
-exception Cancelled
-exception NonZero
-
-(** The copying routine can operate on anything which looks like a file-descriptor/Stream *)
-module type Stream = sig
-  type stream
-  val write: stream -> int64 -> string -> int -> int -> unit
-end
-
-(** Writes directly to a file *)
-module FileStream = struct
-  type stream = Unix.file_descr
-  let write stream stream_offset buf off len =
-    let newoff = Unix.LargeFile.lseek stream stream_offset Unix.SEEK_SET in
-    (* Printf.printf "Unix.write buf len %d; offset %d; len %d\n" (String.length buf) offset len; *)
-    let n = Unix.write stream buf off len in
-    if n < len then failwith "Short write"
-end
-
-(** Marshals data across the network in chunks *)
-module NetworkStream = struct
-  open Sparse_encoding
-  type stream = Unix.file_descr
-  let write stream stream_offset buf off len =
-    let copy = String.create len in
-    String.blit buf off copy 0 len;
-    let x = { Chunk.start = stream_offset; data = copy } in
-    Chunk.marshal stream x
-end
-
-module DD(Output: Stream) = struct
-
-(* dd with sparseness check *)
-let sparse_dd refresh_session ~__context sparse ifd stream size bs : unit =
-  let round v = int_of_float (v *. 50.0) in
-  let update =
-    let oldvalue = ref (-1.0) in
-    fun value ->
-      if round value <> round !oldvalue then begin
-        TaskHelper.exn_if_cancelling ~__context;
-        TaskHelper.operate_on_db_task ~__context
-          (fun self ->
-             Db.Task.set_progress ~__context ~self ~value);
-      end;
-      oldvalue := value
-  in
-
-  let buf = String.create bs in
-
-  let allzero s n =
-    try
-      for i=0 to n-1 do
-        if s.[i] <> '\000' then raise NonZero
-      done;
-      true
-    with NonZero -> false
-  in
-
-  let rec do_block offset =
-    refresh_session ();
-
-    update ((Int64.to_float offset) /. (Int64.to_float size));
-    let remaining = Int64.sub size offset in
-    if remaining=0L
-    then () (* EOF *)
-    else
-      begin
-        let this_chunk = Int64.to_int (min remaining (Int64.of_int bs)) in
-        Unixext.really_read ifd buf 0 this_chunk;
-
-        if not sparse || (not (allzero buf this_chunk))
-        then Output.write stream offset buf 0 this_chunk;
-
-        do_block (Int64.add offset (Int64.of_int this_chunk))
-      end
-  in
-  do_block 0L;
-  Output.write stream 0L "" 0 0; (* end of stream is a zero-sized chunk *)
-  update 1.0
-
-end
-
-module LocalDD = DD(FileStream)
-module RemoteDD = DD(NetworkStream)
-
 (* SCTX-286: thin provisioning is thrown away over VDI.copy, VM.import(VM.export).
    Return true if the newly created vdi must have zeroes written into it; default to false
    under the assumption that "proper" storage devices (ie not our legacy LVM stuff) always
@@ -286,47 +192,35 @@
 let copy_vdi ~__context vdi_src vdi_dst =
   TaskHelper.set_cancellable ~__context;
   Helpers.call_api_functions ~__context (fun rpc session_id ->
-    let refresh_session = Xapi_session.consider_touching_session rpc session_id in
-
 
     (* Use the sparse copy unless we must write zeroes into the new VDI *)
     let sparse = not (must_write_zeroes_into_new_vdi ~__context vdi_dst) in
 
     (* Copy locally unless this host can't see the destination SR *)
-    let sr_dst = Db.VDI.get_SR ~__context ~self:vdi_dst in
-    let local_copy = Importexport.check_sr_availability ~__context sr_dst in
+    let can_local_copy = Importexport.check_sr_availability ~__context (Db.VDI.get_SR ~__context ~self:vdi_dst) in
 
     let size = Db.VDI.get_virtual_size ~__context ~self:vdi_src in
-    let blocksize = 1024*1024 in
 
-    debug "Sm_fs_ops.copy_vdi: %s-copying %Ld in blocks of %d%s preserving sparseness" (if local_copy then "locally" else "remotely") size blocksize (if sparse then "" else " NOT");
+    let local_copy = can_local_copy && not (Xapi_fist.force_remote_vdi_copy ()) in
 
-    let local_dd = LocalDD.sparse_dd refresh_session ~__context sparse in
-    let remote_dd = RemoteDD.sparse_dd refresh_session ~__context sparse in
+    debug "Sm_fs_ops.copy_vdi: %s-copying %Ld%s preserving sparseness" (if local_copy then "locally" else "remotely") size (if sparse then "" else " NOT");
 
     try
+      (* The import URL is only needed for a remote copy, and building it looks
+         up a host for the destination SR, so skip it for local copies. It
+         embeds the session reference, so it is deliberately never logged.
+         NOTE(review): sparse_dd still receives it on its command line. *)
+      let remote_uri = if local_copy then None else Some (import_vdi_url ~__context rpc session_id vdi_dst) in
       with_block_attached_device __context rpc session_id vdi_src `RO
         (fun device_src ->
-           let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 in
-           finally
-             (fun () ->
-                if local_copy && not (Xapi_fist.force_remote_vdi_copy ())
-                then with_block_attached_device __context rpc session_id vdi_dst `RW
-                  (fun device_dst ->
-                     let ofd=Unix.openfile device_dst [Unix.O_WRONLY; Unix.O_SYNC] 0o600 in
-                     finally
-                       (fun () ->
-                          local_dd ifd ofd size blocksize
-                       )
-                       (fun () -> Unix.close ofd)
-
-                  )
-                else with_import_vdi __context rpc session_id vdi_dst
-                  (fun ofd ->
-                     remote_dd ifd ofd size blocksize
-                  )
+           match remote_uri with
+           | None ->
+             with_block_attached_device __context rpc session_id vdi_dst `RW
+               (fun device_dst ->
+                  Sparse_dd_wrapper.dd ~__context sparse device_src device_dst size
              )
-             (fun () -> Unix.close ifd)
+           | Some uri ->
+             Sparse_dd_wrapper.dd ~__context sparse device_src uri size
         )
     with
     | Unix.Unix_error(Unix.EIO, _, _) ->
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/sm_fs_ops.mli
--- a/ocaml/xapi/sm_fs_ops.mli	Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/sm_fs_ops.mli	Mon Aug 23 13:16:56 2010 +0100
@@ -17,6 +17,7 @@
 
 val with_block_attached_devices : Context.t -> (XMLRPC.xmlrpc -> XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI list -> API.vbd_mode -> (string list -> 'a) -> 'a
 val with_block_attached_device : Context.t -> (XMLRPC.xmlrpc -> XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI -> API.vbd_mode -> (string -> 'a) -> 'a
+val with_open_block_attached_device : Context.t -> (XMLRPC.xmlrpc -> XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI -> API.vbd_mode -> (Unix.file_descr -> 'a) -> 'a
 
 val with_new_fs_vdi : Context.t -> name_label:string -> name_description:string -> sR:API.ref_SR -> required_space:int64 -> _type:API.vdi_type -> sm_config:API.string_to_string_map -> (API.ref_VDI -> string -> 'a) -> 'a
 val with_fs_vdi : Context.t -> API.ref_VDI -> (string -> 'a) -> 'a
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/sparse_dd_wrapper.ml
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ocaml/xapi/sparse_dd_wrapper.ml	Mon Aug 23 13:16:56 2010 +0100
@@ -0,0 +1,98 @@
+(*
+ * Copyright (C) 2006-2009 Citrix Systems Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ *)
+(* Allows xapi to drive the sparse_dd program *)
+
+open Vmopshelpers
+open Pervasiveext
+open Client
+open Printf
+
+module D=Debug.Debugger(struct let name="xapi" end)
+open D
+
+(** [parse_progress s] returns [Some p] if [s] starts with a
+    "Progress: <p>" line as printed by [sparse_dd -machine], and [None]
+    otherwise (for example when a read split the line in two). *)
+let parse_progress s =
+  try Some (Scanf.sscanf s "Progress: %d" (fun p -> p))
+  with Scanf.Scan_failure _ | Failure _ | End_of_file -> None
+
+(** [dd ~__context prezeroed infile outfile size] runs the external sparse_dd
+    program to copy [size] bytes from [infile] to [outfile] (a device path or
+    an import URL), passing [-prezeroed] when [prezeroed] is true. Progress
+    lines from the child are copied into the current task, and the task is
+    checked for cancellation on every progress update; if it has been
+    cancelled, the exception raised by [TaskHelper.exn_if_cancelling]
+    propagates to the caller.
+    @raise Failure if sparse_dd exits non-zero or is killed/stopped by a signal *)
+let dd ~__context prezeroed infile outfile size =
+  let pipe_read, pipe_write = Unix.pipe () in
+  let to_close = ref [ pipe_read; pipe_write ] in
+  (* Close each pipe end at most once: the normal path closes pipe_write early
+     and the final cleanup closes whatever is still open. *)
+  let close x = if List.mem x !to_close then (Unix.close x; to_close := List.filter (fun y -> y <> x) !to_close) in
+  let update_progress progress =
+    TaskHelper.exn_if_cancelling ~__context;
+    TaskHelper.operate_on_db_task ~__context
+      (fun self -> Db.Task.set_progress ~__context ~self ~value:(float_of_int progress /. 100.)) in
+  finally
+    (fun () ->
+      match Forkhelpers.with_logfile_fd "sparse_dd"
+        (fun log_fd ->
+          let pid = Forkhelpers.safe_close_and_exec None (Some pipe_write) (Some log_fd) []
+            "/opt/xensource/libexec/sparse_dd"
+            ([ "-machine"; "-src"; infile; "-dest"; outfile; "-size"; Int64.to_string size ] @
+            (if prezeroed then [ "-prezeroed" ] else [])) in
+          (* Only the child may keep the write end open, so that we see EOF when it exits *)
+          close pipe_write;
+          (* Read Progress: output from the binary until EOF *)
+          let read_progress () =
+            let buf = String.create 128 in
+            let finished = ref false in
+            while not !finished do
+              let n = Unix.read pipe_read buf 0 (String.length buf) in
+              if n = 0 then finished := true
+              else begin
+                let chunk = String.sub buf 0 n in
+                debug "sparse_dd: %s" chunk;
+                (* Only unparseable output is ignored; exceptions raised by
+                   update_progress (e.g. task cancellation) must propagate. *)
+                match parse_progress chunk with
+                | Some progress -> update_progress progress
+                | None -> ()
+              end
+            done in
+          (try read_progress ()
+           with e ->
+             (* Closing the read end should make the child's next progress
+                write fail so that it exits; reap it so it does not linger,
+                then re-raise the original exception. *)
+             close pipe_read;
+             (try ignore (Forkhelpers.waitpid pid)
+              with e' -> error "Failed to reap sparse_dd: %s" (Printexc.to_string e'));
+             raise e);
+          match Forkhelpers.waitpid pid with
+          | (_, Unix.WEXITED 0) -> ()
+          | (_, Unix.WEXITED n) -> error "sparse_dd exit: %d" n; failwith "sparse_dd"
+          | (_, Unix.WSIGNALED n) -> error "sparse_dd killed by signal: %d" n; failwith "sparse_dd"
+          | (_, Unix.WSTOPPED n) -> error "sparse_dd stopped by signal: %d" n; failwith "sparse_dd"
+        ) with
+      | Forkhelpers.Success _ -> ()
+      | Forkhelpers.Failure (log, exn) ->
+        error "Failure from sparse_dd: %s" log;
+        raise exn
+    )
+    (fun () ->
+      close pipe_read;
+      close pipe_write)