diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-06-02 17:50:09 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-06-02 17:50:09 +0100 |
commit | cdd4762425baa070eb0caf07b5876862d19fe0a7 (patch) | |
tree | d5d2bf5df933a918e72c1e7f7ae4f9e64c1c6a68 | |
parent | 7069f77efe8b6810e084bc8455f217c7d1088750 (diff) | |
parent | 3f23842bba1f0c7431b829b0934399bb51f9aab9 (diff) | |
download | rabbitmq-server-cdd4762425baa070eb0caf07b5876862d19fe0a7.tar.gz |
merge bug21932 into default
-rw-r--r-- | src/rabbit_alarm.erl | 11 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 115 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 128 |
3 files changed, 224 insertions, 30 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 7e96d9a3..53c713e6 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -47,7 +47,7 @@ -type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). +-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). -endif. @@ -67,9 +67,9 @@ stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). register(Pid, HighMemMFA) -> - ok = gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}, - infinity). + gen_event:call(alarm_handler, ?MODULE, + {register, Pid, HighMemMFA}, + infinity). %%---------------------------------------------------------------------------- @@ -84,7 +84,8 @@ handle_call({register, Pid, {M, F, A} = HighMemMFA}, false -> ok end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), - {ok, ok, State#alarms{alertees = NewAlertees}}; + {ok, State#alarms.vm_memory_high_watermark, + State#alarms{alertees = NewAlertees}}; handle_call(_Request, State) -> {ok, not_understood, State}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 31bb54c0..8bc53b4a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -39,6 +39,8 @@ -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). +-export([flow_timeout/2]). + -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -46,9 +48,12 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid}). + consumer_mapping, blocking, queue_collector_pid, flow}). + +-record(flow, {server, client, pending}). -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds -define(INFO_KEYS, [pid, @@ -66,6 +71,8 @@ -ifdef(use_specs). +-type(ref() :: any()). + -spec(start_link/6 :: (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). @@ -75,6 +82,7 @@ -spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). +-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). @@ -113,6 +121,9 @@ conserve_memory(Pid, Conserve) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). +flow_timeout(Pid, Ref) -> + gen_server2:pcast(Pid, 7, {flow_timeout, Ref}). + list() -> pg_local:get_members(rabbit_channels). @@ -154,7 +165,9 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = dict:new(), - queue_collector_pid = CollectorPid}, + queue_collector_pid = CollectorPid, + flow = #flow{server = true, client = true, + pending = none}}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -181,11 +194,9 @@ handle_cast({method, Method, Content}, State) -> {stop, normal, State#ch{state = terminating}} catch exit:Reason = #amqp_error{} -> - ok = rollback_and_notify(State), MethodName = rabbit_misc:method_record_type(Method), - State#ch.reader_pid ! {channel_exit, State#ch.channel, - Reason#amqp_error{method = MethodName}}, - {stop, normal, State#ch{state = terminating}}; + {stop, normal, terminating(Reason#amqp_error{method = MethodName}, + State)}; exit:normal -> {stop, normal, State}; _:Reason -> @@ -209,11 +220,25 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast({conserve_memory, Conserve}, State) -> - ok = clear_permission_cache(), - ok = rabbit_writer:send_command( - State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - noreply(State). +handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> + noreply(State); +handle_cast({conserve_memory, false}, State = #ch{state = starting}) -> + ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}), + noreply(State#ch{state = running}); +handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) -> + flow_control(not Conserve, State); +handle_cast({conserve_memory, _Conserve}, State) -> + noreply(State); + +handle_cast({flow_timeout, Ref}, + State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> + {stop, normal, terminating( + rabbit_misc:amqp_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", + [not Flow], none), State)}; +handle_cast({flow_timeout, _Ref}, State) -> + {noreply, State}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -254,6 +279,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. +terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> + ok = rollback_and_notify(State), + Reader ! {channel_exit, Channel, Reason}, + State#ch{state = terminating}. + return_queue_declare_ok(State, NoWait, Q) -> NewState = State#ch{most_recently_declared_queue = (Q#amqqueue.name)#resource.name}, @@ -369,8 +399,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - {reply, #'channel.open_ok'{}, State#ch{state = running}}; + case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of + true -> {noreply, State}; + false -> {reply, #'channel.open_ok'{}, State#ch{state = running}} + end; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -387,13 +419,17 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{exchange = ExchangeNameBin, +handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) -> + rabbit_misc:protocol_error( + command_invalid, + "basic.publish received after channel.flow_ok{active=false}", []); +handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, - mandatory = Mandatory, - immediate = Immediate}, - Content, State = #ch{ virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + mandatory = Mandatory, + immediate = Immediate}, + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -822,7 +858,6 @@ handle_method(#'channel.flow'{active = true}, _, end, {reply, #'channel.flow_ok'{active = true}, State#ch{limiter_pid = LimiterPid1}}; - handle_method(#'channel.flow'{active = false}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> @@ -840,11 +875,25 @@ handle_method(#'channel.flow'{active = false}, _, blocking = dict:from_list(Queues)}} end; -handle_method(#'channel.flow_ok'{active = _}, _, State) -> - %% TODO: We may want to correlate this to channel.flow messages we - %% have sent, and complain if we get an unsolicited - %% channel.flow_ok, or the client refuses our flow request. - {noreply, State}; +handle_method(#'channel.flow_ok'{active = Active}, _, + State = #ch{flow = #flow{server = Active, client = Flow, + pending = {_Ref, TRef}} = F}) + when Flow =:= not Active -> + {ok, cancel} = timer:cancel(TRef), + {noreply, State#ch{flow = F#flow{client = Active, pending = none}}}; +handle_method(#'channel.flow_ok'{active = Active}, _, + State = #ch{flow = #flow{server = Flow, client = Flow, + pending = {_Ref, TRef}}}) + when Flow =:= not Active -> + {ok, cancel} = timer:cancel(TRef), + {noreply, issue_flow(Flow, State)}; +handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) -> + rabbit_misc:protocol_error( + command_invalid, "unsolicited channel.flow_ok", []); +handle_method(#'channel.flow_ok'{active = Active}, _, _State) -> + rabbit_misc:protocol_error( + command_invalid, + "received channel.flow_ok{active=~w} has incorrect polarity", [Active]); handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -852,6 +901,22 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}}) + when Flow =:= not Active -> + ok = clear_permission_cache(), + noreply(issue_flow(Active, State)); +flow_control(Active, State = #ch{flow = F}) -> + noreply(State#ch{flow = F#flow{server = Active}}). + +issue_flow(Active, State) -> + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = Active}), + Ref = make_ref(), + {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, + [self(), Ref]), + State#ch{flow = #flow{server = Active, client = not Active, + pending = {Ref, TRef}}}. + binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> %% FIXME: connection exception (!) on failure?? diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index dc7f92d1..ecc2613d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,6 +41,7 @@ -import(lists). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). test_content_prop_roundtrip(Datum, Binary) -> @@ -58,6 +59,7 @@ all_tests() -> passed = test_log_management(), passed = test_app_management(), passed = test_log_management_during_startup(), + passed = test_memory_pressure(), passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), @@ -839,6 +841,132 @@ test_hooks() -> end, passed. +test_memory_pressure_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + ok = case Method of + #'channel.flow'{} -> ok; + #'basic.qos_ok'{} -> ok; + #'channel.open_ok'{} -> ok + end, + Pid ! Method, + test_memory_pressure_receiver(Pid); + sync -> + Pid ! sync, + test_memory_pressure_receiver(Pid) + end. + +test_memory_pressure_receive_flow(Active) -> + receive #'channel.flow'{active = Active} -> ok + after 1000 -> throw(failed_to_receive_channel_flow) + end, + receive #'channel.flow'{} -> + throw(pipelining_sync_commands_detected) + after 0 -> + ok + end. + +test_memory_pressure_sync(Ch, Writer) -> + ok = rabbit_channel:do(Ch, #'basic.qos'{}), + Writer ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'basic.qos_ok'{} -> ok + after 1000 -> throw(failed_to_receive_basic_qos_ok) + end. + +test_memory_pressure_spawn() -> + Me = self(), + Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), + ok = rabbit_channel:do(Ch, #'channel.open'{}), + MRef = erlang:monitor(process, Ch), + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + {Writer, Ch, MRef}. + +expect_normal_channel_termination(MRef, Ch) -> + receive {'DOWN', MRef, process, Ch, normal} -> ok + after 1000 -> throw(channel_failed_to_exit) + end. + +test_memory_pressure() -> + {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(), + [ok = rabbit_channel:conserve_memory(Ch0, Conserve) || + Conserve <- [false, false, true, false, true, true, false]], + ok = test_memory_pressure_sync(Ch0, Writer0), + receive {'DOWN', MRef0, process, Ch0, Info0} -> + throw({channel_died_early, Info0}) + after 0 -> ok + end, + + %% we should have just 1 active=false waiting for us + ok = test_memory_pressure_receive_flow(false), + + %% if we reply with flow_ok, we should immediately get an + %% active=true back + ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_receive_flow(true), + + %% if we publish at this point, the channel should die + Content = #content{class_id = element(1, rabbit_framing:method_id( + 'basic.publish')), + properties = none, + properties_bin = <<>>, + payload_fragments_rev = []}, + ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), + expect_normal_channel_termination(MRef0, Ch0), + + {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch1, true), + ok = test_memory_pressure_receive_flow(false), + ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_sync(Ch1, Writer1), + ok = rabbit_channel:conserve_memory(Ch1, false), + ok = test_memory_pressure_receive_flow(true), + %% send back the wrong flow_ok. Channel should die. + ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), + expect_normal_channel_termination(MRef1, Ch1), + + {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(), + %% just out of the blue, send a flow_ok. Life should end. + ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), + expect_normal_channel_termination(MRef2, Ch2), + + {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch3, true), + receive {'DOWN', MRef3, process, Ch3, _} -> + ok + after 12000 -> + throw(channel_failed_to_exit) + end, + + alarm_handler:set_alarm({vm_memory_high_watermark, []}), + Me = self(), + Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), + Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, + self()), + ok = rabbit_channel:do(Ch4, #'channel.open'{}), + MRef4 = erlang:monitor(process, Ch4), + Writer4 ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) + after 0 -> ok + end, + alarm_handler:clear_alarm(vm_memory_high_watermark), + Writer4 ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + rabbit_channel:shutdown(Ch4), + expect_normal_channel_termination(MRef4, Ch4), + + passed. + test_delegates_async(SecondaryNode) -> Self = self(), Sender = fun (Pid) -> Pid ! {invoked, Self} end, |