# HG changeset patch
# User David Scott <dave.scott@xxxxxxxxxxxxx>
# Date 1282565816 -3600
# Node ID a10ff0cebf46034aa72dc4eb07b903416b8fe2f0
# Parent 2b18e28b785b150c42ff1ffe52eaae906365bc42
CA-43021: hook in 'sparse_dd' for improved VM.copy performance

On local LVHD, VM.copies of freshly installed guests are much quicker:

Guest          Previous VM.copy time   New VM.copy time   Speedup
----------------------------------------------------------------
Debian Lenny   2:11                    1:18               40%
Windows 7      14:18                   7:57               44%

Signed-off-by: David Scott <dave.scott@xxxxxxxxxxxxx>
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/OMakefile
--- a/ocaml/xapi/OMakefile Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/OMakefile Mon Aug 23 13:16:56 2010 +0100
@@ -135,6 +135,7 @@
config_file_io \
slave_backup \
sm_fs_ops \
+ sparse_dd_wrapper \
vmopshelpers \
vm_config \
vmops \
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/import_raw_vdi.ml
--- a/ocaml/xapi/import_raw_vdi.ml Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/import_raw_vdi.ml Mon Aug 23 13:16:56 2010 +0100
@@ -61,11 +61,8 @@
Server_helpers.exec_with_new_task "VDI.import"
(fun __context ->
- Sm_fs_ops.with_block_attached_device __context rpc session_id
vdi `RW
- (fun device ->
- let fd = Unix.openfile device [ Unix.O_WRONLY ] 0 in
- finally
- (fun () ->
+ Sm_fs_ops.with_open_block_attached_device __context rpc
session_id vdi `RW
+ (fun fd ->
try
if chunked
then receive_chunks s fd
@@ -73,8 +70,6 @@
Unixext.fsync fd;
with Unix.Unix_error(Unix.EIO, _, _) ->
raise (Api_errors.Server_error
(Api_errors.vdi_io_error, ["Device I/O errors"]))
- )
- (fun () -> Unix.close fd)
)
);
TaskHelper.complete ~__context [];
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/sm_fs_ops.ml
--- a/ocaml/xapi/sm_fs_ops.ml Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/sm_fs_ops.ml Mon Aug 23 13:16:56 2010 +0100
@@ -34,6 +34,19 @@
let vbd = List.hd vbds in
f ("/dev/" ^ (Db.VBD.get_device ~__context ~self:vbd)))
+(** Block-attach a VDI to dom0, open the device and pass the file descriptor
to [f] *)
+let with_open_block_attached_device __context rpc session_id vdi mode f =
+ with_block_attached_device __context rpc session_id vdi mode
+ (fun path ->
+ let mode' = match mode with
+ | `RO -> [ Unix.O_RDONLY ]
+ | `RW -> [ Unix.O_RDWR ] in
+ let fd = Unix.openfile path mode' 0 in
+ finally
+ (fun () -> f fd)
+ (fun () -> Unix.close fd)
+ )
+
(** Execute a function with a list of device paths after attaching a bunch of
VDIs to dom0 *)
let with_block_attached_devices (__context: Context.t) rpc (session_id:
API.ref_session) (vdis: API.ref_VDI list) mode f =
let rec loop acc = function
@@ -41,35 +54,13 @@
| vdi :: vdis -> with_block_attached_device __context rpc session_id vdi
mode (fun path -> loop (path :: acc) vdis) in
loop [] vdis
-(** Open an import_raw_vdi HTTP connection and run [f] with the socket *)
-let with_import_vdi __context rpc session_id vdi f =
- let subtask_of = Context.get_task_id __context in
- Server_helpers.exec_with_new_task "VDI.import"
- (fun __context ->
- (* Find a suitable host for the SR containing the VDI *)
- let sr = Db.VDI.get_SR ~__context ~self:vdi in
- let host = Importexport.find_host_for_sr ~__context sr in
- let address = Db.Host.get_address ~__context ~self:host in
- let importtask = Client.Task.create rpc session_id "VDI.import"
"" in
-
- let headers = Xapi_http.http_request
- ~cookie:(["session_id", Ref.string_of
session_id;
- "task_id", Ref.string_of importtask;
- "vdi", Ref.string_of vdi;
- "chunked", "true"])
- Http.Put address Constants.import_raw_vdi_uri in
- let writer _ _ sock = f sock; true in
- if not (Xmlrpcclient.do_secure_http_rpc
~use_stunnel_cache:false
- ~task_id:(Ref.string_of (Context.get_task_id
__context))
- ~host:address ~port:Xapi_globs.default_ssl_port
~headers ~body:"" writer)
- then failwith "with_import_vdi";
- debug "Waiting for import task (%s) to complete"
(Ref.string_of importtask);
- (* wait for the task to complete before cleaning
anything up *)
- while Client.Task.get_status rpc session_id importtask
= `pending do
- Thread.delay 1.;
- done;
- Client.Task.destroy rpc session_id importtask;
- )
+(** Return a URL suitable for passing to the sparse_dd process *)
+let import_vdi_url ~__context rpc session_id vdi =
+ (* Find a suitable host for the SR containing the VDI *)
+ let sr = Db.VDI.get_SR ~__context ~self:vdi in
+ let host = Importexport.find_host_for_sr ~__context sr in
+ let address = Db.Host.get_address ~__context ~self:host in
+ Printf.sprintf "https://%s%s?vdi=%s&session_id=%s" address
Constants.import_raw_vdi_uri (Ref.string_of vdi) (Ref.string_of session_id)
(** Catch those smint exceptions and convert into meaningful internal errors *)
let with_api_errors f x =
@@ -167,91 +158,6 @@
raise e
)
-exception Cancelled
-exception NonZero
-
-(** The copying routine can operate on anything which looks like a
file-descriptor/Stream *)
-module type Stream = sig
- type stream
- val write: stream -> int64 -> string -> int -> int -> unit
-end
-
-(** Writes directly to a file *)
-module FileStream = struct
- type stream = Unix.file_descr
- let write stream stream_offset buf off len =
- let newoff = Unix.LargeFile.lseek stream stream_offset
Unix.SEEK_SET in
- (* Printf.printf "Unix.write buf len %d; offset %d; len %d\n"
(String.length buf) offset len; *)
- let n = Unix.write stream buf off len in
- if n < len then failwith "Short write"
-end
-
-(** Marshals data across the network in chunks *)
-module NetworkStream = struct
- open Sparse_encoding
- type stream = Unix.file_descr
- let write stream stream_offset buf off len =
- let copy = String.create len in
- String.blit buf off copy 0 len;
- let x = { Chunk.start = stream_offset; data = copy } in
- Chunk.marshal stream x
-end
-
-module DD(Output: Stream) = struct
-
-(* dd with sparseness check *)
-let sparse_dd refresh_session ~__context sparse ifd stream size bs : unit =
- let round v = int_of_float (v *. 50.0) in
- let update =
- let oldvalue = ref (-1.0) in
- fun value ->
- if round value <> round !oldvalue then begin
- TaskHelper.exn_if_cancelling ~__context;
- TaskHelper.operate_on_db_task ~__context
- (fun self ->
- Db.Task.set_progress ~__context ~self ~value);
- end;
- oldvalue := value
- in
-
- let buf = String.create bs in
-
- let allzero s n =
- try
- for i=0 to n-1 do
- if s.[i] <> '\000' then raise NonZero
- done;
- true
- with NonZero -> false
- in
-
- let rec do_block offset =
- refresh_session ();
-
- update ((Int64.to_float offset) /. (Int64.to_float size));
- let remaining = Int64.sub size offset in
- if remaining=0L
- then () (* EOF *)
- else
- begin
- let this_chunk = Int64.to_int (min remaining (Int64.of_int bs)) in
- Unixext.really_read ifd buf 0 this_chunk;
-
- if not sparse || (not (allzero buf this_chunk))
- then Output.write stream offset buf 0 this_chunk;
-
- do_block (Int64.add offset (Int64.of_int this_chunk))
- end
- in
- do_block 0L;
- Output.write stream 0L "" 0 0; (* end of stream is a zero-sized chunk *)
- update 1.0
-
-end
-
-module LocalDD = DD(FileStream)
-module RemoteDD = DD(NetworkStream)
-
(* SCTX-286: thin provisioning is thrown away over VDI.copy,
VM.import(VM.export).
Return true if the newly created vdi must have zeroes written into it;
default to false
under the assumption that "proper" storage devices (ie not our legacy LVM
stuff) always
@@ -286,47 +192,30 @@
let copy_vdi ~__context vdi_src vdi_dst =
TaskHelper.set_cancellable ~__context;
Helpers.call_api_functions ~__context (fun rpc session_id ->
- let refresh_session = Xapi_session.consider_touching_session rpc session_id
in
-
(* Use the sparse copy unless we must write zeroes into the new VDI *)
let sparse = not (must_write_zeroes_into_new_vdi ~__context vdi_dst) in
(* Copy locally unless this host can't see the destination SR *)
- let sr_dst = Db.VDI.get_SR ~__context ~self:vdi_dst in
- let local_copy = Importexport.check_sr_availability ~__context sr_dst in
+ let can_local_copy = Importexport.check_sr_availability ~__context
(Db.VDI.get_SR ~__context ~self:vdi_dst) in
let size = Db.VDI.get_virtual_size ~__context ~self:vdi_src in
- let blocksize = 1024*1024 in
- debug "Sm_fs_ops.copy_vdi: %s-copying %Ld in blocks of %d%s preserving
sparseness" (if local_copy then "locally" else "remotely") size blocksize (if
sparse then "" else " NOT");
+ let local_copy = can_local_copy && not (Xapi_fist.force_remote_vdi_copy ())
in
- let local_dd = LocalDD.sparse_dd refresh_session ~__context sparse in
- let remote_dd = RemoteDD.sparse_dd refresh_session ~__context sparse in
+ debug "Sm_fs_ops.copy_vdi: %s-copying %Ld%s preserving sparseness" (if
local_copy then "locally" else "remotely") size (if sparse then "" else " NOT");
try
+ let remote_uri = import_vdi_url ~__context rpc session_id vdi_dst in
+ debug "remote_uri = %s" remote_uri;
with_block_attached_device __context rpc session_id vdi_src `RO
(fun device_src ->
- let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 in
- finally
- (fun () ->
- if local_copy && not (Xapi_fist.force_remote_vdi_copy
())
- then with_block_attached_device __context rpc
session_id vdi_dst `RW
- (fun device_dst ->
- let ofd=Unix.openfile device_dst
[Unix.O_WRONLY; Unix.O_SYNC] 0o600 in
- finally
- (fun () ->
- local_dd ifd ofd size blocksize
- )
- (fun () -> Unix.close ofd)
-
- )
- else with_import_vdi __context rpc session_id vdi_dst
- (fun ofd ->
- remote_dd ifd ofd size blocksize
- )
+ if local_copy
+ then with_block_attached_device __context rpc session_id
vdi_dst `RW
+ (fun device_dst ->
+ Sparse_dd_wrapper.dd ~__context sparse device_src
device_dst size
)
- (fun () -> Unix.close ifd)
+ else Sparse_dd_wrapper.dd ~__context sparse device_src
remote_uri size
)
with
| Unix.Unix_error(Unix.EIO, _, _) ->
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/sm_fs_ops.mli
--- a/ocaml/xapi/sm_fs_ops.mli Mon Aug 23 13:16:55 2010 +0100
+++ b/ocaml/xapi/sm_fs_ops.mli Mon Aug 23 13:16:56 2010 +0100
@@ -17,6 +17,7 @@
val with_block_attached_devices : Context.t -> (XMLRPC.xmlrpc ->
XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI list -> API.vbd_mode ->
(string list -> 'a) -> 'a
val with_block_attached_device : Context.t -> (XMLRPC.xmlrpc ->
XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI -> API.vbd_mode -> (string ->
'a) -> 'a
+val with_open_block_attached_device : Context.t -> (XMLRPC.xmlrpc ->
XMLRPC.xmlrpc) -> API.ref_session -> API.ref_VDI -> API.vbd_mode ->
(Unix.file_descr -> 'a) -> 'a
val with_new_fs_vdi : Context.t -> name_label:string ->
name_description:string -> sR:API.ref_SR -> required_space:int64 ->
_type:API.vdi_type ->
sm_config:API.string_to_string_map -> (API.ref_VDI -> string -> 'a) -> 'a
val with_fs_vdi : Context.t -> API.ref_VDI -> (string -> 'a) -> 'a
diff -r 2b18e28b785b -r a10ff0cebf46 ocaml/xapi/sparse_dd_wrapper.ml
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/ocaml/xapi/sparse_dd_wrapper.ml Mon Aug 23 13:16:56 2010 +0100
@@ -0,0 +1,81 @@
+(*
+ * Copyright (C) 2006-2009 Citrix Systems Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *)
+(* Allows xapi to drive the sparse_dd program *)
+
+open Vmopshelpers
+open Pervasiveext
+open Client
+open Printf
+
+module D=Debug.Debugger(struct let name="xapi" end)
+open D
+
+(** Copy [size] bytes from [infile] to [outfile] by running the external
+    sparse_dd program, mirroring its "Progress: <percent>" reports into
+    Db.Task progress (as percent / 100).
+    [prezeroed] appends "-prezeroed" to the sparse_dd command line.
+    Exceptions raised by TaskHelper while checking for cancellation or
+    updating progress are not swallowed.
+    @raise Failure if sparse_dd does not exit normally with status 0. *)
+let dd ~__context prezeroed infile outfile size =
+  let pipe_read, pipe_write = Unix.pipe () in
+  let to_close = ref [ pipe_read; pipe_write ] in
+  let close x = if List.mem x !to_close then (Unix.close x; to_close := List.filter (fun y -> y <> x) !to_close) in
+  (* Swallow only scanf parse failures: unrecognised output is not an error,
+     but exceptions raised while acting on a report must not be hidden *)
+  let parse_progress s =
+    try Some (Scanf.sscanf s "Progress: %d" (fun percent -> percent))
+    with Scanf.Scan_failure _ | Failure _ | End_of_file -> None in
+  finally
+    (fun () ->
+      match Forkhelpers.with_logfile_fd "sparse_dd"
+        (fun log_fd ->
+          let pid = Forkhelpers.safe_close_and_exec None (Some pipe_write) (Some log_fd) []
+            "/opt/xensource/libexec/sparse_dd"
+            ([ "-machine"; "-src"; infile; "-dest"; outfile; "-size"; Int64.to_string size ] @
+             (if prezeroed then [ "-prezeroed" ] else [])) in
+          (* Drop our copy of the write end so that EOF can be observed once the child has closed its end *)
+          close pipe_write;
+          (* Read Progress: output from the binary *)
+          let buf = String.create 128 in
+          let finished = ref false in
+          while not (!finished) do
+            let n = Unix.read pipe_read buf 0 (String.length buf) in
+            if n = 0 then finished := true else begin
+              let chunk = String.sub buf 0 n in
+              debug "sparse_dd: %s" chunk;
+              match parse_progress chunk with
+              | None -> ()
+              | Some progress ->
+                (* NOTE(review): if this raises, the child is neither killed nor waited for here *)
+                TaskHelper.exn_if_cancelling ~__context;
+                TaskHelper.operate_on_db_task ~__context
+                  (fun self -> Db.Task.set_progress ~__context ~self ~value:(float_of_int progress /. 100.))
+            end
+          done;
+          match Forkhelpers.waitpid pid with
+          | (_, Unix.WEXITED 0) -> ()
+          | (_, Unix.WEXITED n) -> error "sparse_dd exit: %d" n; failwith "sparse_dd"
+          | (_, Unix.WSIGNALED n) -> error "sparse_dd killed by signal: %d" n; failwith "sparse_dd"
+          | (_, Unix.WSTOPPED n) -> error "sparse_dd stopped by signal: %d" n; failwith "sparse_dd"
+        ) with
+      | Forkhelpers.Success _ -> ()
+      | Forkhelpers.Failure (log, exn) ->
+        error "Failure from sparse_dd: %s" log;
+        raise exn
+    )
+    (fun () ->
+      close pipe_read;
+      close pipe_write)
+
\ No newline at end of file
ocaml/xapi/OMakefile | 1 +
ocaml/xapi/import_raw_vdi.ml | 9 +-
ocaml/xapi/sm_fs_ops.ml | 171 +++++++--------------------------------
ocaml/xapi/sm_fs_ops.mli | 1 +
ocaml/xapi/sparse_dd_wrapper.ml | 65 +++++++++++++++
5 files changed, 99 insertions(+), 148 deletions(-)
sm_fs_ops.patch
Description: Text Data
_______________________________________________
xen-api mailing list
xen-api@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/mailman/listinfo/xen-api
|