diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-25 12:42:00 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-25 12:42:00 +0100 |
commit | e649f863f74aae26c5e63061289eb4b3b14255d6 (patch) | |
tree | d2071d8773d75e015be4cef1d6461f96eb1aed42 | |
parent | 350b35a0058c4a6ea517e383cd9e162e403f4e00 (diff) | |
download | rabbitmq-server-e649f863f74aae26c5e63061289eb4b3b14255d6.tar.gz |
Minor refactorings
-rw-r--r-- | src/rabbit_channel.erl | 18 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 9 |
2 files changed, 14 insertions, 13 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c258f6e8..edae3305 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -865,12 +865,7 @@ handle_method(#'channel.flow_ok'{active = Active}, _, handle_method(#'channel.flow_ok'{active = Active}, _, State = #ch{active = {invert, Active, {_Ref, TRef}}}) -> {ok, cancel} = timer:cancel(TRef), - ok = rabbit_writer:send_command( - State#ch.writer_pid, #'channel.flow'{active = not Active}), - Ref = make_ref(), - {ok, TRef1} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, - [self(), Ref]), - {noreply, State#ch{active = {pending, not Active, {Ref, TRef1}}}}; + {noreply, issue_flow(not Active, State)}; handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -886,14 +881,17 @@ flow_control(Active, State = #ch{active = {invert, Active, Refs}}) -> flow_control(Active, State = #ch{active = NotActive}) when NotActive =:= not Active -> ok = clear_permission_cache(), + noreply(issue_flow(Active, State)); +flow_control(_Active, State) -> + noreply(State). + +issue_flow(Active, State) -> ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = Active}), Ref = make_ref(), {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, - [self(), Ref]), - noreply(State#ch{active = {pending, Active, {Ref, TRef}}}); -flow_control(_Active, State) -> - noreply(State). + [self(), Ref]), + State#ch{active = {pending, Active, {Ref, TRef}}}. binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e0aaaf6a..a871154c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -881,6 +881,10 @@ test_memory_pressure_receive_flow(Active) -> ok end. +test_memory_pressure_sync(WPid) -> + WPid ! sync, + receive sync -> ok after 1000 -> throw(timeout) end. + test_memory_pressure() -> Me = self(), Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), @@ -900,15 +904,14 @@ test_memory_pressure() -> ok end, - Writer ! sync, - receive sync -> ok after 1000 -> throw(timeout) end, - + ok = test_memory_pressure_sync(Writer), %% we should have just 1 active=false waiting for us ok = test_memory_pressure_receive_flow(false), %% if we reply with flow_ok, we should immediately get an %% active=true back ok = rabbit_channel:do(Ch, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_sync(Writer), ok = test_memory_pressure_receive_flow(true), %% if we publish at this point, the channel should die |