summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-03 16:55:41 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-03 16:55:41 +0000
commit6493aec463e59246e19636c1de70885c0a5a41c8 (patch)
tree5554b80e8de5c7a8931b6518de58962c136c9536
parente3f4f8aa92c8aaea15ffc94068f902b49ff74424 (diff)
downloadrabbitmq-server-6493aec463e59246e19636c1de70885c0a5a41c8.tar.gz
extend rabbit_reader API with a shutdown fun
This is wired into the same logic as the handling of a supervisor-initiated shutdown. When the connection is in the 'running' state we send a 'connection.close' with a CONNECTION_FORCED code and follow our normal connection closure logic. In any other state we just drop out of the mainloop and thus close the socket.
-rw-r--r--src/rabbit_reader.erl29
1 files changed, 21 insertions, 8 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 503e2fb4..90da2a7e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,7 +33,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info/1, info/2]).
+-export([start_link/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -131,6 +131,7 @@
-spec(info/1 :: (pid()) -> [info()]).
-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-endif.
@@ -139,6 +140,9 @@
start_link() ->
{ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+shutdown(Pid, Explanation) ->
+ gen_server:call(Pid, {shutdown, Explanation}, infinity).
+
init(Parent) ->
Deb = sys:debug_options([]),
receive
@@ -264,13 +268,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
{'EXIT', Parent, Reason} ->
- if State#v1.connection_state =:= running ->
- send_exception(State, 0,
- rabbit_misc:amqp_error(connection_forced,
- "broker forced connection closure with reason '~w'",
- [Reason], none));
- true -> ok
- end,
+ terminate(io_lib:format("broker forced connection closure "
+ "with reason '~w'", [Reason]), State),
%% this is what we are expected to do according to
%% http://www.erlang.org/doc/man/sys.html
%%
@@ -298,6 +297,13 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
timeout ->
throw({timeout, State#v1.connection_state});
+ {'$gen_call', From, {shutdown, Explanation}} ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> ok;
+ normal -> mainloop(Parent, Deb, NewState)
+ end;
{'$gen_call', From, info} ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
mainloop(Parent, Deb, State);
@@ -320,6 +326,13 @@ switch_callback(OldState, NewCallback, Length) ->
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
+terminate(Explanation, State = #v1{connection_state = running}) ->
+ {normal, send_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
+terminate(_Explanation, State) ->
+ {force, State}.
+
close_connection(State = #v1{connection = #connection{
timeout_sec = TimeoutSec}}) ->
%% We terminate the connection after the specified interval, but