summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-25 12:42:00 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-25 12:42:00 +0100
commite649f863f74aae26c5e63061289eb4b3b14255d6 (patch)
treed2071d8773d75e015be4cef1d6461f96eb1aed42
parent350b35a0058c4a6ea517e383cd9e162e403f4e00 (diff)
downloadrabbitmq-server-e649f863f74aae26c5e63061289eb4b3b14255d6.tar.gz
Minor refactorings
-rw-r--r--src/rabbit_channel.erl18
-rw-r--r--src/rabbit_tests.erl9
2 files changed, 14 insertions, 13 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c258f6e8..edae3305 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -865,12 +865,7 @@ handle_method(#'channel.flow_ok'{active = Active}, _,
handle_method(#'channel.flow_ok'{active = Active}, _,
State = #ch{active = {invert, Active, {_Ref, TRef}}}) ->
{ok, cancel} = timer:cancel(TRef),
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = not Active}),
- Ref = make_ref(),
- {ok, TRef1} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
- [self(), Ref]),
- {noreply, State#ch{active = {pending, not Active, {Ref, TRef1}}}};
+ {noreply, issue_flow(not Active, State)};
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -886,14 +881,17 @@ flow_control(Active, State = #ch{active = {invert, Active, Refs}}) ->
flow_control(Active, State = #ch{active = NotActive})
when NotActive =:= not Active ->
ok = clear_permission_cache(),
+ noreply(issue_flow(Active, State));
+flow_control(_Active, State) ->
+ noreply(State).
+
+issue_flow(Active, State) ->
ok = rabbit_writer:send_command(
State#ch.writer_pid, #'channel.flow'{active = Active}),
Ref = make_ref(),
{ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
- [self(), Ref]),
- noreply(State#ch{active = {pending, Active, {Ref, TRef}}});
-flow_control(_Active, State) ->
- noreply(State).
+ [self(), Ref]),
+ State#ch{active = {pending, Active, {Ref, TRef}}}.
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e0aaaf6a..a871154c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -881,6 +881,10 @@ test_memory_pressure_receive_flow(Active) ->
ok
end.
+test_memory_pressure_sync(WPid) ->
+ WPid ! sync,
+ receive sync -> ok after 1000 -> throw(timeout) end.
+
test_memory_pressure() ->
Me = self(),
Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
@@ -900,15 +904,14 @@ test_memory_pressure() ->
ok
end,
- Writer ! sync,
- receive sync -> ok after 1000 -> throw(timeout) end,
-
+ ok = test_memory_pressure_sync(Writer),
%% we should have just 1 active=false waiting for us
ok = test_memory_pressure_receive_flow(false),
%% if we reply with flow_ok, we should immediately get an
%% active=true back
ok = rabbit_channel:do(Ch, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_sync(Writer),
ok = test_memory_pressure_receive_flow(true),
%% if we publish at this point, the channel should die