diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-06 12:20:56 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-06 12:20:56 +0100 |
commit | 72c32850933d01b9ad7adf788af21affc40ac27a (patch) | |
tree | e04f85b822e65a73ad89dfe0c94a33d6c9033b5f | |
parent | 877b428bbe8c70510a26b2c583da35cc32226334 (diff) | |
download | rabbitmq-server-15930.tar.gz |
As a first step, get the channel, framing channel, and writer to exit with shutdown, not normal, as shutdown is the correct exit reason for supervised processes. Tests adjusted, and still pass15930
-rw-r--r-- | src/rabbit_channel.erl | 26 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 10 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 32 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 2 |
4 files changed, 37 insertions, 33 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 94a20fbd..692d7bac 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -191,14 +191,16 @@ handle_cast({method, Method, Content}, State) -> {noreply, NewState} -> noreply(NewState); stop -> - {stop, normal, State#ch{state = terminating}} + {stop, shutdown, State#ch{state = terminating}} catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - {stop, normal, terminating(Reason#amqp_error{method = MethodName}, + {stop, shutdown, terminating(Reason#amqp_error{method = MethodName}, State)}; exit:normal -> - {stop, normal, State}; + {stop, shutdown, State}; + exit:shutdown -> + {stop, shutdown, State}; _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -207,7 +209,7 @@ handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; handle_cast(terminate, State) -> - {stop, normal, State}; + {stop, shutdown, State}; handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), @@ -232,18 +234,18 @@ handle_cast({conserve_memory, _Conserve}, State) -> handle_cast({flow_timeout, Ref}, State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> - {stop, normal, terminating( - rabbit_misc:amqp_error( - precondition_failed, - "timeout waiting for channel.flow_ok{active=~w}", - [not Flow], none), State)}; + {stop, shutdown, terminating( + rabbit_misc:amqp_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", + [not Flow], none), State)}; handle_cast({flow_timeout, _Ref}, State) -> {noreply, State}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, - {stop, normal, State}; + {stop, shutdown, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> @@ -259,8 +261,8 @@ terminate(_Reason, State = #ch{state = terminating}) -> terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of - normal -> ok = Res; - _ -> ok + shutdown -> ok = Res; + _ -> ok end, terminate(State). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a54e0de9..0a48e61a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -227,7 +227,7 @@ socket_op(Sock, Fun) -> [self(), Reason]), rabbit_log:info("closing TCP connection ~p~n", [self()]), - exit(normal) + exit(shutdown) end. start_connection(Parent, Deb, Sock, SockTransform) -> @@ -275,7 +275,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> rabbit_reader_queue_collector:shutdown(Collector), rabbit_misc:unlink_and_capture_exit(Collector) end, - done. + exit(shutdown). mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), @@ -379,7 +379,7 @@ close_channel(Channel, State) -> handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). -handle_dependent_exit(Pid, normal, State) -> +handle_dependent_exit(Pid, shutdown, State) -> erase({chpid, Pid}), maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> @@ -399,7 +399,7 @@ channel_cleanup(Pid) -> all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. terminate_channels() -> - NChannels = length([exit(Pid, normal) || Pid <- all_channels()]), + NChannels = length([exit(Pid, shutdown) || Pid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -423,7 +423,7 @@ wait_for_channel_termination(N, TimerRef) -> exit({abnormal_dependent_exit, Pid, Reason}); Channel -> case Reason of - normal -> ok; + shutdown -> ok; _ -> rabbit_log:error( "connection ~p, channel ~p - " diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7fbbc1ea..a8996e43 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -828,6 +828,8 @@ test_server_status() -> %% cleanup [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], + + unlink(Ch), ok = rabbit_channel:shutdown(Ch), passed. @@ -919,23 +921,23 @@ test_memory_pressure_spawn() -> Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), - MRef = erlang:monitor(process, Ch), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) end, - {Writer, Ch, MRef}. + {Writer, Ch}. -expect_normal_channel_termination(MRef, Ch) -> - receive {'DOWN', MRef, process, Ch, normal} -> ok +expect_normal_channel_termination(Ch) -> + receive {'EXIT', Ch, shutdown} -> ok after 1000 -> throw(channel_failed_to_exit) end. test_memory_pressure() -> - {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(), + OldTrap = process_flag(trap_exit, true), + {Writer0, Ch0} = test_memory_pressure_spawn(), [ok = rabbit_channel:conserve_memory(Ch0, Conserve) || Conserve <- [false, false, true, false, true, true, false]], ok = test_memory_pressure_sync(Ch0, Writer0), - receive {'DOWN', MRef0, process, Ch0, Info0} -> + receive {'EXIT', Ch0, Info0} -> throw({channel_died_early, Info0}) after 0 -> ok end, @@ -951,9 +953,9 @@ test_memory_pressure() -> %% if we publish at this point, the channel should die Content = rabbit_basic:build_content(#'P_basic'{}, <<>>), ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), - expect_normal_channel_termination(MRef0, Ch0), + expect_normal_channel_termination(Ch0), - {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(), + {Writer1, Ch1} = test_memory_pressure_spawn(), ok = rabbit_channel:conserve_memory(Ch1, true), ok = test_memory_pressure_receive_flow(false), ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), @@ -962,16 +964,16 @@ test_memory_pressure() -> ok = test_memory_pressure_receive_flow(true), %% send back the wrong flow_ok. Channel should die. ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), - expect_normal_channel_termination(MRef1, Ch1), + expect_normal_channel_termination(Ch1), - {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(), + {_Writer2, Ch2} = test_memory_pressure_spawn(), %% just out of the blue, send a flow_ok. Life should end. ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), - expect_normal_channel_termination(MRef2, Ch2), + expect_normal_channel_termination(Ch2), - {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(), + {_Writer3, Ch3} = test_memory_pressure_spawn(), ok = rabbit_channel:conserve_memory(Ch3, true), - receive {'DOWN', MRef3, process, Ch3, _} -> + receive {'EXIT', Ch3, shutdown} -> ok after 12000 -> throw(channel_failed_to_exit) @@ -983,7 +985,6 @@ test_memory_pressure() -> Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch4, #'channel.open'{}), - MRef4 = erlang:monitor(process, Ch4), Writer4 ! sync, receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) @@ -996,8 +997,9 @@ test_memory_pressure() -> after 1000 -> throw(failed_to_receive_channel_open_ok) end, rabbit_channel:shutdown(Ch4), - expect_normal_channel_termination(MRef4, Ch4), + expect_normal_channel_termination(Ch4), + true = process_flag(trap_exit, OldTrap), passed. test_delegates_async(SecondaryNode) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 233d7291..bf6e9bdf 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -121,7 +121,7 @@ handle_message({inet_reply, _, ok}, State) -> handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); handle_message(shutdown, _State) -> - exit(normal); + exit(shutdown); handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). |