diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-02-03 16:55:41 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-02-03 16:55:41 +0000 |
commit | 6493aec463e59246e19636c1de70885c0a5a41c8 (patch) | |
tree | 5554b80e8de5c7a8931b6518de58962c136c9536 | |
parent | e3f4f8aa92c8aaea15ffc94068f902b49ff74424 (diff) | |
download | rabbitmq-server-6493aec463e59246e19636c1de70885c0a5a41c8.tar.gz |
extend rabbit_reader API with a shutdown fun
This is wired into the same logic as the handling of a
supervisor-initiated shutdown. When the connection is in the 'running'
state we send a 'connection.close' with a CONNECTION_FORCED code and
follow our normal connection closure logic. In any other state we just
drop out of the mainloop and thus close the socket.
-rw-r--r-- | src/rabbit_reader.erl | 29 |
1 files changed, 21 insertions, 8 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 503e2fb4..90da2a7e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,7 +33,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info/1, info/2]). +-export([start_link/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -131,6 +131,7 @@ -spec(info/1 :: (pid()) -> [info()]). -spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(shutdown/2 :: (pid(), string()) -> 'ok'). -endif. @@ -139,6 +140,9 @@ start_link() -> {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +shutdown(Pid, Explanation) -> + gen_server:call(Pid, {shutdown, Explanation}, infinity). + init(Parent) -> Deb = sys:debug_options([]), receive @@ -264,13 +268,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> {inet_async, Sock, Ref, {error, Reason}} -> throw({inet_error, Reason}); {'EXIT', Parent, Reason} -> - if State#v1.connection_state =:= running -> - send_exception(State, 0, - rabbit_misc:amqp_error(connection_forced, - "broker forced connection closure with reason '~w'", - [Reason], none)); - true -> ok - end, + terminate(io_lib:format("broker forced connection closure " + "with reason '~w'", [Reason]), State), %% this is what we are expected to do according to %% http://www.erlang.org/doc/man/sys.html %% @@ -298,6 +297,13 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end; timeout -> throw({timeout, State#v1.connection_state}); + {'$gen_call', From, {shutdown, Explanation}} -> + {ForceTermination, NewState} = terminate(Explanation, State), + gen_server:reply(From, ok), + case ForceTermination of + force -> ok; + normal -> mainloop(Parent, Deb, NewState) + end; {'$gen_call', From, info} -> gen_server:reply(From, infos(?INFO_KEYS, State)), mainloop(Parent, Deb, State); @@ -320,6 +326,13 @@ switch_callback(OldState, NewCallback, Length) -> OldState#v1{callback = NewCallback, recv_ref = Ref}. +terminate(Explanation, State = #v1{connection_state = running}) -> + {normal, send_exception(State, 0, + rabbit_misc:amqp_error( + connection_forced, Explanation, [], none))}; +terminate(_Explanation, State) -> + {force, State}. + close_connection(State = #v1{connection = #connection{ timeout_sec = TimeoutSec}}) -> %% We terminate the connection after the specified interval, but |