summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-07-06 12:20:56 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-07-06 12:20:56 +0100
commit72c32850933d01b9ad7adf788af21affc40ac27a (patch)
treee04f85b822e65a73ad89dfe0c94a33d6c9033b5f
parent877b428bbe8c70510a26b2c583da35cc32226334 (diff)
downloadrabbitmq-server-15930.tar.gz
As a first step, get the channel, framing channel, and writer to exit with shutdown, not normal, as shutdown is the correct exit reason for supervised processes. Tests adjusted, and still pass15930
-rw-r--r--src/rabbit_channel.erl26
-rw-r--r--src/rabbit_reader.erl10
-rw-r--r--src/rabbit_tests.erl32
-rw-r--r--src/rabbit_writer.erl2
4 files changed, 37 insertions, 33 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 94a20fbd..692d7bac 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -191,14 +191,16 @@ handle_cast({method, Method, Content}, State) ->
{noreply, NewState} ->
noreply(NewState);
stop ->
- {stop, normal, State#ch{state = terminating}}
+ {stop, shutdown, State#ch{state = terminating}}
catch
exit:Reason = #amqp_error{} ->
MethodName = rabbit_misc:method_record_type(Method),
- {stop, normal, terminating(Reason#amqp_error{method = MethodName},
+ {stop, shutdown, terminating(Reason#amqp_error{method = MethodName},
State)};
exit:normal ->
- {stop, normal, State};
+ {stop, shutdown, State};
+ exit:shutdown ->
+ {stop, shutdown, State};
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
@@ -207,7 +209,7 @@ handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State)};
handle_cast(terminate, State) ->
- {stop, normal, State};
+ {stop, shutdown, State};
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
@@ -232,18 +234,18 @@ handle_cast({conserve_memory, _Conserve}, State) ->
handle_cast({flow_timeout, Ref},
State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
- {stop, normal, terminating(
- rabbit_misc:amqp_error(
- precondition_failed,
- "timeout waiting for channel.flow_ok{active=~w}",
- [not Flow], none), State)};
+ {stop, shutdown, terminating(
+ rabbit_misc:amqp_error(
+ precondition_failed,
+ "timeout waiting for channel.flow_ok{active=~w}",
+ [not Flow], none), State)};
handle_cast({flow_timeout, _Ref}, State) ->
{noreply, State}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
- {stop, normal, State};
+ {stop, shutdown, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
@@ -259,8 +261,8 @@ terminate(_Reason, State = #ch{state = terminating}) ->
terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
- normal -> ok = Res;
- _ -> ok
+ shutdown -> ok = Res;
+ _ -> ok
end,
terminate(State).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a54e0de9..0a48e61a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -227,7 +227,7 @@ socket_op(Sock, Fun) ->
[self(), Reason]),
rabbit_log:info("closing TCP connection ~p~n",
[self()]),
- exit(normal)
+ exit(shutdown)
end.
start_connection(Parent, Deb, Sock, SockTransform) ->
@@ -275,7 +275,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
rabbit_reader_queue_collector:shutdown(Collector),
rabbit_misc:unlink_and_capture_exit(Collector)
end,
- done.
+ exit(shutdown).
mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
@@ -379,7 +379,7 @@ close_channel(Channel, State) ->
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
-handle_dependent_exit(Pid, normal, State) ->
+handle_dependent_exit(Pid, shutdown, State) ->
erase({chpid, Pid}),
maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
@@ -399,7 +399,7 @@ channel_cleanup(Pid) ->
all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
terminate_channels() ->
- NChannels = length([exit(Pid, normal) || Pid <- all_channels()]),
+ NChannels = length([exit(Pid, shutdown) || Pid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -423,7 +423,7 @@ wait_for_channel_termination(N, TimerRef) ->
exit({abnormal_dependent_exit, Pid, Reason});
Channel ->
case Reason of
- normal -> ok;
+ shutdown -> ok;
_ ->
rabbit_log:error(
"connection ~p, channel ~p - "
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7fbbc1ea..a8996e43 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -828,6 +828,8 @@ test_server_status() ->
%% cleanup
[{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
+
+ unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
passed.
@@ -919,23 +921,23 @@ test_memory_pressure_spawn() ->
Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
- MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)
end,
- {Writer, Ch, MRef}.
+ {Writer, Ch}.
-expect_normal_channel_termination(MRef, Ch) ->
- receive {'DOWN', MRef, process, Ch, normal} -> ok
+expect_normal_channel_termination(Ch) ->
+ receive {'EXIT', Ch, shutdown} -> ok
after 1000 -> throw(channel_failed_to_exit)
end.
test_memory_pressure() ->
- {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(),
+ OldTrap = process_flag(trap_exit, true),
+ {Writer0, Ch0} = test_memory_pressure_spawn(),
[ok = rabbit_channel:conserve_memory(Ch0, Conserve) ||
Conserve <- [false, false, true, false, true, true, false]],
ok = test_memory_pressure_sync(Ch0, Writer0),
- receive {'DOWN', MRef0, process, Ch0, Info0} ->
+ receive {'EXIT', Ch0, Info0} ->
throw({channel_died_early, Info0})
after 0 -> ok
end,
@@ -951,9 +953,9 @@ test_memory_pressure() ->
%% if we publish at this point, the channel should die
Content = rabbit_basic:build_content(#'P_basic'{}, <<>>),
ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
- expect_normal_channel_termination(MRef0, Ch0),
+ expect_normal_channel_termination(Ch0),
- {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(),
+ {Writer1, Ch1} = test_memory_pressure_spawn(),
ok = rabbit_channel:conserve_memory(Ch1, true),
ok = test_memory_pressure_receive_flow(false),
ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
@@ -962,16 +964,16 @@ test_memory_pressure() ->
ok = test_memory_pressure_receive_flow(true),
%% send back the wrong flow_ok. Channel should die.
ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
- expect_normal_channel_termination(MRef1, Ch1),
+ expect_normal_channel_termination(Ch1),
- {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(),
+ {_Writer2, Ch2} = test_memory_pressure_spawn(),
%% just out of the blue, send a flow_ok. Life should end.
ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
- expect_normal_channel_termination(MRef2, Ch2),
+ expect_normal_channel_termination(Ch2),
- {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(),
+ {_Writer3, Ch3} = test_memory_pressure_spawn(),
ok = rabbit_channel:conserve_memory(Ch3, true),
- receive {'DOWN', MRef3, process, Ch3, _} ->
+ receive {'EXIT', Ch3, shutdown} ->
ok
after 12000 ->
throw(channel_failed_to_exit)
@@ -983,7 +985,6 @@ test_memory_pressure() ->
Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>,
self()),
ok = rabbit_channel:do(Ch4, #'channel.open'{}),
- MRef4 = erlang:monitor(process, Ch4),
Writer4 ! sync,
receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok)
@@ -996,8 +997,9 @@ test_memory_pressure() ->
after 1000 -> throw(failed_to_receive_channel_open_ok)
end,
rabbit_channel:shutdown(Ch4),
- expect_normal_channel_termination(MRef4, Ch4),
+ expect_normal_channel_termination(Ch4),
+ true = process_flag(trap_exit, OldTrap),
passed.
test_delegates_async(SecondaryNode) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 233d7291..bf6e9bdf 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -121,7 +121,7 @@ handle_message({inet_reply, _, ok}, State) ->
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
handle_message(shutdown, _State) ->
- exit(normal);
+ exit(shutdown);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).