diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-05 15:56:46 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-05 15:56:46 +0100 |
commit | 145a1db8d13bdae5d1b6ab5747f7d8862593ffb9 (patch) | |
tree | d4ed9b42d7f456ef8cb206a9c714515f23a6cc8c | |
parent | ad6fde4fd7a44a44dda489f07248c0ba35ead067 (diff) | |
parent | dc81d515fa8843b142fcd1d6f3731bfa6930cfcc (diff) | |
download | rabbitmq-server-145a1db8d13bdae5d1b6ab5747f7d8862593ffb9.tar.gz |
merge default into bug23027
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 86 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 34 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 109 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 124 |
5 files changed, 122 insertions, 239 deletions
@@ -179,6 +179,14 @@ stop-rabbit-on-node: all force-snapshot: all echo "rabbit_persister:force_snapshot()." | $(ERL_CALL) +set-memory-alarm: all + echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \ + $(ERL_CALL) + +clear-memory-alarm: all + echo "alarm_handler:clear_alarm(vm_memory_high_watermark)." | \ + $(ERL_CALL) + stop-node: -$(ERL_CALL) -q diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6ee6e209..edbcb2ea 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,12 +36,10 @@ -behaviour(gen_server2). -export([start_link/6, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). +-export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1, flush/1]). --export([flow_timeout/2]). - -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -49,13 +47,9 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, flow, - stats_timer}). - --record(flow, {server, client, pending}). + consumer_mapping, blocking, queue_collector_pid, stats_timer}). -define(MAX_PERMISSION_CACHE_SIZE, 12). --define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds -define(STATISTICS_KEYS, [pid, @@ -80,7 +74,6 @@ -export_type([channel_number/0]). --type(ref() :: any()). -type(channel_number() :: non_neg_integer()). -spec(start_link/6 :: @@ -94,9 +87,7 @@ -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(flow_timeout/2 :: (pid(), ref()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). @@ -128,15 +119,9 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). -conserve_memory(Pid, Conserve) -> - gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}). - flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -flow_timeout(Pid, Ref) -> - gen_server2:pcast(Pid, 7, {flow_timeout, Ref}). - list() -> pg_local:get_members(rabbit_channels). @@ -185,8 +170,6 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> consumer_mapping = dict:new(), blocking = dict:new(), queue_collector_pid = CollectorPid, - flow = #flow{server = true, client = true, - pending = none}, stats_timer = rabbit_event:init_stats_timer()}, rabbit_event:notify( channel_created, @@ -252,26 +235,6 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, end, State), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> - noreply(State); -handle_cast({conserve_memory, false}, State = #ch{state = starting}) -> - ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}), - noreply(State#ch{state = running}); -handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) -> - flow_control(not Conserve, State); -handle_cast({conserve_memory, _Conserve}, State) -> - noreply(State); - -handle_cast({flow_timeout, Ref}, - State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> - {stop, normal, terminating( - rabbit_misc:amqp_error( - precondition_failed, - "timeout waiting for channel.flow_ok{active=~w}", - [not Flow], none), State)}; -handle_cast({flow_timeout, _Ref}, State) -> - {noreply, State}; - handle_cast(emit_stats, State) -> internal_emit_stats(State), {noreply, State}. @@ -429,10 +392,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of - true -> {noreply, State}; - false -> {reply, #'channel.open_ok'{}, State#ch{state = running}} - end; + {reply, #'channel.open_ok'{}, State#ch{state = running}}; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -449,10 +409,6 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) -> - rabbit_misc:protocol_error( - command_invalid, - "basic.publish received after channel.flow_ok{active=false}", []); handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, @@ -919,48 +875,12 @@ handle_method(#'channel.flow'{active = false}, _, blocking = dict:from_list(Queues)}} end; -handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{flow = #flow{server = Active, client = Flow, - pending = {_Ref, TRef}} = F}) - when Flow =:= not Active -> - {ok, cancel} = timer:cancel(TRef), - {noreply, State#ch{flow = F#flow{client = Active, pending = none}}}; -handle_method(#'channel.flow_ok'{active = Active}, _, - State = #ch{flow = #flow{server = Flow, client = Flow, - pending = {_Ref, TRef}}}) - when Flow =:= not Active -> - {ok, cancel} = timer:cancel(TRef), - {noreply, issue_flow(Flow, State)}; -handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) -> - rabbit_misc:protocol_error( - command_invalid, "unsolicited channel.flow_ok", []); -handle_method(#'channel.flow_ok'{active = Active}, _, _State) -> - rabbit_misc:protocol_error( - command_invalid, - "received channel.flow_ok{active=~w} has incorrect polarity", [Active]); - handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). %%---------------------------------------------------------------------------- -flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}}) - when Flow =:= not Active -> - ok = clear_permission_cache(), - noreply(issue_flow(Active, State)); -flow_control(Active, State = #ch{flow = F}) -> - noreply(State#ch{flow = F#flow{server = Active}}). - -issue_flow(Active, State) -> - ok = rabbit_writer:send_command( - State#ch.writer_pid, #'channel.flow'{active = Active}), - Ref = make_ref(), - {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, - [self(), Ref]), - State#ch{flow = #flow{server = Active, client = not Active, - pending = {Ref, TRef}}}. - binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 1989fb7b..faddffc1 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,14 +31,17 @@ -module(rabbit_heartbeat). --export([start_heartbeat/2]). +-export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> - rabbit_types:maybe({pid(), pid()})). +-type(pids() :: rabbit_types:maybe({pid(), pid()})). + +-spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> pids()). +-spec(pause_monitor/1 :: (pids()) -> 'ok'). +-spec(resume_monitor/1 :: (pids()) -> 'ok'). -endif. @@ -70,20 +73,43 @@ start_heartbeat(Sock, TimeoutSec) -> end}, Parent) end), {Sender, Receiver}. +pause_monitor(none) -> + ok; +pause_monitor({_Sender, Receiver}) -> + Receiver ! pause, + ok. + +resume_monitor(none) -> + ok; +resume_monitor({_Sender, Receiver}) -> + Receiver ! resume, + ok. + +%%---------------------------------------------------------------------------- + heartbeater(Params, Parent) -> heartbeater(Params, erlang:monitor(process, Parent), {0, 0}). heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, MonitorRef, {StatVal, SameCount}) -> + Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, receive {'DOWN', MonitorRef, process, _Object, _Info} -> ok; + pause -> + receive + {'DOWN', MonitorRef, process, _Object, _Info} -> + ok; + resume -> + Recurse({0, 0}); + Other -> + exit({unexpected_message, Other}) + end; Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> - Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, if NewStatVal =/= StatVal -> Recurse({NewStatVal, 0}); SameCount < Threshold -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4b612f2a..57c23990 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -39,7 +39,7 @@ -export([init/1, mainloop/3]). --export([server_properties/0]). +-export([conserve_memory/2, server_properties/0]). -export([analyze_frame/3]). @@ -59,8 +59,8 @@ %--------------------------------------------------------------------------- --record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector, stats_timer}). +-record(v1, {sock, connection, callback, recv_length, recv_ref, + connection_state, queue_collector, heartbeater, stats_timer}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -107,6 +107,17 @@ %% -> log error, mark channel as closing, *running* %% handshake_timeout -> ignore, *running* %% heartbeat timeout -> *throw* +%% conserve_memory=true -> *blocking* +%% blocking: +%% conserve_memory=true -> *blocking* +%% conserve_memory=false -> *running* +%% receive a method frame for a content-bearing method +%% -> process, stop receiving, *blocked* +%% ...rest same as 'running' +%% blocked: +%% conserve_memory=true -> *blocked* +%% conserve_memory=false -> resume receiving, *running* +%% ...rest same as 'running' %% closing: %% socket close -> *terminate* %% receive connection.close -> send connection.close_ok, @@ -140,6 +151,11 @@ %% %% TODO: refactor the code so that the above is obvious +-define(IS_RUNNING(State), + (State#v1.connection_state =:= running orelse + State#v1.connection_state =:= blocking orelse + State#v1.connection_state =:= blocked)). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -149,6 +165,7 @@ -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). -endif. @@ -218,6 +235,10 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +conserve_memory(Pid, Conserve) -> + Pid ! {conserve_memory, Conserve}, + ok. + server_properties() -> {ok, Product} = application:get_key(rabbit, id), {ok, Version} = application:get_key(rabbit, vsn), @@ -262,9 +283,11 @@ start_connection(Parent, Deb, Sock, SockTransform) -> client_properties = none, protocol = none}, callback = uninitialized_callback, + recv_length = 0, recv_ref = none, connection_state = pre_init, queue_collector = Collector, + heartbeater = none, stats_timer = rabbit_event:init_stats_timer()}, handshake, 8)) @@ -308,6 +331,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end; {inet_async, Sock, Ref, {error, Reason}} -> throw({inet_error, Reason}); + {conserve_memory, Conserve} -> + mainloop(Parent, Deb, internal_conserve_memory(Conserve, State)); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -329,7 +354,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> terminate_connection -> State; handshake_timeout -> - if State#v1.connection_state =:= running orelse + if ?IS_RUNNING(State) orelse State#v1.connection_state =:= closing orelse State#v1.connection_state =:= closed -> mainloop(Parent, Deb, State); @@ -367,19 +392,36 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit({unexpected_message, Other}) end. -switch_callback(OldState, NewCallback, Length) -> +switch_callback(State = #v1{connection_state = blocked, + heartbeater = Heartbeater}, Callback, Length) -> + ok = rabbit_heartbeat:pause_monitor(Heartbeater), + State#v1{callback = Callback, recv_length = Length, recv_ref = none}; +switch_callback(State, Callback, Length) -> Ref = inet_op(fun () -> rabbit_net:async_recv( - OldState#v1.sock, Length, infinity) end), - OldState#v1{callback = NewCallback, - recv_ref = Ref}. + State#v1.sock, Length, infinity) end), + State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}. -terminate(Explanation, State = #v1{connection_state = running}) -> +terminate(Explanation, State) when ?IS_RUNNING(State) -> {normal, send_exception(State, 0, rabbit_misc:amqp_error( connection_forced, Explanation, [], none))}; terminate(_Explanation, State) -> {force, State}. +internal_conserve_memory(true, State = #v1{connection_state = running}) -> + State#v1{connection_state = blocking}; +internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> + State#v1{connection_state = running}; +internal_conserve_memory(false, State = #v1{connection_state = blocked, + heartbeater = Heartbeater, + callback = Callback, + recv_length = Length, + recv_ref = none}) -> + ok = rabbit_heartbeat:resume_monitor(Heartbeater), + switch_callback(State#v1{connection_state = running}, Callback, Length); +internal_conserve_memory(_Conserve, State) -> + State. + close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -505,13 +547,20 @@ handle_frame(Type, Channel, Payload, %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {chpid, ChPid} -> + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), case AnalyzedFrame of {method, 'channel.close', _} -> - erase({channel, Channel}); - _ -> ok - end, - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), - State; + erase({channel, Channel}), + State; + {method, MethodName, _} -> + case (State#v1.connection_state == blocking andalso + Protocol:method_has_content(MethodName)) of + true -> State#v1{connection_state = blocked}; + false -> State + end; + _ -> + State + end; closing -> %% According to the spec, after sending a %% channel.close we must ignore all frames except @@ -531,12 +580,13 @@ handle_frame(Type, Channel, Payload, end, State; undefined -> - case State#v1.connection_state of - running -> ok = send_to_new_channel( - Channel, AnalyzedFrame, State), - State; - Other -> throw({channel_frame_while_starting, - Channel, Other, AnalyzedFrame}) + case ?IS_RUNNING(State) of + true -> ok = send_to_new_channel( + Channel, AnalyzedFrame, State), + State; + false -> throw({channel_frame_while_starting, + Channel, State#v1.connection_state, + AnalyzedFrame}) end end end. @@ -649,13 +699,14 @@ handle_method0(MethodName, FieldsBin, Reason#amqp_error{method = MethodName}; OtherReason -> OtherReason end, - case State#v1.connection_state of - running -> send_exception(State, 0, CompleteReason); + case ?IS_RUNNING(State) of + true -> send_exception(State, 0, CompleteReason); %% We don't trust the client at this point - force %% them to wait for a bit so they can't DOS us with %% repeated failed logins etc. - Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, Other, CompleteReason}) + false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({channel0_error, State#v1.connection_state, + CompleteReason}) end end. @@ -689,11 +740,13 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), + Heartbeater = rabbit_heartbeat:start_heartbeat( + Sock, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, - frame_max = FrameMax}} + frame_max = FrameMax}, + heartbeater = Heartbeater} end; handle_method0(#'connection.open'{virtual_host = VHostPath}, @@ -706,14 +759,14 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), State1 = State#v1{connection_state = running, connection = NewConnection}, rabbit_event:notify( connection_created, [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]), State1; -handle_method0(#'connection.close'{}, - State = #v1{connection_state = running}) -> +handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 97960571..6812b8d4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -67,7 +67,6 @@ all_tests() -> passed = test_log_management(), passed = test_app_management(), passed = test_log_management_during_startup(), - passed = test_memory_pressure(), passed = test_statistics(), passed = test_option_parser(), passed = test_cluster_management(), @@ -1066,44 +1065,6 @@ test_hooks() -> end, passed. -test_memory_pressure_receiver(Pid) -> - receive - shutdown -> - ok; - {send_command, Method} -> - ok = case Method of - #'channel.flow'{} -> ok; - #'basic.qos_ok'{} -> ok; - #'channel.open_ok'{} -> ok - end, - Pid ! Method, - test_memory_pressure_receiver(Pid); - sync -> - Pid ! sync, - test_memory_pressure_receiver(Pid) - end. - -test_memory_pressure_receive_flow(Active) -> - receive #'channel.flow'{active = Active} -> ok - after 1000 -> throw(failed_to_receive_channel_flow) - end, - receive #'channel.flow'{} -> - throw(pipelining_sync_commands_detected) - after 0 -> - ok - end. - -test_memory_pressure_sync(Ch, Writer) -> - ok = rabbit_channel:do(Ch, #'basic.qos'{}), - Writer ! sync, - receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, - receive #'basic.qos_ok'{} -> ok - after 1000 -> throw(failed_to_receive_basic_qos_ok) - end. - -test_memory_pressure_spawn() -> - test_spawn(fun test_memory_pressure_receiver/1). - test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), @@ -1116,91 +1077,6 @@ test_spawn(Receiver) -> end, {Writer, Ch, MRef}. -expect_normal_channel_termination(MRef, Ch) -> - receive {'DOWN', MRef, process, Ch, normal} -> ok - after 1000 -> throw(channel_failed_to_exit) - end. - -gobble_channel_exit() -> - receive {channel_exit, _, _} -> ok - after 1000 -> throw(channel_exit_not_received) - end. - -test_memory_pressure() -> - {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(), - [ok = rabbit_channel:conserve_memory(Ch0, Conserve) || - Conserve <- [false, false, true, false, true, true, false]], - ok = test_memory_pressure_sync(Ch0, Writer0), - receive {'DOWN', MRef0, process, Ch0, Info0} -> - throw({channel_died_early, Info0}) - after 0 -> ok - end, - - %% we should have just 1 active=false waiting for us - ok = test_memory_pressure_receive_flow(false), - - %% if we reply with flow_ok, we should immediately get an - %% active=true back - ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}), - ok = test_memory_pressure_receive_flow(true), - - %% if we publish at this point, the channel should die - Content = rabbit_basic:build_content(#'P_basic'{}, <<>>), - ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), - expect_normal_channel_termination(MRef0, Ch0), - gobble_channel_exit(), - - {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(), - ok = rabbit_channel:conserve_memory(Ch1, true), - ok = test_memory_pressure_receive_flow(false), - ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), - ok = test_memory_pressure_sync(Ch1, Writer1), - ok = rabbit_channel:conserve_memory(Ch1, false), - ok = test_memory_pressure_receive_flow(true), - %% send back the wrong flow_ok. Channel should die. - ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), - expect_normal_channel_termination(MRef1, Ch1), - gobble_channel_exit(), - - {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(), - %% just out of the blue, send a flow_ok. Life should end. - ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), - expect_normal_channel_termination(MRef2, Ch2), - gobble_channel_exit(), - - {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(), - ok = rabbit_channel:conserve_memory(Ch3, true), - ok = test_memory_pressure_receive_flow(false), - receive {'DOWN', MRef3, process, Ch3, _} -> - ok - after 12000 -> - throw(channel_failed_to_exit) - end, - gobble_channel_exit(), - - alarm_handler:set_alarm({vm_memory_high_watermark, []}), - Me = self(), - Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), - {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4, - <<"user">>, <<"/">>, self()), - ok = rabbit_channel:do(Ch4, #'channel.open'{}), - MRef4 = erlang:monitor(process, Ch4), - Writer4 ! sync, - receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, - receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) - after 0 -> ok - end, - alarm_handler:clear_alarm(vm_memory_high_watermark), - Writer4 ! sync, - receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, - receive #'channel.open_ok'{} -> ok - after 1000 -> throw(failed_to_receive_channel_open_ok) - end, - rabbit_channel:shutdown(Ch4), - expect_normal_channel_termination(MRef4, Ch4), - - passed. - test_statistics_receiver(Pid) -> receive shutdown -> |