[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-API] [PATCH] Work around firewalls killing connections in export code



# HG changeset patch
# User David Scott <dave.scott@xxxxxxxxxxxxx>
# Date 1270839463 -3600
# Node ID 9efb5684022b96d8e0da86b869038dfe2cbd362b
# Parent  6182bc7d23af84bb7e5138c698dc2efda20c35dc
CA-39291: Work around firewalls which kill idle TCP connections by inserting a 
small empty block into an export every 5s or so.

The failure happens whenever a disk has a lot of zeroes in it: the TCP 
connection goes idle while the server is scanning for the next non-zero block. 
Even setting SO_KEEPALIVE on the stunnel sockets and reducing the window probe 
interval down to 30s didn't fix it. We wish to keep the ability to have a basic 
client do an export via HTTP GET, so we can't add application-level keepalives 
to the protocol; instead we must add them to the export stream itself.

Note this change is backwards compatible. The receiver code expects:
* a common prefix
* a monotonically increasing chunk number
* the first and last blocks to be the same size and included verbatim (even if 
all zeroes)
* blocks of zeroes the same size as the first block represented as gaps in the 
increasing chunk number sequence

Therefore extra files of length 0 included in the stream will be ignored, 
provided they:
* share the common prefix and chunk numbering scheme
* are not the first or last blocks

Signed-off-by: David Scott <dave.scott@xxxxxxxxxxxxx>

diff -r 6182bc7d23af -r 9efb5684022b ocaml/xapi/stream_vdi.ml
--- a/ocaml/xapi/stream_vdi.ml  Fri Apr 09 10:56:14 2010 +0100
+++ b/ocaml/xapi/stream_vdi.ml  Fri Apr 09 19:57:43 2010 +0100
@@ -86,6 +86,27 @@
   end
 
 
+(** Write a block of checksummed data of length [len] with name [filename] to 
[ofd] *)
+let write_block ~__context filename buffer ofd len = 
+  let hdr = Tar.Header.make filename (Int64.of_int len) in
+
+  try
+       let csum = Sha1sum.sha1sum
+         (fun checksumfd ->
+                  Tar.write_block hdr (fun ofd -> 
Tar.Archive.multicast_n_string buffer 
+                                                                       [ ofd; 
checksumfd ] len) ofd
+         ) in
+       (* Write the checksum as a separate file *)
+       let hdr' = Tar.Header.make (filename ^ checksum_extension) 
(Int64.of_int (String.length csum)) in
+       Tar.write_block hdr' (fun ofd -> ignore(Unix.write ofd csum 0 
(String.length csum))) ofd
+  with
+       Unix.Unix_error (a,b,c) as e ->
+               if TaskHelper.is_cancelling ~__context
+               then raise (Api_errors.Server_error (Api_errors.task_cancelled, 
[]))
+               else 
+                 (if b="write" 
+                  then raise (Api_errors.Server_error 
(Api_errors.client_error, [ExnHelper.string_of_exn e]))
+                  else raise e)
 
 
 (** Stream a set of VDIs split into chunks in a tar format in a defined order. 
Return an
@@ -95,56 +116,53 @@
 
   let progress = new_progress_record __context prefix_vdis in
 
+  (* Remember when we last wrote something so that we can work around 
firewalls which close 'idle' connections *)
+  let last_transmission_time = ref 0. in
+
   let send_one ofd (__context:Context.t) (prefix, vdi_ref, size) = 
     let size = Db.VDI.get_virtual_size ~__context ~self:vdi_ref in
-    let buffer = String.make (Int64.to_int chunk_size) '\000' in
 
     with_open_vdi __context rpc session_id vdi_ref `RO [Unix.O_RDONLY] 0o644
       (fun ifd ->
-        
+
+
         (* NB. It used to be that chunks could be larger than a native int *)
         (* could handle, but this is no longer the case! Ensure all chunks *)
         (* are strictly less than 2^30 bytes *)
         let rec stream_from (chunk_no: int) (offset: int64) = 
           refresh_session ();
-
           let remaining = Int64.sub size offset in
           if remaining > 0L
           then 
             begin
               let this_chunk = (min remaining chunk_size) in
-              let last_chunk = this_chunk=remaining in
+                  let last_chunk = this_chunk = remaining in
               let this_chunk = Int64.to_int this_chunk in
               let filename = Printf.sprintf "%s/%08d" prefix chunk_no in
               let hdr = Tar.Header.make filename (Int64.of_int this_chunk) in
-              Unixext.really_read ifd buffer 0 this_chunk;
-              
-              (* Only write the chunk if it's not all zeros, or if it's the 
first *)
-              if not (Zerocheck.is_all_zeros buffer this_chunk) || chunk_no=0 
|| last_chunk then
-                begin
-                  let csum = Sha1sum.sha1sum
-                    (fun checksumfd ->
-                      try
-                        Tar.write_block hdr (fun ofd -> 
Tar.Archive.multicast_n_string buffer 
-                          [ ofd; checksumfd ] this_chunk) ofd
-                      with
-                          Unix.Unix_error (a,b,c) as e ->
-                            if TaskHelper.is_cancelling ~__context
-                            then raise (Api_errors.Server_error 
(Api_errors.task_cancelled, []))
-                            else 
-                              (if b="write" 
-                              then raise (Api_errors.Server_error 
(Api_errors.client_error, [ExnHelper.string_of_exn e]))
-                                else raise e)
-                    ) in       
+
+                  let now = Unix.gettimeofday () in
+                  let time_since_transmission = now -. !last_transmission_time 
in
                   
-                  (* Write the checksum as a separate file *)
-                  let hdr' = Tar.Header.make (filename ^ checksum_extension) 
(Int64.of_int (String.length csum)) in
-                  Tar.write_block hdr' (fun ofd -> ignore(Unix.write ofd csum 
0 (String.length csum))) ofd
-                end;
+                  (* We always include the first and last blocks *)
+                  let first_or_last = chunk_no = 0 || last_chunk in
 
-              made_progress __context progress (Int64.of_int this_chunk);
-              stream_from (chunk_no + 1) (Int64.add offset chunk_size)
-            end;
+                  if time_since_transmission > 5. && not first_or_last then 
begin
+                        last_transmission_time := now;
+                        write_block ~__context filename "" ofd 0;
+                        (* no progress has been made *)
+                        stream_from (chunk_no + 1) offset
+                  end else begin
+                        let buffer = String.make this_chunk '\000' in
+                        Unixext.really_read ifd buffer 0 this_chunk;
+                        if not (Zerocheck.is_all_zeros buffer this_chunk) || 
first_or_last then begin
+                          last_transmission_time := now;
+                          write_block ~__context filename buffer ofd 
this_chunk;
+                        end;
+                        made_progress __context progress (Int64.of_int 
this_chunk);
+                        stream_from (chunk_no + 1) (Int64.add offset 
chunk_size);
+              end
+                end
         in
         stream_from 0 0L);
     debug "Finished streaming VDI" in
1 file changed, 48 insertions(+), 30 deletions(-)
ocaml/xapi/stream_vdi.ml |   78 ++++++++++++++++++++++++++++------------------


Attachment: xen-api.hg.patch
Description: Text Data

_______________________________________________
xen-api mailing list
xen-api@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/mailman/listinfo/xen-api

 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.