diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-02-04 21:31:55 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-02-04 21:31:55 +0000 |
commit | 17d99556a4857032711b78499d553978a6440f0b (patch) | |
tree | 9ac5f53793c053d6e4cdde13703fbc97527ff19a | |
parent | c482d4af6e0c1bebbda15d6ef2651d827c8af498 (diff) | |
parent | ce8891a58b10c5d8a8bfedc3b0a85d05b5a298ba (diff) | |
download | rabbitmq-server-17d99556a4857032711b78499d553978a6440f0b.tar.gz |
merge default into bug22309
-rw-r--r-- | src/rabbit_control.erl | 57 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 29 |
2 files changed, 78 insertions, 8 deletions
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 2fe3f33e..2c2c67a6 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -81,6 +81,9 @@ start() -> {error, Reason} -> error("~p", [Reason]), halt(2); + {badrpc, {'EXIT', Reason}} -> + error("~p", [Reason]), + halt(2); {badrpc, Reason} -> error("unable to connect to node ~w: ~w", [Node, Reason]), print_badrpc_diagnostics(Node), @@ -159,6 +162,8 @@ Available commands: list_bindings [-p <VHostPath>] list_connections [<ConnectionInfoItem> ...] + close_connection <ConnectionPid> <ExplanationString> + Quiet output mode is selected with the \"-q\" flag. Informational messages are suppressed when quiet mode is in effect. @@ -301,6 +306,11 @@ action(list_connections, Node, Args, Inform) -> [ArgAtoms]), ArgAtoms); +action(close_connection, Node, [PidStr, Explanation], Inform) -> + Inform("Closing connection ~s", [PidStr]), + rpc_call(Node, rabbit_reader, shutdown, + [string_to_pid(PidStr), Explanation]); + action(Command, Node, Args, Inform) -> {VHost, RemainingArgs} = parse_vhost_flag(Args), action(Command, Node, VHost, RemainingArgs, Inform). @@ -423,3 +433,50 @@ pid_to_string(Pid) -> = term_to_binary(Pid), Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). + +string_to_pid(Str) -> + ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end, + %% TODO: simplify this code by using the 're' module, once we drop + %% support for R11 + %% + %% 1) sanity check + %% The \ before the trailing $ is only there to keep emacs + %% font-lock from getting confused. + case regexp:first_match(Str, "^<.*\.[0-9]+\.[0-9]+>\$") of + {match, _, _} -> + %% 2) strip <> + Str1 = string:substr(Str, 2, string:len(Str) - 2), + %% 3) extract three constituent parts, taking care to + %% handle dots in the node part (hence the reverse and concat) + [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")), + NodeStr = lists:concat(lists:reverse(Rest)), + %% 4) construct a triple term from the three parts + TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.", + [NodeStr, IdStr, SerStr])), + %% 5) parse the triple + Tokens = case erl_scan:string(TripleStr) of + {ok, Tokens1, _} -> Tokens1; + {error, _, _} -> ErrorFun() + end, + Term = case erl_parse:parse_term(Tokens) of + {ok, Term1} -> Term1; + {error, _} -> ErrorFun() + end, + {Node, Id, Ser} = + case Term of + {Node1, Id1, Ser1} when is_atom(Node1) andalso + is_integer(Id1) andalso + is_integer(Ser1) -> + Term; + _ -> + ErrorFun() + end, + %% 6) turn the triple into a pid - see pid_to_string + <<131,NodeEnc/binary>> = term_to_binary(Node), + binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); + nomatch -> + ErrorFun(); + Error -> + %% invalid regexp - shouldn't happen + throw(Error) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f5bdb985..d0d8860f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,7 +33,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info_keys/0, info/1, info/2]). +-export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -132,6 +132,7 @@ -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). -spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(shutdown/2 :: (pid(), string()) -> 'ok'). -endif. @@ -140,6 +141,9 @@ start_link() -> {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +shutdown(Pid, Explanation) -> + gen_server:call(Pid, {shutdown, Explanation}, infinity). + init(Parent) -> Deb = sys:debug_options([]), receive @@ -267,13 +271,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> {inet_async, Sock, Ref, {error, Reason}} -> throw({inet_error, Reason}); {'EXIT', Parent, Reason} -> - if State#v1.connection_state =:= running -> - send_exception(State, 0, - rabbit_misc:amqp_error(connection_forced, - "broker forced connection closure with reason '~w'", - [Reason], none)); - true -> ok - end, + terminate(io_lib:format("broker forced connection closure " + "with reason '~w'", [Reason]), State), %% this is what we are expected to do according to %% http://www.erlang.org/doc/man/sys.html %% @@ -301,6 +300,13 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end; timeout -> throw({timeout, State#v1.connection_state}); + {'$gen_call', From, {shutdown, Explanation}} -> + {ForceTermination, NewState} = terminate(Explanation, State), + gen_server:reply(From, ok), + case ForceTermination of + force -> ok; + normal -> mainloop(Parent, Deb, NewState) + end; {'$gen_call', From, info} -> gen_server:reply(From, infos(?INFO_KEYS, State)), mainloop(Parent, Deb, State); @@ -323,6 +329,13 @@ switch_callback(OldState, NewCallback, Length) -> OldState#v1{callback = NewCallback, recv_ref = Ref}. +terminate(Explanation, State = #v1{connection_state = running}) -> + {normal, send_exception(State, 0, + rabbit_misc:amqp_error( + connection_forced, Explanation, [], none))}; +terminate(_Explanation, State) -> + {force, State}. + close_connection(State = #v1{connection = #connection{ timeout_sec = TimeoutSec}}) -> %% We terminate the connection after the specified interval, but |