summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-08-05 15:56:46 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-08-05 15:56:46 +0100
commit145a1db8d13bdae5d1b6ab5747f7d8862593ffb9 (patch)
treed4ed9b42d7f456ef8cb206a9c714515f23a6cc8c
parentad6fde4fd7a44a44dda489f07248c0ba35ead067 (diff)
parentdc81d515fa8843b142fcd1d6f3731bfa6930cfcc (diff)
downloadrabbitmq-server-145a1db8d13bdae5d1b6ab5747f7d8862593ffb9.tar.gz
merge default into bug23027
-rw-r--r--Makefile8
-rw-r--r--src/rabbit_channel.erl86
-rw-r--r--src/rabbit_heartbeat.erl34
-rw-r--r--src/rabbit_reader.erl109
-rw-r--r--src/rabbit_tests.erl124
5 files changed, 122 insertions, 239 deletions
diff --git a/Makefile b/Makefile
index a97838cc..1f15921b 100644
--- a/Makefile
+++ b/Makefile
@@ -179,6 +179,14 @@ stop-rabbit-on-node: all
force-snapshot: all
echo "rabbit_persister:force_snapshot()." | $(ERL_CALL)
+set-memory-alarm: all
+ echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \
+ $(ERL_CALL)
+
+clear-memory-alarm: all
+ echo "alarm_handler:clear_alarm(vm_memory_high_watermark)." | \
+ $(ERL_CALL)
+
stop-node:
-$(ERL_CALL) -q
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6ee6e209..edbcb2ea 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,12 +36,10 @@
-behaviour(gen_server2).
-export([start_link/6, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
+-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1, flush/1]).
--export([flow_timeout/2]).
-
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
@@ -49,13 +47,9 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, flow,
- stats_timer}).
-
--record(flow, {server, client, pending}).
+ consumer_mapping, blocking, queue_collector_pid, stats_timer}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
--define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
-define(STATISTICS_KEYS,
[pid,
@@ -80,7 +74,6 @@
-export_type([channel_number/0]).
--type(ref() :: any()).
-type(channel_number() :: non_neg_integer()).
-spec(start_link/6 ::
@@ -94,9 +87,7 @@
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
@@ -128,15 +119,9 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
-conserve_memory(Pid, Conserve) ->
- gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}).
-
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-flow_timeout(Pid, Ref) ->
- gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).
-
list() ->
pg_local:get_members(rabbit_channels).
@@ -185,8 +170,6 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
consumer_mapping = dict:new(),
blocking = dict:new(),
queue_collector_pid = CollectorPid,
- flow = #flow{server = true, client = true,
- pending = none},
stats_timer = rabbit_event:init_stats_timer()},
rabbit_event:notify(
channel_created,
@@ -252,26 +235,6 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
end, State),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
- noreply(State);
-handle_cast({conserve_memory, false}, State = #ch{state = starting}) ->
- ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}),
- noreply(State#ch{state = running});
-handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
- flow_control(not Conserve, State);
-handle_cast({conserve_memory, _Conserve}, State) ->
- noreply(State);
-
-handle_cast({flow_timeout, Ref},
- State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
- {stop, normal, terminating(
- rabbit_misc:amqp_error(
- precondition_failed,
- "timeout waiting for channel.flow_ok{active=~w}",
- [not Flow], none), State)};
-handle_cast({flow_timeout, _Ref}, State) ->
- {noreply, State};
-
handle_cast(emit_stats, State) ->
internal_emit_stats(State),
{noreply, State}.
@@ -429,10 +392,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
- true -> {noreply, State};
- false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
- end;
+ {reply, #'channel.open_ok'{}, State#ch{state = running}};
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -449,10 +409,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
- rabbit_misc:protocol_error(
- command_invalid,
- "basic.publish received after channel.flow_ok{active=false}", []);
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
@@ -919,48 +875,12 @@ handle_method(#'channel.flow'{active = false}, _,
blocking = dict:from_list(Queues)}}
end;
-handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Active, client = Flow,
- pending = {_Ref, TRef}} = F})
- when Flow =:= not Active ->
- {ok, cancel} = timer:cancel(TRef),
- {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
-handle_method(#'channel.flow_ok'{active = Active}, _,
- State = #ch{flow = #flow{server = Flow, client = Flow,
- pending = {_Ref, TRef}}})
- when Flow =:= not Active ->
- {ok, cancel} = timer:cancel(TRef),
- {noreply, issue_flow(Flow, State)};
-handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
- rabbit_misc:protocol_error(
- command_invalid, "unsolicited channel.flow_ok", []);
-handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
- rabbit_misc:protocol_error(
- command_invalid,
- "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);
-
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
%%----------------------------------------------------------------------------
-flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
- when Flow =:= not Active ->
- ok = clear_permission_cache(),
- noreply(issue_flow(Active, State));
-flow_control(Active, State = #ch{flow = F}) ->
- noreply(State#ch{flow = F#flow{server = Active}}).
-
-issue_flow(Active, State) ->
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = Active}),
- Ref = make_ref(),
- {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
- [self(), Ref]),
- State#ch{flow = #flow{server = Active, client = not Active,
- pending = {Ref, TRef}}}.
-
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 1989fb7b..faddffc1 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,14 +31,17 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/2]).
+-export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) ->
- rabbit_types:maybe({pid(), pid()})).
+-type(pids() :: rabbit_types:maybe({pid(), pid()})).
+
+-spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()).
+-spec(pause_monitor/1 :: (pids()) -> 'ok').
+-spec(resume_monitor/1 :: (pids()) -> 'ok').
-endif.
@@ -70,20 +73,43 @@ start_heartbeat(Sock, TimeoutSec) ->
end}, Parent) end),
{Sender, Receiver}.
+pause_monitor(none) ->
+ ok;
+pause_monitor({_Sender, Receiver}) ->
+ Receiver ! pause,
+ ok.
+
+resume_monitor(none) ->
+ ok;
+resume_monitor({_Sender, Receiver}) ->
+ Receiver ! resume,
+ ok.
+
+%%----------------------------------------------------------------------------
+
heartbeater(Params, Parent) ->
heartbeater(Params, erlang:monitor(process, Parent), {0, 0}).
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
MonitorRef, {StatVal, SameCount}) ->
+ Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
receive
{'DOWN', MonitorRef, process, _Object, _Info} ->
ok;
+ pause ->
+ receive
+ {'DOWN', MonitorRef, process, _Object, _Info} ->
+ ok;
+ resume ->
+ Recurse({0, 0});
+ Other ->
+ exit({unexpected_message, Other})
+ end;
Other ->
exit({unexpected_message, Other})
after TimeoutMillisec ->
case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
- Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
if NewStatVal =/= StatVal ->
Recurse({NewStatVal, 0});
SameCount < Threshold ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4b612f2a..57c23990 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -39,7 +39,7 @@
-export([init/1, mainloop/3]).
--export([server_properties/0]).
+-export([conserve_memory/2, server_properties/0]).
-export([analyze_frame/3]).
@@ -59,8 +59,8 @@
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector, stats_timer}).
+-record(v1, {sock, connection, callback, recv_length, recv_ref,
+ connection_state, queue_collector, heartbeater, stats_timer}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -107,6 +107,17 @@
%% -> log error, mark channel as closing, *running*
%% handshake_timeout -> ignore, *running*
%% heartbeat timeout -> *throw*
+%% conserve_memory=true -> *blocking*
+%% blocking:
+%% conserve_memory=true -> *blocking*
+%% conserve_memory=false -> *running*
+%% receive a method frame for a content-bearing method
+%% -> process, stop receiving, *blocked*
+%% ...rest same as 'running'
+%% blocked:
+%% conserve_memory=true -> *blocked*
+%% conserve_memory=false -> resume receiving, *running*
+%% ...rest same as 'running'
%% closing:
%% socket close -> *terminate*
%% receive connection.close -> send connection.close_ok,
@@ -140,6 +151,11 @@
%%
%% TODO: refactor the code so that the above is obvious
+-define(IS_RUNNING(State),
+ (State#v1.connection_state =:= running orelse
+ State#v1.connection_state =:= blocking orelse
+ State#v1.connection_state =:= blocked)).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -149,6 +165,7 @@
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
-endif.
@@ -218,6 +235,10 @@ teardown_profiling(Value) ->
fprof:analyse([{dest, []}, {cols, 100}])
end.
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
+ ok.
+
server_properties() ->
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -262,9 +283,11 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
client_properties = none,
protocol = none},
callback = uninitialized_callback,
+ recv_length = 0,
recv_ref = none,
connection_state = pre_init,
queue_collector = Collector,
+ heartbeater = none,
stats_timer =
rabbit_event:init_stats_timer()},
handshake, 8))
@@ -308,6 +331,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end;
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
+ {conserve_memory, Conserve} ->
+ mainloop(Parent, Deb, internal_conserve_memory(Conserve, State));
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -329,7 +354,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
terminate_connection ->
State;
handshake_timeout ->
- if State#v1.connection_state =:= running orelse
+ if ?IS_RUNNING(State) orelse
State#v1.connection_state =:= closing orelse
State#v1.connection_state =:= closed ->
mainloop(Parent, Deb, State);
@@ -367,19 +392,36 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit({unexpected_message, Other})
end.
-switch_callback(OldState, NewCallback, Length) ->
+switch_callback(State = #v1{connection_state = blocked,
+ heartbeater = Heartbeater}, Callback, Length) ->
+ ok = rabbit_heartbeat:pause_monitor(Heartbeater),
+ State#v1{callback = Callback, recv_length = Length, recv_ref = none};
+switch_callback(State, Callback, Length) ->
Ref = inet_op(fun () -> rabbit_net:async_recv(
- OldState#v1.sock, Length, infinity) end),
- OldState#v1{callback = NewCallback,
- recv_ref = Ref}.
+ State#v1.sock, Length, infinity) end),
+ State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}.
-terminate(Explanation, State = #v1{connection_state = running}) ->
+terminate(Explanation, State) when ?IS_RUNNING(State) ->
{normal, send_exception(State, 0,
rabbit_misc:amqp_error(
connection_forced, Explanation, [], none))};
terminate(_Explanation, State) ->
{force, State}.
+internal_conserve_memory(true, State = #v1{connection_state = running}) ->
+ State#v1{connection_state = blocking};
+internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
+ State#v1{connection_state = running};
+internal_conserve_memory(false, State = #v1{connection_state = blocked,
+ heartbeater = Heartbeater,
+ callback = Callback,
+ recv_length = Length,
+ recv_ref = none}) ->
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ switch_callback(State#v1{connection_state = running}, Callback, Length);
+internal_conserve_memory(_Conserve, State) ->
+ State.
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -505,13 +547,20 @@ handle_frame(Type, Channel, Payload,
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{chpid, ChPid} ->
+ ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
case AnalyzedFrame of
{method, 'channel.close', _} ->
- erase({channel, Channel});
- _ -> ok
- end,
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
- State;
+ erase({channel, Channel}),
+ State;
+ {method, MethodName, _} ->
+ case (State#v1.connection_state == blocking andalso
+ Protocol:method_has_content(MethodName)) of
+ true -> State#v1{connection_state = blocked};
+ false -> State
+ end;
+ _ ->
+ State
+ end;
closing ->
%% According to the spec, after sending a
%% channel.close we must ignore all frames except
@@ -531,12 +580,13 @@ handle_frame(Type, Channel, Payload,
end,
State;
undefined ->
- case State#v1.connection_state of
- running -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
- Other -> throw({channel_frame_while_starting,
- Channel, Other, AnalyzedFrame})
+ case ?IS_RUNNING(State) of
+ true -> ok = send_to_new_channel(
+ Channel, AnalyzedFrame, State),
+ State;
+ false -> throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state,
+ AnalyzedFrame})
end
end
end.
@@ -649,13 +699,14 @@ handle_method0(MethodName, FieldsBin,
Reason#amqp_error{method = MethodName};
OtherReason -> OtherReason
end,
- case State#v1.connection_state of
- running -> send_exception(State, 0, CompleteReason);
+ case ?IS_RUNNING(State) of
+ true -> send_exception(State, 0, CompleteReason);
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
- Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, Other, CompleteReason})
+ false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, State#v1.connection_state,
+ CompleteReason})
end
end.
@@ -689,11 +740,13 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ Heartbeater = rabbit_heartbeat:start_heartbeat(
+ Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}}
+ frame_max = FrameMax},
+ heartbeater = Heartbeater}
end;
handle_method0(#'connection.open'{virtual_host = VHostPath},
@@ -706,14 +759,14 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
State1 = State#v1{connection_state = running,
connection = NewConnection},
rabbit_event:notify(
connection_created,
[{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
State1;
-handle_method0(#'connection.close'{},
- State = #v1{connection_state = running}) ->
+handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 97960571..6812b8d4 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -67,7 +67,6 @@ all_tests() ->
passed = test_log_management(),
passed = test_app_management(),
passed = test_log_management_during_startup(),
- passed = test_memory_pressure(),
passed = test_statistics(),
passed = test_option_parser(),
passed = test_cluster_management(),
@@ -1066,44 +1065,6 @@ test_hooks() ->
end,
passed.
-test_memory_pressure_receiver(Pid) ->
- receive
- shutdown ->
- ok;
- {send_command, Method} ->
- ok = case Method of
- #'channel.flow'{} -> ok;
- #'basic.qos_ok'{} -> ok;
- #'channel.open_ok'{} -> ok
- end,
- Pid ! Method,
- test_memory_pressure_receiver(Pid);
- sync ->
- Pid ! sync,
- test_memory_pressure_receiver(Pid)
- end.
-
-test_memory_pressure_receive_flow(Active) ->
- receive #'channel.flow'{active = Active} -> ok
- after 1000 -> throw(failed_to_receive_channel_flow)
- end,
- receive #'channel.flow'{} ->
- throw(pipelining_sync_commands_detected)
- after 0 ->
- ok
- end.
-
-test_memory_pressure_sync(Ch, Writer) ->
- ok = rabbit_channel:do(Ch, #'basic.qos'{}),
- Writer ! sync,
- receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
- receive #'basic.qos_ok'{} -> ok
- after 1000 -> throw(failed_to_receive_basic_qos_ok)
- end.
-
-test_memory_pressure_spawn() ->
- test_spawn(fun test_memory_pressure_receiver/1).
-
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
@@ -1116,91 +1077,6 @@ test_spawn(Receiver) ->
end,
{Writer, Ch, MRef}.
-expect_normal_channel_termination(MRef, Ch) ->
- receive {'DOWN', MRef, process, Ch, normal} -> ok
- after 1000 -> throw(channel_failed_to_exit)
- end.
-
-gobble_channel_exit() ->
- receive {channel_exit, _, _} -> ok
- after 1000 -> throw(channel_exit_not_received)
- end.
-
-test_memory_pressure() ->
- {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(),
- [ok = rabbit_channel:conserve_memory(Ch0, Conserve) ||
- Conserve <- [false, false, true, false, true, true, false]],
- ok = test_memory_pressure_sync(Ch0, Writer0),
- receive {'DOWN', MRef0, process, Ch0, Info0} ->
- throw({channel_died_early, Info0})
- after 0 -> ok
- end,
-
- %% we should have just 1 active=false waiting for us
- ok = test_memory_pressure_receive_flow(false),
-
- %% if we reply with flow_ok, we should immediately get an
- %% active=true back
- ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}),
- ok = test_memory_pressure_receive_flow(true),
-
- %% if we publish at this point, the channel should die
- Content = rabbit_basic:build_content(#'P_basic'{}, <<>>),
- ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
- expect_normal_channel_termination(MRef0, Ch0),
- gobble_channel_exit(),
-
- {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(),
- ok = rabbit_channel:conserve_memory(Ch1, true),
- ok = test_memory_pressure_receive_flow(false),
- ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
- ok = test_memory_pressure_sync(Ch1, Writer1),
- ok = rabbit_channel:conserve_memory(Ch1, false),
- ok = test_memory_pressure_receive_flow(true),
- %% send back the wrong flow_ok. Channel should die.
- ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
- expect_normal_channel_termination(MRef1, Ch1),
- gobble_channel_exit(),
-
- {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(),
- %% just out of the blue, send a flow_ok. Life should end.
- ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
- expect_normal_channel_termination(MRef2, Ch2),
- gobble_channel_exit(),
-
- {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(),
- ok = rabbit_channel:conserve_memory(Ch3, true),
- ok = test_memory_pressure_receive_flow(false),
- receive {'DOWN', MRef3, process, Ch3, _} ->
- ok
- after 12000 ->
- throw(channel_failed_to_exit)
- end,
- gobble_channel_exit(),
-
- alarm_handler:set_alarm({vm_memory_high_watermark, []}),
- Me = self(),
- Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4,
- <<"user">>, <<"/">>, self()),
- ok = rabbit_channel:do(Ch4, #'channel.open'{}),
- MRef4 = erlang:monitor(process, Ch4),
- Writer4 ! sync,
- receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
- receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok)
- after 0 -> ok
- end,
- alarm_handler:clear_alarm(vm_memory_high_watermark),
- Writer4 ! sync,
- receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
- receive #'channel.open_ok'{} -> ok
- after 1000 -> throw(failed_to_receive_channel_open_ok)
- end,
- rabbit_channel:shutdown(Ch4),
- expect_normal_channel_termination(MRef4, Ch4),
-
- passed.
-
test_statistics_receiver(Pid) ->
receive
shutdown ->