diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-24 18:22:25 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-24 18:22:25 +0100 |
commit | 350b35a0058c4a6ea517e383cd9e162e403f4e00 (patch) | |
tree | 4ff5f232c804efe265c3004411ea389a12e445a2 | |
parent | f7f5b34ea829434d1071b11a72d0c4fc47ce31e6 (diff) | |
download | rabbitmq-server-350b35a0058c4a6ea517e383cd9e162e403f4e00.tar.gz |
Writing some tests revealed a number of mistakes
-rw-r--r-- | src/rabbit_channel.erl | 11 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 63 |
2 files changed, 68 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3f1bf940..c258f6e8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -861,16 +861,16 @@ handle_method(#'channel.flow'{active = false}, _, handle_method(#'channel.flow_ok'{active = Active}, _, State = #ch{active = {pending, Active, {_Ref, TRef}}}) -> {ok, cancel} = timer:cancel(TRef), - noreply(State#ch{active = Active}); + {noreply, State#ch{active = Active}}; handle_method(#'channel.flow_ok'{active = Active}, _, State = #ch{active = {invert, Active, {_Ref, TRef}}}) -> {ok, cancel} = timer:cancel(TRef), ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not Active}), Ref = make_ref(), - {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, + {ok, TRef1} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, [self(), Ref]), - noreply(State#ch{active = {pending, not Active, {Ref, TRef}}}); + {noreply, State#ch{active = {pending, not Active, {Ref, TRef1}}}}; handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -881,9 +881,8 @@ handle_method(_MethodRecord, _Content, _State) -> flow_control(Active, State = #ch{active = {pending, NotActive, Refs}}) when NotActive =:= not Active -> noreply(State#ch{active = {invert, NotActive, Refs}}); -flow_control(Active, State = #ch{active = {invert, NotActive, Refs}}) - when NotActive =:= not Active -> - noreply(State#ch{active = {pending, NotActive, Refs}}); +flow_control(Active, State = #ch{active = {invert, Active, Refs}}) -> + noreply(State#ch{active = {pending, Active, Refs}}); flow_control(Active, State = #ch{active = NotActive}) when NotActive =:= not Active -> ok = clear_permission_cache(), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 76ebd982..e0aaaf6a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,6 +41,7 @@ -import(lists). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). test_content_prop_roundtrip(Datum, Binary) -> @@ -58,6 +59,7 @@ all_tests() -> passed = test_log_management(), passed = test_app_management(), passed = test_log_management_during_startup(), + passed = test_memory_pressure(), passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), @@ -857,6 +859,67 @@ test_delegates_async(SecondaryNode) -> passed. +test_memory_pressure_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, #'channel.flow'{} = Method} -> + Pid ! Method, + test_memory_pressure_receiver(Pid); + sync -> + Pid ! sync, + test_memory_pressure_receiver(Pid) + end. + +test_memory_pressure_receive_flow(Active) -> + receive #'channel.flow'{active = Active} -> ok + after 1000 -> throw(failed_to_receive_channel_flow) + end, + receive #'channel.flow'{} -> + throw(pipelining_sync_commands_detected) + after 0 -> + ok + end. + +test_memory_pressure() -> + Me = self(), + Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), + ok = rabbit_channel:do(Ch, #'channel.open'{}), + ok = rabbit_channel:conserve_memory(Ch, false), + ok = rabbit_channel:conserve_memory(Ch, false), + ok = rabbit_channel:conserve_memory(Ch, true), + ok = rabbit_channel:conserve_memory(Ch, false), + ok = rabbit_channel:conserve_memory(Ch, true), + ok = rabbit_channel:conserve_memory(Ch, true), + ok = rabbit_channel:conserve_memory(Ch, false), + MRef = erlang:monitor(process, Ch), + receive {'DOWN', MRef, process, Ch, Info} -> + throw({channel_died_early, Info}) + after 0 -> + ok + end, + + Writer ! sync, + receive sync -> ok after 1000 -> throw(timeout) end, + + %% we should have just 1 active=false waiting for us + ok = test_memory_pressure_receive_flow(false), + + %% if we reply with flow_ok, we should immediately get an + %% active=true back + ok = rabbit_channel:do(Ch, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_receive_flow(true), + + %% if we publish at this point, the channel should die + ok = rabbit_channel:do(Ch, #'basic.publish'{}, #content{}), + receive {'DOWN', MRef, process, Ch, normal} -> + ok + after 1000 -> + throw(channel_failed_to_exit) + end, + passed. + make_responder(FMsg) -> fun() -> receive Msg -> FMsg(Msg) |