summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-04 21:31:55 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-04 21:31:55 +0000
commit17d99556a4857032711b78499d553978a6440f0b (patch)
tree9ac5f53793c053d6e4cdde13703fbc97527ff19a
parentc482d4af6e0c1bebbda15d6ef2651d827c8af498 (diff)
parentce8891a58b10c5d8a8bfedc3b0a85d05b5a298ba (diff)
downloadrabbitmq-server-17d99556a4857032711b78499d553978a6440f0b.tar.gz
merge default into bug22309
-rw-r--r--src/rabbit_control.erl57
-rw-r--r--src/rabbit_reader.erl29
2 files changed, 78 insertions, 8 deletions
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 2fe3f33e..2c2c67a6 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -81,6 +81,9 @@ start() ->
{error, Reason} ->
error("~p", [Reason]),
halt(2);
+ {badrpc, {'EXIT', Reason}} ->
+ error("~p", [Reason]),
+ halt(2);
{badrpc, Reason} ->
error("unable to connect to node ~w: ~w", [Node, Reason]),
print_badrpc_diagnostics(Node),
@@ -159,6 +162,8 @@ Available commands:
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]
+ close_connection <ConnectionPid> <ExplanationString>
+
Quiet output mode is selected with the \"-q\" flag. Informational
messages are suppressed when quiet mode is in effect.
@@ -301,6 +306,11 @@ action(list_connections, Node, Args, Inform) ->
[ArgAtoms]),
ArgAtoms);
+action(close_connection, Node, [PidStr, Explanation], Inform) ->
+ Inform("Closing connection ~s", [PidStr]),
+ rpc_call(Node, rabbit_reader, shutdown,
+ [string_to_pid(PidStr), Explanation]);
+
action(Command, Node, Args, Inform) ->
{VHost, RemainingArgs} = parse_vhost_flag(Args),
action(Command, Node, VHost, RemainingArgs, Inform).
@@ -423,3 +433,50 @@ pid_to_string(Pid) ->
= term_to_binary(Pid),
Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
+
+string_to_pid(Str) ->
+ ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end,
+ %% TODO: simplify this code by using the 're' module, once we drop
+ %% support for R11
+ %%
+ %% 1) sanity check
+ %% The \ before the trailing $ is only there to keep emacs
+ %% font-lock from getting confused.
+ case regexp:first_match(Str, "^<.*\.[0-9]+\.[0-9]+>\$") of
+ {match, _, _} ->
+ %% 2) strip <>
+ Str1 = string:substr(Str, 2, string:len(Str) - 2),
+ %% 3) extract three constituent parts, taking care to
+ %% handle dots in the node part (hence the reverse and concat)
+ [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")),
+ NodeStr = lists:concat(lists:reverse(Rest)),
+ %% 4) construct a triple term from the three parts
+ TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.",
+ [NodeStr, IdStr, SerStr])),
+ %% 5) parse the triple
+ Tokens = case erl_scan:string(TripleStr) of
+ {ok, Tokens1, _} -> Tokens1;
+ {error, _, _} -> ErrorFun()
+ end,
+ Term = case erl_parse:parse_term(Tokens) of
+ {ok, Term1} -> Term1;
+ {error, _} -> ErrorFun()
+ end,
+ {Node, Id, Ser} =
+ case Term of
+ {Node1, Id1, Ser1} when is_atom(Node1) andalso
+ is_integer(Id1) andalso
+ is_integer(Ser1) ->
+ Term;
+ _ ->
+ ErrorFun()
+ end,
+ %% 6) turn the triple into a pid - see pid_to_string
+ <<131,NodeEnc/binary>> = term_to_binary(Node),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
+ nomatch ->
+ ErrorFun();
+ Error ->
+ %% invalid regexp - shouldn't happen
+ throw(Error)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f5bdb985..d0d8860f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,7 +33,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info_keys/0, info/1, info/2]).
+-export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -132,6 +132,7 @@
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-endif.
@@ -140,6 +141,9 @@
start_link() ->
{ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+shutdown(Pid, Explanation) ->
+ gen_server:call(Pid, {shutdown, Explanation}, infinity).
+
init(Parent) ->
Deb = sys:debug_options([]),
receive
@@ -267,13 +271,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
{'EXIT', Parent, Reason} ->
- if State#v1.connection_state =:= running ->
- send_exception(State, 0,
- rabbit_misc:amqp_error(connection_forced,
- "broker forced connection closure with reason '~w'",
- [Reason], none));
- true -> ok
- end,
+ terminate(io_lib:format("broker forced connection closure "
+ "with reason '~w'", [Reason]), State),
%% this is what we are expected to do according to
%% http://www.erlang.org/doc/man/sys.html
%%
@@ -301,6 +300,13 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
timeout ->
throw({timeout, State#v1.connection_state});
+ {'$gen_call', From, {shutdown, Explanation}} ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> ok;
+ normal -> mainloop(Parent, Deb, NewState)
+ end;
{'$gen_call', From, info} ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
mainloop(Parent, Deb, State);
@@ -323,6 +329,13 @@ switch_callback(OldState, NewCallback, Length) ->
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
+terminate(Explanation, State = #v1{connection_state = running}) ->
+ {normal, send_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
+terminate(_Explanation, State) ->
+ {force, State}.
+
close_connection(State = #v1{connection = #connection{
timeout_sec = TimeoutSec}}) ->
%% We terminate the connection after the specified interval, but