diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-03 14:26:05 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-03 14:26:05 +0100 |
commit | 5247d20bd3c9d1c6949c7a2952b6cea17b3293c4 (patch) | |
tree | 3e993c2fc60628a207d5e0a604aa157299376abd | |
parent | 6544a4de7c9db885b062f3e94fc43b1bb6ada163 (diff) | |
download | rabbitmq-server-5247d20bd3c9d1c6949c7a2952b6cea17b3293c4.tar.gz |
Reverted 'shutdown' to 'normal' for our own programmatic exits. This then leaves us with a problem with the reader shutting down channels as previously the framing_ch was trapping exits (so Reason of 'normal' gets converted to EXIT msg), but now it's not (Reason of 'normal' does *not* cause process to exit). Thus use framing_channel:shutdown instead . Also, a child may not return {ok, {pid, pid}} from its startup. But it may return {ok, pid, any} where any is passed back to the caller.
-rw-r--r-- | src/rabbit_channel.erl | 28 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 4 | ||||
-rw-r--r-- | src/rabbit_queue_collector.erl | 2 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 47 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 14 |
5 files changed, 54 insertions, 41 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3478908f..ca296b60 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -220,16 +220,14 @@ handle_cast({method, Method, Content}, State) -> {noreply, NewState} -> noreply(NewState); stop -> - {stop, shutdown, State#ch{state = terminating}} + {stop, normal, State#ch{state = terminating}} catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - {stop, shutdown, terminating(Reason#amqp_error{method = MethodName}, + {stop, normal, terminating(Reason#amqp_error{method = MethodName}, State)}; exit:normal -> - {stop, shutdown, State}; - exit:shutdown -> - {stop, shutdown, State}; + {stop, normal, State}; _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -238,7 +236,7 @@ handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; handle_cast(terminate, State) -> - {stop, shutdown, State}; + {stop, normal, State}; handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), @@ -269,11 +267,11 @@ handle_cast({conserve_memory, _Conserve}, State) -> handle_cast({flow_timeout, Ref}, State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> - {stop, shutdown, terminating( - rabbit_misc:amqp_error( - precondition_failed, - "timeout waiting for channel.flow_ok{active=~w}", - [not Flow], none), State)}; + {stop, normal, terminating( + rabbit_misc:amqp_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", + [not Flow], none), State)}; handle_cast({flow_timeout, _Ref}, State) -> {noreply, State}; @@ -284,7 +282,7 @@ handle_cast(emit_stats, State) -> handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, - {stop, shutdown, State}; + {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> @@ -301,8 +299,10 @@ terminate(_Reason, State = #ch{state = terminating}) -> terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of - shutdown -> ok = Res; - _ -> ok + normal -> ok = Res; + shutdown -> ok = Res; + {shutdown, _Term} -> ok = Res; + _ -> ok end, terminate(State). diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index b565f236..17e1446d 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -47,7 +47,7 @@ (rabbit_types:protocol(), rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), pid(), rabbit_access_control:username(), rabbit_types:vhost(), pid()) -> - rabbit_types:ok({pid(), pid()})). + {'ok', pid(), pid()}). -endif. @@ -75,7 +75,7 @@ start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, {framing_channel, {rabbit_framing_channel, start_link, [ChannelPid, Protocol]}, permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}), - {ok, {SupPid, FramingChannelPid}}. + {ok, SupPid, FramingChannelPid}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 2da6c182..ea3768d4 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -90,7 +90,7 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) -> {reply, ok, State}; handle_call(shutdown, _From, State) -> - {stop, shutdown, ok, State}. + {stop, normal, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4b89db47..d1d21e16 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -177,8 +177,8 @@ init(Parent, ChannelSupSupPid) -> start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) end. -system_continue(_Parent, Deb, State) -> - ?MODULE:mainloop(Deb, State). +system_continue(Parent, Deb, State) -> + ?MODULE:mainloop(Deb, State = #v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -246,7 +246,7 @@ socket_op(Sock, Fun) -> [self(), Reason]), rabbit_log:info("closing TCP connection ~p~n", [self()]), - exit(shutdown) + exit(normal) end. start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) -> @@ -297,9 +297,9 @@ start_connection(Parent, ChannelSupSupPid, Deb, Sock, SockTransform) -> %% %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), - rabbit_event:notify(connection_closed, [{pid, self()}]), - exit(shutdown) - end. + rabbit_event:notify(connection_closed, [{pid, self()}]) + end, + done. mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), @@ -406,13 +406,23 @@ close_channel(Channel, State) -> handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). -handle_dependent_exit(Pid, shutdown, State) -> - erase({chpid, Pid}), - maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> - case channel_cleanup(Pid) of - undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> maybe_close(handle_exception(State, Channel, Reason)) + case (case Reason of + shutdown -> controlled; + {shutdown, _Term} -> controlled; + normal -> controlled; + _ -> uncontrolled + end) of + controlled -> + erase({chpid, Pid}), + maybe_close(State); + uncontrolled -> + case channel_cleanup(Pid) of + undefined -> + exit({abnormal_dependent_exit, Pid, Reason}); + Channel -> + maybe_close(handle_exception(State, Channel, Reason)) + end end. channel_cleanup(Pid) -> @@ -426,7 +436,8 @@ channel_cleanup(Pid) -> all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. terminate_channels() -> - NChannels = length([exit(Pid, shutdown) || Pid <- all_channels()]), + NChannels = + length([rabbit_framing_channel:shutdown(Pid) || Pid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -450,7 +461,9 @@ wait_for_channel_termination(N, TimerRef) -> exit({abnormal_dependent_exit, Pid, Reason}); Channel -> case Reason of - shutdown -> ok; + normal -> ok; + shutdown -> ok; + {shutdown, _Term} -> ok; _ -> rabbit_log:error( "connection ~p, channel ~p - " @@ -618,8 +631,8 @@ handle_input(Callback, Data, _State) -> %% includes a major and minor version number, Luckily 0-9 and 0-9-1 %% are similar enough that clients will be happy with either. start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, - Protocol, State = #v1{sock = Sock, - connection = Connection}) -> + Protocol, + State = #v1{sock = Sock, connection = Connection}) -> Start = #'connection.start'{ version_major = ProtocolMajor, version_minor = ProtocolMinor, server_properties = server_properties(), @@ -807,7 +820,7 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, - {ok, {_ChanSup, FrChPid}} = + {ok, _ChanSup, FrChPid} = rabbit_channel_sup_sup:start_channel( ChanSupSup, [Protocol, Sock, Channel, FrameMax, self(), Username, VHost, Collector]), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 59cfc064..23f30d13 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1088,8 +1088,8 @@ test_spawn(Receiver) -> end, {Writer, Ch}. -expect_shutdown_channel_termination(Ch) -> - receive {'EXIT', Ch, shutdown} -> ok +expect_normal_channel_termination(Ch) -> + receive {'EXIT', Ch, normal} -> ok after 1000 -> throw({channel_failed_to_shutdown, Ch}) end. @@ -1120,7 +1120,7 @@ test_memory_pressure() -> %% if we publish at this point, the channel should die Content = rabbit_basic:build_content(#'P_basic'{}, <<>>), ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), - expect_shutdown_channel_termination(Ch0), + expect_normal_channel_termination(Ch0), gobble_channel_exit(), {Writer1, Ch1} = test_memory_pressure_spawn(), @@ -1132,19 +1132,19 @@ test_memory_pressure() -> ok = test_memory_pressure_receive_flow(true), %% send back the wrong flow_ok. Channel should die. ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), - expect_shutdown_channel_termination(Ch1), + expect_normal_channel_termination(Ch1), gobble_channel_exit(), {_Writer2, Ch2} = test_memory_pressure_spawn(), %% just out of the blue, send a flow_ok. Life should end. ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), - expect_shutdown_channel_termination(Ch2), + expect_normal_channel_termination(Ch2), gobble_channel_exit(), {_Writer3, Ch3} = test_memory_pressure_spawn(), ok = rabbit_channel:conserve_memory(Ch3, true), ok = test_memory_pressure_receive_flow(false), - receive {'EXIT', Ch3, shutdown} -> + receive {'EXIT', Ch3, normal} -> ok after 12000 -> throw(channel_failed_to_exit) @@ -1167,7 +1167,7 @@ test_memory_pressure() -> after 1000 -> throw(failed_to_receive_channel_open_ok) end, rabbit_channel:shutdown(Ch4), - expect_shutdown_channel_termination(Ch4), + expect_normal_channel_termination(Ch4), true = process_flag(trap_exit, OldTrap), passed. |