diff options
author | Vlad Ionescu <vlad@lshift.net> | 2009-09-22 13:17:01 +0100 |
---|---|---|
committer | Vlad Ionescu <vlad@lshift.net> | 2009-09-22 13:17:01 +0100 |
commit | af9bb2a7bc33b126aa38792138d5dd922978e0aa (patch) | |
tree | d349fd25970847b418e66d90fa25f807262f12ad /src | |
parent | 9e41642b809a4631c88107adc7aa70db48511a46 (diff) | |
parent | 5ff2563f1ab5305509bf3be6691ea4493f98e85e (diff) | |
download | rabbitmq-server-af9bb2a7bc33b126aa38792138d5dd922978e0aa.tar.gz |
merging bug21650 into default
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_reader.erl | 112 |
1 files changed, 36 insertions, 76 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 761794f1..beb53761 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -49,7 +49,6 @@ -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). --define(CHANNEL_CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). %--------------------------------------------------------------------------- @@ -94,23 +93,19 @@ %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing, *running* -%% terminate_channel timeout -> remove 'closing' mark, *running* +%% -> log error, mark channel as closing, *running* %% handshake_timeout -> ignore, *running* %% heartbeat timeout -> *throw* %% closing: %% socket close -> *terminate* %% receive frame -> ignore, *closing* -%% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* %% channel exit with hard error %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing +%% -> log error, mark channel as closing %% if last channel to exit then send connection.close_ok, %% start terminate_connection timer, *closed* %% else *closing* @@ -123,7 +118,6 @@ %% *closed* %% receive frame -> ignore, *closed* %% terminate_connection timeout -> *terminate* -%% terminate_channel timeout -> remove 'closing' mark, *closed* %% handshake_timeout -> ignore, *closed* %% heartbeat timeout -> *throw* %% channel exit -> log error, *closed* @@ -292,8 +286,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); - {terminate_channel, Channel, Ref1} -> - mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State)); terminate_connection -> State; handshake_timeout -> @@ -341,32 +333,14 @@ close_connection(State = #v1{connection = #connection{ State#v1{connection_state = closed}. close_channel(Channel, State) -> - Ref = make_ref(), - TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT, - self(), - {terminate_channel, Channel, Ref}), - put({closing_channel, Channel}, {Ref, TRef}), - State. - -terminate_channel(Channel, Ref, State) -> - case get({closing_channel, Channel}) of - undefined -> ok; %% got close_ok in the meantime - {Ref, _} -> erase({closing_channel, Channel}), - ok; - {_Ref, _} -> ok %% got close_ok, and have new closing channel - end, + put({channel, Channel}, closing), State. handle_channel_exit(Channel, Reason, State) -> - %% We remove the channel from the inbound map only. That allows - %% the channel to be re-opened, but also means the remaining - %% cleanup, including possibly closing the connection, is deferred - %% until we get the (normal) exit signal. - erase({channel, Channel}), handle_exception(State, Channel, Reason). handle_dependent_exit(Pid, normal, State) -> - channel_cleanup(Pid), + erase({chpid, Pid}), maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of @@ -376,17 +350,10 @@ handle_dependent_exit(Pid, Reason, State) -> channel_cleanup(Pid) -> case get({chpid, Pid}) of - undefined -> - case get({closing_chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> - erase({closing_chpid, Pid}), - Channel - end; - {channel, Channel} -> - erase({channel, Channel}), - erase({chpid, Pid}), - Channel + undefined -> undefined; + {channel, Channel} -> erase({channel, Channel}), + erase({chpid, Pid}), + Channel end. all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. @@ -467,13 +434,27 @@ handle_frame(Type, Channel, Payload, State) -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {chpid, ChPid} -> - ok = check_for_close(Channel, ChPid, AnalyzedFrame), + case AnalyzedFrame of + {method, 'channel.close', _} -> + erase({channel, Channel}); + _ -> ok + end, ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), State; + closing -> + %% According to the spec, after sending a + %% channel.close we must ignore all frames except + %% channel.close_ok. + case AnalyzedFrame of + {method, 'channel.close_ok', _} -> + erase({channel, Channel}); + _ -> ok + end, + State; undefined -> case State#v1.connection_state of - running -> send_to_new_channel( - Channel, AnalyzedFrame, State), + running -> ok = send_to_new_channel( + Channel, AnalyzedFrame, State), State; Other -> throw({channel_frame_while_starting, Channel, Other, AnalyzedFrame}) @@ -716,38 +697,17 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- send_to_new_channel(Channel, AnalyzedFrame, State) -> - case get({closing_channel, Channel}) of - undefined -> - #v1{sock = Sock, - connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); - {_, TRef} -> - %% According to the spec, after sending a channel.close we - %% must ignore all frames except channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erlang:cancel_timer(TRef), - erase({closing_channel, Channel}), - ok; - _Other -> ok - end - end. - -check_for_close(Channel, ChPid, {method, 'channel.close', _}) -> - channel_cleanup(ChPid), - put({closing_chpid, ChPid}, {channel, Channel}), - ok; -check_for_close(_Channel, _ChPid, _Frame) -> - ok. + #v1{sock = Sock, connection = #connection{ + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), + ChPid = rabbit_framing_channel:start_link( + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), + put({channel, Channel}, {chpid, ChPid}), + put({chpid, ChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", |