summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-06-02 17:50:09 +0100
committerMatthias Radestock <matthias@lshift.net>2010-06-02 17:50:09 +0100
commitcdd4762425baa070eb0caf07b5876862d19fe0a7 (patch)
treed5d2bf5df933a918e72c1e7f7ae4f9e64c1c6a68
parent7069f77efe8b6810e084bc8455f217c7d1088750 (diff)
parent3f23842bba1f0c7431b829b0934399bb51f9aab9 (diff)
downloadrabbitmq-server-cdd4762425baa070eb0caf07b5876862d19fe0a7.tar.gz
merge bug21932 into default
-rw-r--r--src/rabbit_alarm.erl11
-rw-r--r--src/rabbit_channel.erl115
-rw-r--r--src/rabbit_tests.erl128
3 files changed, 224 insertions, 30 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 7e96d9a3..53c713e6 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -47,7 +47,7 @@
-type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
+-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
-endif.
@@ -67,9 +67,9 @@ stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
register(Pid, HighMemMFA) ->
- ok = gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA},
- infinity).
+ gen_event:call(alarm_handler, ?MODULE,
+ {register, Pid, HighMemMFA},
+ infinity).
%%----------------------------------------------------------------------------
@@ -84,7 +84,8 @@ handle_call({register, Pid, {M, F, A} = HighMemMFA},
false -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
- {ok, ok, State#alarms{alertees = NewAlertees}};
+ {ok, State#alarms.vm_memory_high_watermark,
+ State#alarms{alertees = NewAlertees}};
handle_call(_Request, State) ->
{ok, not_understood, State}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 31bb54c0..8bc53b4a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -39,6 +39,8 @@
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
+-export([flow_timeout/2]).
+
-export([init/1, terminate/2, code_change/3,
handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
@@ -46,9 +48,12 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid}).
+ consumer_mapping, blocking, queue_collector_pid, flow}).
+
+-record(flow, {server, client, pending}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
-define(INFO_KEYS,
[pid,
@@ -66,6 +71,8 @@
-ifdef(use_specs).
+-type(ref() :: any()).
+
-spec(start_link/6 ::
(channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
@@ -75,6 +82,7 @@
-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
+-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
@@ -113,6 +121,9 @@ conserve_memory(Pid, Conserve) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
+flow_timeout(Pid, Ref) ->
+ gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -154,7 +165,9 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
- queue_collector_pid = CollectorPid},
+ queue_collector_pid = CollectorPid,
+ flow = #flow{server = true, client = true,
+ pending = none}},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -181,11 +194,9 @@ handle_cast({method, Method, Content}, State) ->
{stop, normal, State#ch{state = terminating}}
catch
exit:Reason = #amqp_error{} ->
- ok = rollback_and_notify(State),
MethodName = rabbit_misc:method_record_type(Method),
- State#ch.reader_pid ! {channel_exit, State#ch.channel,
- Reason#amqp_error{method = MethodName}},
- {stop, normal, State#ch{state = terminating}};
+ {stop, normal, terminating(Reason#amqp_error{method = MethodName},
+ State)};
exit:normal ->
{stop, normal, State};
_:Reason ->
@@ -209,11 +220,25 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, Conserve}, State) ->
- ok = clear_permission_cache(),
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- noreply(State).
+handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
+ noreply(State);
+handle_cast({conserve_memory, false}, State = #ch{state = starting}) ->
+ ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}),
+ noreply(State#ch{state = running});
+handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
+ flow_control(not Conserve, State);
+handle_cast({conserve_memory, _Conserve}, State) ->
+ noreply(State);
+
+handle_cast({flow_timeout, Ref},
+ State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
+ {stop, normal, terminating(
+ rabbit_misc:amqp_error(
+ precondition_failed,
+ "timeout waiting for channel.flow_ok{active=~w}",
+ [not Flow], none), State)};
+handle_cast({flow_timeout, _Ref}, State) ->
+ {noreply, State}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -254,6 +279,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
+terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
+ ok = rollback_and_notify(State),
+ Reader ! {channel_exit, Channel, Reason},
+ State#ch{state = terminating}.
+
return_queue_declare_ok(State, NoWait, Q) ->
NewState = State#ch{most_recently_declared_queue =
(Q#amqqueue.name)#resource.name},
@@ -369,8 +399,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
+ true -> {noreply, State};
+ false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
+ end;
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -387,13 +419,17 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{exchange = ExchangeNameBin,
+handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "basic.publish received after channel.flow_ok{active=false}", []);
+handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
- mandatory = Mandatory,
- immediate = Immediate},
- Content, State = #ch{ virtual_host = VHostPath,
- transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ mandatory = Mandatory,
+ immediate = Immediate},
+ Content, State = #ch{virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ writer_pid = WriterPid}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -822,7 +858,6 @@ handle_method(#'channel.flow'{active = true}, _,
end,
{reply, #'channel.flow_ok'{active = true},
State#ch{limiter_pid = LimiterPid1}};
-
handle_method(#'channel.flow'{active = false}, _,
State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->
@@ -840,11 +875,25 @@ handle_method(#'channel.flow'{active = false}, _,
blocking = dict:from_list(Queues)}}
end;
-handle_method(#'channel.flow_ok'{active = _}, _, State) ->
- %% TODO: We may want to correlate this to channel.flow messages we
- %% have sent, and complain if we get an unsolicited
- %% channel.flow_ok, or the client refuses our flow request.
- {noreply, State};
+handle_method(#'channel.flow_ok'{active = Active}, _,
+ State = #ch{flow = #flow{server = Active, client = Flow,
+ pending = {_Ref, TRef}} = F})
+ when Flow =:= not Active ->
+ {ok, cancel} = timer:cancel(TRef),
+ {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
+handle_method(#'channel.flow_ok'{active = Active}, _,
+ State = #ch{flow = #flow{server = Flow, client = Flow,
+ pending = {_Ref, TRef}}})
+ when Flow =:= not Active ->
+ {ok, cancel} = timer:cancel(TRef),
+ {noreply, issue_flow(Flow, State)};
+handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unsolicited channel.flow_ok", []);
+handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -852,6 +901,22 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
+ when Flow =:= not Active ->
+ ok = clear_permission_cache(),
+ noreply(issue_flow(Active, State));
+flow_control(Active, State = #ch{flow = F}) ->
+ noreply(State#ch{flow = F#flow{server = Active}}).
+
+issue_flow(Active, State) ->
+ ok = rabbit_writer:send_command(
+ State#ch.writer_pid, #'channel.flow'{active = Active}),
+ Ref = make_ref(),
+ {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
+ [self(), Ref]),
+ State#ch{flow = #flow{server = Active, client = not Active,
+ pending = {Ref, TRef}}}.
+
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
%% FIXME: connection exception (!) on failure??
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index dc7f92d1..ecc2613d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,6 +41,7 @@
-import(lists).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
test_content_prop_roundtrip(Datum, Binary) ->
@@ -58,6 +59,7 @@ all_tests() ->
passed = test_log_management(),
passed = test_app_management(),
passed = test_log_management_during_startup(),
+ passed = test_memory_pressure(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
@@ -839,6 +841,132 @@ test_hooks() ->
end,
passed.
+test_memory_pressure_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ ok = case Method of
+ #'channel.flow'{} -> ok;
+ #'basic.qos_ok'{} -> ok;
+ #'channel.open_ok'{} -> ok
+ end,
+ Pid ! Method,
+ test_memory_pressure_receiver(Pid);
+ sync ->
+ Pid ! sync,
+ test_memory_pressure_receiver(Pid)
+ end.
+
+test_memory_pressure_receive_flow(Active) ->
+ receive #'channel.flow'{active = Active} -> ok
+ after 1000 -> throw(failed_to_receive_channel_flow)
+ end,
+ receive #'channel.flow'{} ->
+ throw(pipelining_sync_commands_detected)
+ after 0 ->
+ ok
+ end.
+
+test_memory_pressure_sync(Ch, Writer) ->
+ ok = rabbit_channel:do(Ch, #'basic.qos'{}),
+ Writer ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'basic.qos_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_basic_qos_ok)
+ end.
+
+test_memory_pressure_spawn() ->
+ Me = self(),
+ Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
+ ok = rabbit_channel:do(Ch, #'channel.open'{}),
+ MRef = erlang:monitor(process, Ch),
+ receive #'channel.open_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_channel_open_ok)
+ end,
+ {Writer, Ch, MRef}.
+
+expect_normal_channel_termination(MRef, Ch) ->
+ receive {'DOWN', MRef, process, Ch, normal} -> ok
+ after 1000 -> throw(channel_failed_to_exit)
+ end.
+
+test_memory_pressure() ->
+ {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(),
+ [ok = rabbit_channel:conserve_memory(Ch0, Conserve) ||
+ Conserve <- [false, false, true, false, true, true, false]],
+ ok = test_memory_pressure_sync(Ch0, Writer0),
+ receive {'DOWN', MRef0, process, Ch0, Info0} ->
+ throw({channel_died_early, Info0})
+ after 0 -> ok
+ end,
+
+ %% we should have just 1 active=false waiting for us
+ ok = test_memory_pressure_receive_flow(false),
+
+ %% if we reply with flow_ok, we should immediately get an
+ %% active=true back
+ ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_receive_flow(true),
+
+ %% if we publish at this point, the channel should die
+ Content = #content{class_id = element(1, rabbit_framing:method_id(
+ 'basic.publish')),
+ properties = none,
+ properties_bin = <<>>,
+ payload_fragments_rev = []},
+ ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
+ expect_normal_channel_termination(MRef0, Ch0),
+
+ {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch1, true),
+ ok = test_memory_pressure_receive_flow(false),
+ ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_sync(Ch1, Writer1),
+ ok = rabbit_channel:conserve_memory(Ch1, false),
+ ok = test_memory_pressure_receive_flow(true),
+ %% send back the wrong flow_ok. Channel should die.
+ ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
+ expect_normal_channel_termination(MRef1, Ch1),
+
+ {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(),
+ %% just out of the blue, send a flow_ok. Life should end.
+ ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
+ expect_normal_channel_termination(MRef2, Ch2),
+
+ {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch3, true),
+ receive {'DOWN', MRef3, process, Ch3, _} ->
+ ok
+ after 12000 ->
+ throw(channel_failed_to_exit)
+ end,
+
+ alarm_handler:set_alarm({vm_memory_high_watermark, []}),
+ Me = self(),
+ Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
+ Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>,
+ self()),
+ ok = rabbit_channel:do(Ch4, #'channel.open'{}),
+ MRef4 = erlang:monitor(process, Ch4),
+ Writer4 ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok)
+ after 0 -> ok
+ end,
+ alarm_handler:clear_alarm(vm_memory_high_watermark),
+ Writer4 ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'channel.open_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_channel_open_ok)
+ end,
+ rabbit_channel:shutdown(Ch4),
+ expect_normal_channel_termination(MRef4, Ch4),
+
+ passed.
+
test_delegates_async(SecondaryNode) ->
Self = self(),
Sender = fun (Pid) -> Pid ! {invoked, Self} end,