diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-26 18:40:30 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-26 18:40:30 +0100 |
commit | 5c9938c68d93cb29577cf5e3f245cbfb5d6291a7 (patch) | |
tree | d955bcbcfaede21502d5517368debfc6b930d2a5 | |
parent | 16b397876264ad4e1973833b794596f89316231d (diff) | |
download | rabbitmq-server-5c9938c68d93cb29577cf5e3f245cbfb5d6291a7.tar.gz |
Sorted out exception handling correctly, and added many more tests
-rw-r--r-- | src/rabbit_channel.erl | 73 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 86 |
2 files changed, 110 insertions, 49 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a4ce5384..0cdf7f2d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -183,26 +183,9 @@ handle_call(_Request, _From, State) -> noreply(State). handle_cast({method, Method, Content}, State) -> - try handle_method(Method, Content, State) of - {reply, Reply, NewState} -> - ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - noreply(NewState); - {noreply, NewState} -> - noreply(NewState); - stop -> - {stop, normal, State#ch{state = terminating}} - catch - exit:Reason = #amqp_error{} -> - ok = rollback_and_notify(State), - MethodName = rabbit_misc:method_record_type(Method), - State#ch.reader_pid ! {channel_exit, State#ch.channel, - Reason#amqp_error{method = MethodName}}, - {stop, normal, State#ch{state = terminating}}; - exit:normal -> - {stop, normal, State}; - _:Reason -> - {stop, {Reason, erlang:get_stacktrace()}, State} - end; + handle_exiting_function( + fun (State1) -> handle_method(Method, Content, State1) end, Method, + State); handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State)}; @@ -224,13 +207,18 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, handle_cast({conserve_memory, Conserve}, State) -> flow_control(not Conserve, State); -handle_cast({flow_timeout, Ref}, #ch{flow = #flow{client = ClientFlow, - pending = {Ref, _TRef}}}) -> - rabbit_misc:protocol_error( - precondition_failed, - "timeout waiting for channel.flow_ok{active=~w}", [not ClientFlow]); -handle_cast({flow_timeout, _Ref}, State) -> - noreply(State). +handle_cast({flow_timeout, Ref}, State) -> + handle_exiting_function( + fun (#ch{flow = #flow{client = ClientFlow, + pending = {Ref1, _TRef}}}) + when Ref =:= Ref1 -> + rabbit_misc:protocol_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", + [not ClientFlow]); + (State1) -> + {noreply, State1} + end, none, State). handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -369,6 +357,31 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +handle_exiting_function(Fun, Method, State) -> + try Fun(State) of + {reply, Reply, NewState} -> + ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), + noreply(NewState); + {noreply, NewState} -> + noreply(NewState); + stop -> + {stop, normal, State#ch{state = terminating}} + catch + exit:Reason = #amqp_error{} -> + ok = rollback_and_notify(State), + MethodName = case Method of + none -> none; + _ -> rabbit_misc:method_record_type(Method) + end, + State#ch.reader_pid ! {channel_exit, State#ch.channel, + Reason#amqp_error{method = MethodName}}, + {stop, normal, State#ch{state = terminating}}; + exit:normal -> + {stop, normal, State}; + _:Reason -> + {stop, {Reason, erlang:get_stacktrace()}, State} + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -850,11 +863,11 @@ handle_method(#'channel.flow'{active = false}, _, end; handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{flow = #flow{server = Active, client = Flow, - pending = {_Ref, TRef}}}) + State = #ch{flow = F = #flow{server = Active, client = Flow, + pending = {_Ref, TRef}}}) when Flow =:= not Active -> {ok, cancel} = timer:cancel(TRef), - {noreply, State#ch{flow = #flow{client = Active, pending = none}}}; + {noreply, State#ch{flow = F#flow{client = Active, pending = none}}}; handle_method(#'channel.flow_ok'{active = Active}, _, State = #ch{flow = #flow{server = Flow, client = Flow, pending = {_Ref, TRef}}}) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a871154c..357e9949 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -863,7 +863,12 @@ test_memory_pressure_receiver(Pid) -> receive shutdown -> ok; - {send_command, #'channel.flow'{} = Method} -> + {send_command, Method} -> + ok = case Method of + #'channel.flow'{} -> ok; + #'basic.qos_ok'{} -> ok; + #'channel.open_ok'{} -> ok + end, Pid ! Method, test_memory_pressure_receiver(Pid); sync -> @@ -881,46 +886,89 @@ test_memory_pressure_receive_flow(Active) -> ok end. -test_memory_pressure_sync(WPid) -> - WPid ! sync, - receive sync -> ok after 1000 -> throw(timeout) end. +test_memory_pressure_sync(Ch, Writer) -> + ok = rabbit_channel:do(Ch, #'basic.qos'{}), + Writer ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'basic.qos_ok'{} -> ok + after 1000 -> throw(failed_to_receive_basic_qos_ok) + end. -test_memory_pressure() -> +test_memory_pressure_spawn() -> Me = self(), Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), ok = rabbit_channel:do(Ch, #'channel.open'{}), - ok = rabbit_channel:conserve_memory(Ch, false), - ok = rabbit_channel:conserve_memory(Ch, false), - ok = rabbit_channel:conserve_memory(Ch, true), - ok = rabbit_channel:conserve_memory(Ch, false), - ok = rabbit_channel:conserve_memory(Ch, true), - ok = rabbit_channel:conserve_memory(Ch, true), - ok = rabbit_channel:conserve_memory(Ch, false), MRef = erlang:monitor(process, Ch), - receive {'DOWN', MRef, process, Ch, Info} -> - throw({channel_died_early, Info}) + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + {Writer, Ch, MRef}. + +test_memory_pressure() -> + {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch0, false), + ok = rabbit_channel:conserve_memory(Ch0, false), + ok = rabbit_channel:conserve_memory(Ch0, true), + ok = rabbit_channel:conserve_memory(Ch0, false), + ok = rabbit_channel:conserve_memory(Ch0, true), + ok = rabbit_channel:conserve_memory(Ch0, true), + ok = rabbit_channel:conserve_memory(Ch0, false), + ok = test_memory_pressure_sync(Ch0, Writer0), + receive {'DOWN', MRef0, process, Ch0, Info0} -> + throw({channel_died_early, Info0}) after 0 -> ok end, - ok = test_memory_pressure_sync(Writer), %% we should have just 1 active=false waiting for us ok = test_memory_pressure_receive_flow(false), %% if we reply with flow_ok, we should immediately get an %% active=true back - ok = rabbit_channel:do(Ch, #'channel.flow_ok'{active = false}), - ok = test_memory_pressure_sync(Writer), + ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}), ok = test_memory_pressure_receive_flow(true), %% if we publish at this point, the channel should die - ok = rabbit_channel:do(Ch, #'basic.publish'{}, #content{}), - receive {'DOWN', MRef, process, Ch, normal} -> + ok = rabbit_channel:do(Ch0, #'basic.publish'{}, #content{}), + receive {'DOWN', MRef0, process, Ch0, normal} -> + ok + after 1000 -> + throw(channel_failed_to_exit) + end, + + {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch1, true), + ok = test_memory_pressure_receive_flow(false), + ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_sync(Ch1, Writer1), + ok = rabbit_channel:conserve_memory(Ch1, false), + ok = test_memory_pressure_receive_flow(true), + %% send back the wrong flow_ok. Channel should die. + ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), + receive {'DOWN', MRef1, process, Ch1, normal} -> + ok + after 1000 -> + throw(channel_failed_to_exit) + end, + + {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(), + %% just out of the blue, send a flow_ok. Life should end. + ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), + receive {'DOWN', MRef2, process, Ch2, normal} -> ok after 1000 -> throw(channel_failed_to_exit) end, + + {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch3, true), + receive {'DOWN', MRef3, process, Ch3, _} -> + ok + after 12000 -> + throw(channel_failed_to_exit) + end, + passed. make_responder(FMsg) -> |