summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-24 18:22:25 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-24 18:22:25 +0100
commit350b35a0058c4a6ea517e383cd9e162e403f4e00 (patch)
tree4ff5f232c804efe265c3004411ea389a12e445a2
parentf7f5b34ea829434d1071b11a72d0c4fc47ce31e6 (diff)
downloadrabbitmq-server-350b35a0058c4a6ea517e383cd9e162e403f4e00.tar.gz
Writing some tests revealed a number of mistakes
-rw-r--r--src/rabbit_channel.erl11
-rw-r--r--src/rabbit_tests.erl63
2 files changed, 68 insertions, 6 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3f1bf940..c258f6e8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -861,16 +861,16 @@ handle_method(#'channel.flow'{active = false}, _,
handle_method(#'channel.flow_ok'{active = Active}, _,
State = #ch{active = {pending, Active, {_Ref, TRef}}}) ->
{ok, cancel} = timer:cancel(TRef),
- noreply(State#ch{active = Active});
+ {noreply, State#ch{active = Active}};
handle_method(#'channel.flow_ok'{active = Active}, _,
State = #ch{active = {invert, Active, {_Ref, TRef}}}) ->
{ok, cancel} = timer:cancel(TRef),
ok = rabbit_writer:send_command(
State#ch.writer_pid, #'channel.flow'{active = not Active}),
Ref = make_ref(),
- {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
+ {ok, TRef1} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
[self(), Ref]),
- noreply(State#ch{active = {pending, not Active, {Ref, TRef}}});
+ {noreply, State#ch{active = {pending, not Active, {Ref, TRef1}}}};
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -881,9 +881,8 @@ handle_method(_MethodRecord, _Content, _State) ->
flow_control(Active, State = #ch{active = {pending, NotActive, Refs}})
when NotActive =:= not Active ->
noreply(State#ch{active = {invert, NotActive, Refs}});
-flow_control(Active, State = #ch{active = {invert, NotActive, Refs}})
- when NotActive =:= not Active ->
- noreply(State#ch{active = {pending, NotActive, Refs}});
+flow_control(Active, State = #ch{active = {invert, Active, Refs}}) ->
+ noreply(State#ch{active = {pending, Active, Refs}});
flow_control(Active, State = #ch{active = NotActive})
when NotActive =:= not Active ->
ok = clear_permission_cache(),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 76ebd982..e0aaaf6a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,6 +41,7 @@
-import(lists).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
test_content_prop_roundtrip(Datum, Binary) ->
@@ -58,6 +59,7 @@ all_tests() ->
passed = test_log_management(),
passed = test_app_management(),
passed = test_log_management_during_startup(),
+ passed = test_memory_pressure(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
@@ -857,6 +859,67 @@ test_delegates_async(SecondaryNode) ->
passed.
+test_memory_pressure_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, #'channel.flow'{} = Method} ->
+ Pid ! Method,
+ test_memory_pressure_receiver(Pid);
+ sync ->
+ Pid ! sync,
+ test_memory_pressure_receiver(Pid)
+ end.
+
+test_memory_pressure_receive_flow(Active) ->
+ receive #'channel.flow'{active = Active} -> ok
+ after 1000 -> throw(failed_to_receive_channel_flow)
+ end,
+ receive #'channel.flow'{} ->
+ throw(pipelining_sync_commands_detected)
+ after 0 ->
+ ok
+ end.
+
+test_memory_pressure() ->
+ Me = self(),
+ Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ ok = rabbit_channel:do(Ch, #'channel.open'{}),
+ ok = rabbit_channel:conserve_memory(Ch, false),
+ ok = rabbit_channel:conserve_memory(Ch, false),
+ ok = rabbit_channel:conserve_memory(Ch, true),
+ ok = rabbit_channel:conserve_memory(Ch, false),
+ ok = rabbit_channel:conserve_memory(Ch, true),
+ ok = rabbit_channel:conserve_memory(Ch, true),
+ ok = rabbit_channel:conserve_memory(Ch, false),
+ MRef = erlang:monitor(process, Ch),
+ receive {'DOWN', MRef, process, Ch, Info} ->
+ throw({channel_died_early, Info})
+ after 0 ->
+ ok
+ end,
+
+ Writer ! sync,
+ receive sync -> ok after 1000 -> throw(timeout) end,
+
+ %% we should have just 1 active=false waiting for us
+ ok = test_memory_pressure_receive_flow(false),
+
+ %% if we reply with flow_ok, we should immediately get an
+ %% active=true back
+ ok = rabbit_channel:do(Ch, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_receive_flow(true),
+
+ %% if we publish at this point, the channel should die
+ ok = rabbit_channel:do(Ch, #'basic.publish'{}, #content{}),
+ receive {'DOWN', MRef, process, Ch, normal} ->
+ ok
+ after 1000 ->
+ throw(channel_failed_to_exit)
+ end,
+ passed.
+
make_responder(FMsg) ->
fun() ->
receive Msg -> FMsg(Msg)