summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-01-29 21:06:23 +0000
committerMatthias Radestock <matthias@lshift.net>2009-01-29 21:06:23 +0000
commit36d808c76c6471a48da4e6ace3bbc9e3a2c87801 (patch)
tree706c90d5c669f3292bb0ca6d53a15e3acb570382
parent7e20e72d884b13b7e21231b6a0c3784de8a5ccb8 (diff)
downloadrabbitmq-server-bug20275.tar.gz
don't terminate channel processes abnormally for amqp errorsbug20275
since that causes unnecessary error logging
-rw-r--r--src/rabbit_channel.erl22
-rw-r--r--src/rabbit_reader.erl14
2 files changed, 26 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5a1c0952..88d563b7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,12 +35,12 @@
-behaviour(gen_server2).
--export([start_link/4, do/2, do/3, shutdown/1]).
+-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
--record(ch, {state, reader_pid, writer_pid, limiter_pid,
+-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host,
@@ -54,7 +54,8 @@
-ifdef(use_specs).
--spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/5 ::
+ (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -66,9 +67,10 @@
%%----------------------------------------------------------------------------
-start_link(ReaderPid, WriterPid, Username, VHost) ->
+start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
{ok, Pid} = gen_server2:start_link(
- ?MODULE, [ReaderPid, WriterPid, Username, VHost], []),
+ ?MODULE, [Channel, ReaderPid, WriterPid,
+ Username, VHost], []),
Pid.
do(Pid, Method) ->
@@ -91,11 +93,12 @@ conserve_memory(Pid, Conserve) ->
%%---------------------------------------------------------------------------
-init([ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
{ok, #ch{state = starting,
+ channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
limiter_pid = undefined,
@@ -123,8 +126,11 @@ handle_cast({method, Method, Content}, State) ->
{stop, normal, State#ch{state = terminating}}
catch
exit:{amqp, Error, Explanation, none} ->
- {stop, {amqp, Error, Explanation,
- rabbit_misc:method_record_type(Method)}, State};
+ ok = notify_queues(internal_rollback(State)),
+ Reason = {amqp, Error, Explanation,
+ rabbit_misc:method_record_type(Method)},
+ State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ {stop, normal, State#ch{state = terminating}};
exit:normal ->
{stop, normal, State};
_:Reason ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 9dbc49df..12ee299e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -284,6 +284,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit(Reason);
{'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
throw(E);
+ {channel_exit, Channel, Reason} ->
+ mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
{terminate_channel, Channel, Ref1} ->
@@ -351,6 +353,14 @@ terminate_channel(Channel, Ref, State) ->
end,
State.
+handle_channel_exit(Channel, Reason, State) ->
+ %% We remove the channel from the inbound map only. That allows
+ %% the channel to be re-opened, but also means the remaining
+ %% cleanup, including possibly closing the connection, is deferred
+ %% until we get the (normal) exit signal.
+ erase({channel, Channel}),
+ handle_exception(State, Channel, Reason).
+
handle_dependent_exit(Pid, normal, State) ->
channel_cleanup(Pid),
maybe_close(State);
@@ -711,8 +721,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/4,
- [self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/5,
+ [Channel, self(), WriterPid, Username, VHost]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);