[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [PATCH v2 8/8] tools/ocaml/xenstored: Implement live update for socket connections
From: Edvin Torok <edvint@xxxxxxxxxxxxxxxxxxxxxx> Signed-off-by: Edwin Török <edvin.torok@xxxxxxxxxx> Reviewed-by: Pau Ruiz Safont <pau.safont@xxxxxxxxxx> Reviewed-by: Christian Lindig <christian.lindig@xxxxxxxxxx> --- Changed since V1 * post publicly now that the XSA is out --- tools/ocaml/xenstored/connection.ml | 25 +++++--- tools/ocaml/xenstored/parse_arg.ml | 4 ++ tools/ocaml/xenstored/process.ml | 51 ++++++++++++----- tools/ocaml/xenstored/store.ml | 2 +- tools/ocaml/xenstored/utils.ml | 12 ++++ tools/ocaml/xenstored/xenstored.ml | 88 +++++++++++++++++++++-------- 6 files changed, 138 insertions(+), 44 deletions(-) diff --git a/tools/ocaml/xenstored/connection.ml b/tools/ocaml/xenstored/connection.ml index bd02060cd0..eb23c3af7a 100644 --- a/tools/ocaml/xenstored/connection.ml +++ b/tools/ocaml/xenstored/connection.ml @@ -281,6 +281,9 @@ let get_transaction con tid = let do_input con = Xenbus.Xb.input con.xb let has_input con = Xenbus.Xb.has_in_packet con.xb +let has_partial_input con = match con.xb.Xenbus.Xb.partial_in with + | HaveHdr _ -> true + | NoHdr (n, _) -> n < Xenbus.Partial.header_size () let pop_in con = Xenbus.Xb.get_in_packet con.xb let has_more_input con = Xenbus.Xb.has_more_input con.xb @@ -309,12 +312,13 @@ let is_bad con = match con.dom with None -> false | Some dom -> Domain.is_bad_do Restrictions below can be relaxed once xenstored learns to dump more of its live state in a safe way *) let has_extra_connection_data con = - let has_in = has_input con in + let has_in = has_input con || has_partial_input con in let has_out = has_output con in let has_socket = con.dom = None in let has_nondefault_perms = make_perm con.dom <> con.perm in has_in || has_out - || has_socket (* dom0 sockets not dumped yet *) + (* TODO: what about SIGTERM, should use systemd to store FDS + || has_socket (* dom0 sockets not * dumped yet *) *) || has_nondefault_perms (* set_target not dumped yet *) let has_transaction_data con = @@ -337,16 +341,21 @@ let stats con = Hashtbl.length con.watches, con.stat_nb_ops let dump con chan = - match con.dom with + let id = match con.dom with | Some dom -> let domid = Domain.get_id dom in (* dump domain *) Domain.dump dom chan; - (* dump watches *) - List.iter (fun (path, token) -> - Printf.fprintf chan "watch,%d,%s,%s\n" domid (Utils.hexify path) (Utils.hexify token) - ) (list_watches con); - | None -> () + domid + | None -> + let fd = con |> get_fd |> Utils.FD.to_int in + Printf.fprintf chan "socket,%d\n" fd; + -fd + in + (* dump watches *) + List.iter (fun (path, token) -> + Printf.fprintf chan "watch,%d,%s,%s\n" id (Utils.hexify path) (Utils.hexify token) + ) (list_watches con) let debug con = let domid = get_domstr con in diff --git a/tools/ocaml/xenstored/parse_arg.ml b/tools/ocaml/xenstored/parse_arg.ml index 2c4b5a8528..7c0478e76a 100644 --- a/tools/ocaml/xenstored/parse_arg.ml +++ b/tools/ocaml/xenstored/parse_arg.ml @@ -24,6 +24,7 @@ type config = pidfile: string option; (* old xenstored compatibility *) tracefile: string option; (* old xenstored compatibility *) restart: bool; + live_reload: bool; disable_socket: bool; } @@ -35,6 +36,7 @@ let do_argv = and reraise_top_level = ref false and config_file = ref "" and restart = ref false + and live_reload = ref false and disable_socket = ref false in @@ -52,6 +54,7 @@ let do_argv = ("--pid-file", Arg.Set_string pidfile, ""); (* for compatibility *) ("-T", Arg.Set_string tracefile, ""); (* for compatibility *) ("--restart", Arg.Set restart, "Read database on starting"); + ("--live", Arg.Set live_reload, "Read live dump on startup"); ("--disable-socket", Arg.Unit (fun () -> disable_socket := true), "Disable socket"); ] in let usage_msg = "usage : xenstored [--config-file <filename>] [--no-domain-init] [--help] [--no-fork] [--reraise-top-level] [--restart] [--disable-socket]" in @@ -65,5 +68,6 @@ let do_argv = pidfile = if !pidfile <> "" then Some !pidfile else None; tracefile = if !tracefile <> "" then Some !tracefile else None; restart = !restart; + live_reload = !live_reload; disable_socket = !disable_socket; } diff --git a/tools/ocaml/xenstored/process.ml b/tools/ocaml/xenstored/process.ml index 3174d8ede5..dd50456ad5 100644 --- a/tools/ocaml/xenstored/process.ml +++ b/tools/ocaml/xenstored/process.ml @@ -91,19 +91,24 @@ type t = ; cmdline: string list ; deadline: float ; force: bool + ; result: string list ; pending: bool } let state = ref { binary= Sys.executable_name - ; cmdline= [] + ; cmdline= (Sys.argv |> Array.to_list |> List.tl) ; deadline= 0. ; force= false + ; result = [] ; pending= false } let debug = Printf.eprintf -let args_of_t t = (t.binary, "--restart" :: t.cmdline) +let forced_args = ["--live"; "--restart"] +let args_of_t t = + let filtered = List.filter (fun x -> not @@ List.mem x forced_args) t.cmdline in + (t.binary, forced_args @ filtered) let string_of_t t = let executable, rest = args_of_t t in @@ -117,12 +122,12 @@ let launch_exn t = let validate_exn t = (* --help must be last to check validity of earlier arguments *) - let t = {t with cmdline= t.cmdline @ ["--help"]} in - let cmd = string_of_t t in + let t' = {t with cmdline= t.cmdline @ ["--help"]} in + let cmd = string_of_t t' in debug "Executing %s" cmd ; match Unix.fork () with | 0 -> - ( try launch_exn t with _ -> exit 2 ) + ( try launch_exn t' with _ -> exit 2 ) | pid -> ( match Unix.waitpid [] pid with | _, Unix.WEXITED 0 -> @@ -146,10 +151,14 @@ let parse_live_update args = validate_exn {!state with binary= file} | ["-a"] -> debug "Live update aborted" ; - {!state with pending= false} + {!state with pending= false; result = []} | "-c" :: cmdline -> - validate_exn {!state with cmdline} + validate_exn {!state with cmdline = !state.cmdline @ cmdline} | "-s" :: _ -> + (match !state.pending, !state.result with + | true, _ -> !state (* no change to state, avoid resetting timeout *) + | false, _ :: _ -> !state (* we got a pending result to deliver *) + | false, [] -> let timeout = ref 60 in let force = ref false in Arg.parse_argv ~current:(ref 0) (Array.of_list args) @@ -165,10 +174,16 @@ let parse_live_update args = "live-update -s" ; debug "Live update process queued" ; {!state with deadline = Unix.gettimeofday () +. float !timeout - ; force= !force; pending= true} + ; force= !force; pending= true}) | _ -> invalid_arg ("Unknown arguments: " ^ String.concat "," args)) ; - None + match !state.pending, !state.result with + | true, _ -> Some "BUSY" + | false, (_ :: _ as result) -> + (* xenstore-control has read the result, clear it *) + state := { !state with result = [] }; + Some (String.concat "\n" result) + | false, [] -> None with | Arg.Bad s | Arg.Help s | Invalid_argument s -> Some s @@ -182,17 +197,27 @@ let parse_live_update args = | [] -> true | _ when Unix.gettimeofday () < t.deadline -> false | l -> - info "Live update timeout reached: %d active connections" (List.length l); - List.iter (fun con -> warn "%s prevents live update" (Connection.get_domstr con)) l; + warn "timeout reached: have to wait, migrate or shutdown %d domains:" (List.length l); + let msgs = List.rev_map (fun con -> Printf.sprintf "%s: %d tx, in: %b, out: %b, perm: %s" + (Connection.get_domstr con) + (Connection.number_of_transactions con) + (Connection.has_input con) + (Connection.has_output con) + (Connection.get_perm con |> Perms.Connection.to_string) + ) l in + List.iter (warn "Live-update: %s") msgs; if t.force then begin warn "Live update forced, some domain connections may break!"; true end else begin - warn "Live update aborted, try migrating or shutting down the domains/toolstack"; - state := { t with pending = false }; + warn "Live update aborted (see above for domains preventing it)"; + state := { t with pending = false; result = msgs}; false end end else false + + let completed () = + state := { !state with result = ["OK"] } end (* packets *) diff --git a/tools/ocaml/xenstored/store.ml b/tools/ocaml/xenstored/store.ml index e20767372f..a3be2e6bbe 100644 --- a/tools/ocaml/xenstored/store.ml +++ b/tools/ocaml/xenstored/store.ml @@ -366,7 +366,7 @@ let traversal root_node f = let rec _traversal path node = f path node; let node_path = Path.of_path_and_name path (Symbol.to_string node.Node.name) in - List.iter (_traversal node_path) node.Node.children + List.iter (_traversal node_path) (List.rev node.Node.children) in _traversal [] root_node diff --git a/tools/ocaml/xenstored/utils.ml b/tools/ocaml/xenstored/utils.ml index eb79bf0146..6c1603c276 100644 --- a/tools/ocaml/xenstored/utils.ml +++ b/tools/ocaml/xenstored/utils.ml @@ -115,3 +115,15 @@ let path_validate path connection_path = if len > !Define.path_max then raise Define.Invalid_path; abs_path + +module FD : sig + type t = Unix.file_descr + val of_int: int -> t + val to_int : t -> int +end = struct + type t = Unix.file_descr + (* This is like Obj.magic but just for these types, + and relies on Unix.file_descr = int *) + external to_int : t -> int = "%identity" + external of_int : int -> t = "%identity" +end diff --git a/tools/ocaml/xenstored/xenstored.ml b/tools/ocaml/xenstored/xenstored.ml index 22413271fb..5893af2caa 100644 --- a/tools/ocaml/xenstored/xenstored.ml +++ b/tools/ocaml/xenstored/xenstored.ml @@ -141,9 +141,12 @@ exception Bad_format of string let dump_format_header = "$xenstored-dump-format" -let from_channel_f chan domain_f watch_f store_f = +let from_channel_f chan global_f socket_f domain_f watch_f store_f = let unhexify s = Utils.unhexify s in - let getpath s = Store.Path.of_string (Utils.unhexify s) in + let getpath s = + let u = Utils.unhexify s in + debug "Path: %s" u; + Store.Path.of_string u in let header = input_line chan in if header <> dump_format_header then raise (Bad_format "header"); @@ -155,6 +158,12 @@ let from_channel_f chan domain_f watch_f store_f = let l = String.split ',' line in try match l with + | "global" :: rw :: _ -> + (* there might be more parameters here, + e.g. a RO socket from a previous version: ignore it *) + global_f ~rw + | "socket" :: fd :: [] -> + socket_f ~fd:(int_of_string fd) | "dom" :: domid :: mfn :: port :: []-> domain_f (int_of_string domid) (Nativeint.of_string mfn) @@ -175,12 +184,28 @@ let from_channel_f chan domain_f watch_f store_f = with End_of_file -> quit := true done; - () + info "Completed loading xenstore dump" let from_channel store cons doms chan = (* don't let the permission get on our way, full perm ! *) let op = Store.get_ops store Perms.Connection.full_rights in - + let rwro = ref (None) in + let global_f ~rw = + let get_listen_sock sockfd = + let fd = sockfd |> int_of_string |> Utils.FD.of_int in + Unix.listen fd 1; + Some fd + in + rwro := get_listen_sock rw + in + let socket_f ~fd = + let ufd = Utils.FD.of_int fd in + let is_valid = try (Unix.fstat ufd).Unix.st_kind = Unix.S_SOCK with _ -> false in + if is_valid then + Connections.add_anonymous cons ufd + else + warn "Ignoring invalid socket FD %d" fd + in let domain_f domid mfn port = let ndom = if domid > 0 then @@ -190,28 +215,38 @@ let from_channel store cons doms chan = in Connections.add_domain cons ndom; in - let watch_f domid path token = - let con = Connections.find_domain cons domid in - ignore (Connections.add_watch cons con path token) + let get_con id = + if id < 0 then Connections.find cons (Utils.FD.of_int (-id)) + else Connections.find_domain cons id + in + let watch_f id path token = + ignore (Connections.add_watch cons (get_con id) path token) in let store_f path perms value = op.Store.write path value; op.Store.setperms path perms in - from_channel_f chan domain_f watch_f store_f + from_channel_f chan global_f socket_f domain_f watch_f store_f; + !rwro let from_file store cons doms file = + info "Loading xenstore dump from %s" file; let channel = open_in file in finally (fun () -> from_channel store doms cons channel) (fun () -> close_in channel) -let to_channel store cons chan = +let to_channel store cons rw chan = let hexify s = Utils.hexify s in fprintf chan "%s\n" dump_format_header; + let fdopt = function None -> -1 | Some fd -> + (* systemd and utils.ml sets it close on exec *) + Unix.clear_close_on_exec fd; + Utils.FD.to_int fd in + fprintf chan "global,%d\n" (fdopt rw); - (* dump connections related to domains; domid, mfn, eventchn port, watches *) - Connections.iter_domains cons (fun con -> Connection.dump con chan); + (* dump connections related to domains: domid, mfn, eventchn port/ sockets, and watches *) + Connections.iter cons (fun con -> Connection.dump con chan); (* dump the store *) Store.dump_fct store (fun path node -> @@ -224,9 +259,9 @@ let to_channel store cons chan = () -let to_file store cons file = +let to_file store cons fds file = let channel = open_out_gen [ Open_wronly; Open_creat; Open_trunc; ] 0o600 file in - finally (fun () -> to_channel store cons channel) + finally (fun () -> to_channel store cons fds channel) (fun () -> close_out channel) end @@ -246,13 +281,13 @@ let _ = ); let rw_sock = - if cf.disable_socket then + if cf.disable_socket || cf.live_reload then None else Some (Unix.handle_unix_error Utils.create_unix_socket Define.xs_daemon_socket) in - if cf.daemonize then + if cf.daemonize && not cf.live_reload then Unixext.daemonize () else printf "Xen Storage Daemon, version %d.%d\n%!" @@ -292,10 +327,15 @@ let _ = List.iter (fun path -> Store.write store Perms.Connection.full_rights path "") Store.Path.specials; + let rw_sock = if cf.restart && Sys.file_exists Disk.xs_daemon_database then ( - DB.from_file store domains cons Disk.xs_daemon_database; - Event.bind_dom_exc_virq eventchn - ) else ( + let rwro = DB.from_file store domains cons Disk.xs_daemon_database in + info "Live reload: database loaded"; + Event.bind_dom_exc_virq eventchn; + Process.LiveUpdate.completed (); + rwro + ) else ( + info "No live reload: regular startup"; if !Disk.enable then ( info "reading store from disk"; Disk.read store @@ -309,10 +349,13 @@ let _ = Connections.add_domain cons (Domains.create0 domains); Event.bind_dom_exc_virq eventchn ); - ); + rw_sock + ) in (* required for xenstore-control to detect availability of live-update *) - Store.mkdir store Perms.Connection.full_rights (Store.Path.of_string "/tool"); + let toolpath = Store.Path.of_string "/tool" in + if not (Store.path_exists store toolpath) then + Store.mkdir store Perms.Connection.full_rights toolpath; Store.write store Perms.Connection.full_rights (Store.Path.of_string "/tool/xenstored") Sys.executable_name; @@ -324,7 +367,7 @@ let _ = Sys.set_signal Sys.sigpipe Sys.Signal_ignore; if cf.activate_access_log then begin - let post_rotate () = DB.to_file store cons Disk.xs_daemon_database in + let post_rotate () = DB.to_file store cons (None) Disk.xs_daemon_database in Logging.init_access_log post_rotate end; @@ -367,6 +410,7 @@ let _ = let ring_scan_checker dom = (* no need to scan domains already marked as for processing *) if not (Domain.get_io_credit dom > 0) then + debug "Looking up domid %d" (Domain.get_id dom); let con = Connections.find_domain cons (Domain.get_id dom) in if not (Connection.has_more_work con) then ( Process.do_output store cons domains con; @@ -496,7 +540,7 @@ let _ = live_update := Process.LiveUpdate.should_run cons; if !live_update || !quit then begin (* don't initiate live update if saving state fails *) - DB.to_file store cons Disk.xs_daemon_database; + DB.to_file store cons (rw_sock) Disk.xs_daemon_database; quit := true; end with exc -> -- 2.29.2
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |