# HG changeset patch # User David Scott # Date 1270839463 -3600 # Node ID 9efb5684022b96d8e0da86b869038dfe2cbd362b # Parent 6182bc7d23af84bb7e5138c698dc2efda20c35dc CA-39291: Work around firewalls which kill idle TCP connections by inserting a small empty block into an export every 5s or so. The failure happens whenever a disk has a lot of zeroes in it: the TCP connection goes idle while the server is scanning for the next non-zero block. Even setting SO_KEEPALIVE on the stunnel sockets and reducing the window probe interval to 30s didn't fix it. We wish to keep the ability to have a basic client do an export via HTTP GET, so we can't add application-level keepalives to the protocol; instead we must add them to the export itself. Note that this change is backwards compatible. The receiver code expects: * a common prefix * a monotonically increasing chunk number * the first and last blocks to be the same size and included verbatim (even if all zeroes) * blocks of zeroes the same size as the first block represented as gaps in the increasing chunk number sequence Therefore extra files of length 0 included in the stream will be ignored, provided they * share the common prefix and chunk numbering scheme * are not the first or last blocks Signed-off-by: David Scott diff -r 6182bc7d23af -r 9efb5684022b ocaml/xapi/stream_vdi.ml --- a/ocaml/xapi/stream_vdi.ml Fri Apr 09 10:56:14 2010 +0100 +++ b/ocaml/xapi/stream_vdi.ml Fri Apr 09 19:57:43 2010 +0100 @@ -86,6 +86,27 @@ end +(** Write a block of checksummed data of length [len] with name [filename] to [ofd] *) +let write_block ~__context filename buffer ofd len = + let hdr = Tar.Header.make filename (Int64.of_int len) in + + try + let csum = Sha1sum.sha1sum + (fun checksumfd -> + Tar.write_block hdr (fun ofd -> Tar.Archive.multicast_n_string buffer + [ ofd; checksumfd ] len) ofd + ) in + (* Write the checksum as a separate file *) + let hdr' = Tar.Header.make (filename ^ checksum_extension) (Int64.of_int (String.length csum)) in + 
Tar.write_block hdr' (fun ofd -> ignore(Unix.write ofd csum 0 (String.length csum))) ofd + with + Unix.Unix_error (a,b,c) as e -> + if TaskHelper.is_cancelling ~__context + then raise (Api_errors.Server_error (Api_errors.task_cancelled, [])) + else + (if b="write" + then raise (Api_errors.Server_error (Api_errors.client_error, [ExnHelper.string_of_exn e])) + else raise e) (** Stream a set of VDIs split into chunks in a tar format in a defined order. Return an @@ -95,56 +116,53 @@ let progress = new_progress_record __context prefix_vdis in + (* Remember when we last wrote something so that we can work around firewalls which close 'idle' connections *) + let last_transmission_time = ref 0. in + let send_one ofd (__context:Context.t) (prefix, vdi_ref, size) = let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in - let buffer = String.make (Int64.to_int chunk_size) '\000' in with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644 (fun ifd -> - + + (* NB. It used to be that chunks could be larger than a native int *) (* could handle, but this is no longer the case! 
Ensure all chunks *) (* are strictly less than 2^30 bytes *) let rec stream_from (chunk_no: int) (offset: int64) = refresh_session (); - let remaining = Int64.sub size offset in if remaining > 0L then begin let this_chunk = (min remaining chunk_size) in - let last_chunk = this_chunk=remaining in + let last_chunk = this_chunk = remaining in let this_chunk = Int64.to_int this_chunk in let filename = Printf.sprintf "%s/%08d" prefix chunk_no in let hdr = Tar.Header.make filename (Int64.of_int this_chunk) in - Unixext.really_read ifd buffer 0 this_chunk; - - (* Only write the chunk if it's not all zeros, or if it's the first *) - if not (Zerocheck.is_all_zeros buffer this_chunk) || chunk_no=0 || last_chunk then - begin - let csum = Sha1sum.sha1sum - (fun checksumfd -> - try - Tar.write_block hdr (fun ofd -> Tar.Archive.multicast_n_string buffer - [ ofd; checksumfd ] this_chunk) ofd - with - Unix.Unix_error (a,b,c) as e -> - if TaskHelper.is_cancelling ~__context - then raise (Api_errors.Server_error (Api_errors.task_cancelled, [])) - else - (if b="write" - then raise (Api_errors.Server_error (Api_errors.client_error, [ExnHelper.string_of_exn e])) - else raise e) - ) in + + let now = Unix.gettimeofday () in + let time_since_transmission = now -. !last_transmission_time in - (* Write the checksum as a separate file *) - let hdr' = Tar.Header.make (filename ^ checksum_extension) (Int64.of_int (String.length csum)) in - Tar.write_block hdr' (fun ofd -> ignore(Unix.write ofd csum 0 (String.length csum))) ofd - end; + (* We always include the first and last blocks *) + let first_or_last = chunk_no = 0 || last_chunk in - made_progress __context progress (Int64.of_int this_chunk); - stream_from (chunk_no + 1) (Int64.add offset chunk_size) - end; + if time_since_transmission > 5. 
&& not first_or_last then begin + last_transmission_time := now; + write_block ~__context filename "" ofd 0; + (* no progress has been made *) + stream_from (chunk_no + 1) offset + end else begin + let buffer = String.make this_chunk '\000' in + Unixext.really_read ifd buffer 0 this_chunk; + if not (Zerocheck.is_all_zeros buffer this_chunk) || first_or_last then begin + last_transmission_time := now; + write_block ~__context filename buffer ofd this_chunk; + end; + made_progress __context progress (Int64.of_int this_chunk); + stream_from (chunk_no + 1) (Int64.add offset chunk_size); + end + end in stream_from 0 0L); debug "Finished streaming VDI" in