summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-03 14:26:05 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-03 14:26:05 +0100
commit5247d20bd3c9d1c6949c7a2952b6cea17b3293c4 (patch)
tree3e993c2fc60628a207d5e0a604aa157299376abd
parent6544a4de7c9db885b062f3e94fc43b1bb6ada163 (diff)
downloadrabbitmq-server-5247d20bd3c9d1c6949c7a2952b6cea17b3293c4.tar.gz
Reverted 'shutdown' to 'normal' for our own programmatic exits. This then leaves us with a problem with the reader shutting down channels as previously the framing_ch was trapping exits (so Reason of 'normal' gets converted to EXIT msg), but now it's not (Reason of 'normal' does *not* cause process to exit). Thus use framing_channel:shutdown instead . Also, a child may not return {ok, {pid, pid}} from its startup. But it may return {ok, pid, any} where any is passed back to the caller.
-rw-r--r--src/rabbit_channel.erl28
-rw-r--r--src/rabbit_channel_sup.erl4
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_reader.erl47
-rw-r--r--src/rabbit_tests.erl14
5 files changed, 54 insertions, 41 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3478908f..ca296b60 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -220,16 +220,14 @@ handle_cast({method, Method, Content}, State) ->
{noreply, NewState} ->
noreply(NewState);
stop ->
- {stop, shutdown, State#ch{state = terminating}}
+ {stop, normal, State#ch{state = terminating}}
catch
exit:Reason = #amqp_error{} ->
MethodName = rabbit_misc:method_record_type(Method),
- {stop, shutdown, terminating(Reason#amqp_error{method = MethodName},
+ {stop, normal, terminating(Reason#amqp_error{method = MethodName},
State)};
exit:normal ->
- {stop, shutdown, State};
- exit:shutdown ->
- {stop, shutdown, State};
+ {stop, normal, State};
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
@@ -238,7 +236,7 @@ handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State)};
handle_cast(terminate, State) ->
- {stop, shutdown, State};
+ {stop, normal, State};
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
@@ -269,11 +267,11 @@ handle_cast({conserve_memory, _Conserve}, State) ->
handle_cast({flow_timeout, Ref},
State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
- {stop, shutdown, terminating(
- rabbit_misc:amqp_error(
- precondition_failed,
- "timeout waiting for channel.flow_ok{active=~w}",
- [not Flow], none), State)};
+ {stop, normal, terminating(
+ rabbit_misc:amqp_error(
+ precondition_failed,
+ "timeout waiting for channel.flow_ok{active=~w}",
+ [not Flow], none), State)};
handle_cast({flow_timeout, _Ref}, State) ->
{noreply, State};
@@ -284,7 +282,7 @@ handle_cast(emit_stats, State) ->
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
- {stop, shutdown, State};
+ {stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
@@ -301,8 +299,10 @@ terminate(_Reason, State = #ch{state = terminating}) ->
terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
- shutdown -> ok = Res;
- _ -> ok
+ normal -> ok = Res;
+ shutdown -> ok = Res;
+ {shutdown, _Term} -> ok = Res;
+ _ -> ok
end,
terminate(State).
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index b565f236..17e1446d 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -47,7 +47,7 @@
(rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
rabbit_access_control:username(), rabbit_types:vhost(), pid()) ->
- rabbit_types:ok({pid(), pid()})).
+ {'ok', pid(), pid()}).
-endif.
@@ -75,7 +75,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
{framing_channel, {rabbit_framing_channel, start_link,
[ChannelPid, Protocol]},
permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
- {ok, {SupPid, FramingChannelPid}}.
+ {ok, SupPid, FramingChannelPid}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 2da6c182..ea3768d4 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -90,7 +90,7 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
{reply, ok, State};
handle_call(shutdown, _From, State) ->
- {stop, shutdown, ok, State}.
+ {stop, normal, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4b89db47..d1d21e16 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -177,8 +177,8 @@ init(Parent, ChannelSupSupPid) ->
start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform)
end.
-system_continue(_Parent, Deb, State) ->
- ?MODULE:mainloop(Deb, State).
+system_continue(Parent, Deb, State) ->
+ ?MODULE:mainloop(Deb, State = #v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -246,7 +246,7 @@ socket_op(Sock, Fun) ->
[self(), Reason]),
rabbit_log:info("closing TCP connection ~p~n",
[self()]),
- exit(shutdown)
+ exit(normal)
end.
start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) ->
@@ -297,9 +297,9 @@ start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) ->
%%
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
- rabbit_event:notify(connection_closed, [{pid, self()}]),
- exit(shutdown)
- end.
+ rabbit_event:notify(connection_closed, [{pid, self()}])
+ end,
+ done.
mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
@@ -406,13 +406,23 @@ close_channel(Channel, State) ->
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
-handle_dependent_exit(Pid, shutdown, State) ->
- erase({chpid, Pid}),
- maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
- case channel_cleanup(Pid) of
- undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> maybe_close(handle_exception(State, Channel, Reason))
+ case (case Reason of
+ shutdown -> controlled;
+ {shutdown, _Term} -> controlled;
+ normal -> controlled;
+ _ -> uncontrolled
+ end) of
+ controlled ->
+ erase({chpid, Pid}),
+ maybe_close(State);
+ uncontrolled ->
+ case channel_cleanup(Pid) of
+ undefined ->
+ exit({abnormal_dependent_exit, Pid, Reason});
+ Channel ->
+ maybe_close(handle_exception(State, Channel, Reason))
+ end
end.
channel_cleanup(Pid) ->
@@ -426,7 +436,8 @@ channel_cleanup(Pid) ->
all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
terminate_channels() ->
- NChannels = length([exit(Pid, shutdown) || Pid <- all_channels()]),
+ NChannels =
+ length([rabbit_framing_channel:shutdown(Pid) || Pid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -450,7 +461,9 @@ wait_for_channel_termination(N, TimerRef) ->
exit({abnormal_dependent_exit, Pid, Reason});
Channel ->
case Reason of
- shutdown -> ok;
+ normal -> ok;
+ shutdown -> ok;
+ {shutdown, _Term} -> ok;
_ ->
rabbit_log:error(
"connection ~p, channel ~p - "
@@ -618,8 +631,8 @@ handle_input(Callback, Data, _State) ->
%% includes a major and minor version number, Luckily 0-9 and 0-9-1
%% are similar enough that clients will be happy with either.
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
- Protocol, State = #v1{sock = Sock,
- connection = Connection}) ->
+ Protocol,
+ State = #v1{sock = Sock, connection = Connection}) ->
Start = #'connection.start'{ version_major = ProtocolMajor,
version_minor = ProtocolMinor,
server_properties = server_properties(),
@@ -807,7 +820,7 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
- {ok, {_ChanSup, FrChPid}} =
+ {ok, _ChanSup, FrChPid} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, [Protocol, Sock, Channel, FrameMax,
self(), Username, VHost, Collector]),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 59cfc064..23f30d13 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1088,8 +1088,8 @@ test_spawn(Receiver) ->
end,
{Writer, Ch}.
-expect_shutdown_channel_termination(Ch) ->
- receive {'EXIT', Ch, shutdown} -> ok
+expect_normal_channel_termination(Ch) ->
+ receive {'EXIT', Ch, normal} -> ok
after 1000 -> throw({channel_failed_to_shutdown, Ch})
end.
@@ -1120,7 +1120,7 @@ test_memory_pressure() ->
%% if we publish at this point, the channel should die
Content = rabbit_basic:build_content(#'P_basic'{}, <<>>),
ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
- expect_shutdown_channel_termination(Ch0),
+ expect_normal_channel_termination(Ch0),
gobble_channel_exit(),
{Writer1, Ch1} = test_memory_pressure_spawn(),
@@ -1132,19 +1132,19 @@ test_memory_pressure() ->
ok = test_memory_pressure_receive_flow(true),
%% send back the wrong flow_ok. Channel should die.
ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
- expect_shutdown_channel_termination(Ch1),
+ expect_normal_channel_termination(Ch1),
gobble_channel_exit(),
{_Writer2, Ch2} = test_memory_pressure_spawn(),
%% just out of the blue, send a flow_ok. Life should end.
ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
- expect_shutdown_channel_termination(Ch2),
+ expect_normal_channel_termination(Ch2),
gobble_channel_exit(),
{_Writer3, Ch3} = test_memory_pressure_spawn(),
ok = rabbit_channel:conserve_memory(Ch3, true),
ok = test_memory_pressure_receive_flow(false),
- receive {'EXIT', Ch3, shutdown} ->
+ receive {'EXIT', Ch3, normal} ->
ok
after 12000 ->
throw(channel_failed_to_exit)
@@ -1167,7 +1167,7 @@ test_memory_pressure() ->
after 1000 -> throw(failed_to_receive_channel_open_ok)
end,
rabbit_channel:shutdown(Ch4),
- expect_shutdown_channel_termination(Ch4),
+ expect_normal_channel_termination(Ch4),
true = process_flag(trap_exit, OldTrap),
passed.