diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-01-29 21:06:23 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-01-29 21:06:23 +0000 |
commit | 36d808c76c6471a48da4e6ace3bbc9e3a2c87801 (patch) | |
tree | 706c90d5c669f3292bb0ca6d53a15e3acb570382 | |
parent | 7e20e72d884b13b7e21231b6a0c3784de8a5ccb8 (diff) | |
download | rabbitmq-server-bug20275.tar.gz |
don't terminate channel processes abnormally for amqp errorsbug20275
since that causes unnecessary error logging
-rw-r--r-- | src/rabbit_channel.erl | 22 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 14 |
2 files changed, 26 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5a1c0952..88d563b7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,12 +35,12 @@ -behaviour(gen_server2). --export([start_link/4, do/2, do/3, shutdown/1]). +-export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --record(ch, {state, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, @@ -54,7 +54,8 @@ -ifdef(use_specs). --spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()). +-spec(start_link/5 :: + (channel_number(), pid(), pid(), username(), vhost()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). @@ -66,9 +67,10 @@ %%---------------------------------------------------------------------------- -start_link(ReaderPid, WriterPid, Username, VHost) -> +start_link(Channel, ReaderPid, WriterPid, Username, VHost) -> {ok, Pid} = gen_server2:start_link( - ?MODULE, [ReaderPid, WriterPid, Username, VHost], []), + ?MODULE, [Channel, ReaderPid, WriterPid, + Username, VHost], []), Pid. do(Pid, Method) -> @@ -91,11 +93,12 @@ conserve_memory(Pid, Conserve) -> %%--------------------------------------------------------------------------- -init([ReaderPid, WriterPid, Username, VHost]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), {ok, #ch{state = starting, + channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, limiter_pid = undefined, @@ -123,8 +126,11 @@ handle_cast({method, Method, Content}, State) -> {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - {stop, {amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, State}; + ok = notify_queues(internal_rollback(State)), + Reason = {amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State#ch{state = terminating}}; exit:normal -> {stop, normal, State}; _:Reason -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9dbc49df..12ee299e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -284,6 +284,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit(Reason); {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> throw(E); + {channel_exit, Channel, Reason} -> + mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); {terminate_channel, Channel, Ref1} -> @@ -351,6 +353,14 @@ terminate_channel(Channel, Ref, State) -> end, State. +handle_channel_exit(Channel, Reason, State) -> + %% We remove the channel from the inbound map only. That allows + %% the channel to be re-opened, but also means the remaining + %% cleanup, including possibly closing the connection, is deferred + %% until we get the (normal) exit signal. + erase({channel, Channel}), + handle_exception(State, Channel, Reason). + handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), maybe_close(State); @@ -711,8 +721,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/4, - [self(), WriterPid, Username, VHost]), + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); |