summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-02-18 18:09:18 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-02-18 18:09:18 +0000
commit0c275d9b01f77e47baac84cac83bf02a883fa32b (patch)
tree6f1196e500bfccd368dd71a6ba21ca10c1c74c90
parent1f3f3b0c6c9d5aa181b3bb9306e383b22ea1d9f1 (diff)
parent2970f6af9cc43ce2b9b44a3b116cb1dfe6b51493 (diff)
downloadrabbitmq-server-0c275d9b01f77e47baac84cac83bf02a883fa32b.tar.gz
Merge in default
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_channel.erl54
-rw-r--r--src/rabbit_limiter.erl142
-rw-r--r--src/rabbit_misc.erl51
-rw-r--r--src/rabbit_tests.erl26
5 files changed, 242 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e794b4aa..053878e8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -336,7 +336,9 @@ ch_record_state_transition(OldCR, NewCR) ->
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
+ blocked_consumers = BlockedConsumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
@@ -346,7 +348,8 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
acktags = ChAckTags} = ch_record(ChPid),
IsMsgReady = PredFun(FunAcc, State),
case (IsMsgReady andalso
- rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
+ rabbit_limiter:can_send( LimiterPid, self(), AckRequired,
+ ConsumerTag, BQ:len(BQS) )) of
true ->
{{Message, IsDelivered, AckTag}, FunAcc1, State1} =
DeliverFun(AckRequired, FunAcc, State),
@@ -1078,7 +1081,8 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
true ->
ok
end,
- NewLimited = Limited andalso LimiterPid =/= undefined,
+ NewLimited = Limited andalso LimiterPid =/= undefined
+ andalso rabbit_limiter:is_blocked(LimiterPid),
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
end));
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 73c031de..7c57f568 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -469,6 +469,7 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
+%% TODO port this(?)
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
case dict:find(QPid, Blocking) of
error -> State;
@@ -1047,6 +1048,47 @@ handle_method(#'channel.flow'{active = false}, _,
{noreply, State1#ch{blocking = dict:from_list(Queues)}}
end;
+handle_method(#'basic.credit'{consumer_tag = CTag,
+ credit = Credit,
+ count = Count,
+ drain = Drain}, _,
+ State = #ch{limiter_pid = LimiterPid,
+ consumer_mapping = Consumers}) ->
+ %% We get Available first because it's likely that as soon as we set
+ %% the credit msgs will get consumed and it'll be out of date. Why do we
+ %% want that? Because at least then it's consistent with the credit value
+ %% we return. And Available is always going to be racy.
+ Available = case dict:find(CTag, Consumers) of
+ {ok, QName} ->
+ case rabbit_amqqueue:with(
+ QName, fun (Q) -> rabbit_amqqueue:stat(Q) end) of
+ {ok, Len, _} -> Len;
+ _ -> -1
+ end;
+ error -> -1
+ end,
+ LimiterPid1 = case LimiterPid of
+ undefined -> start_limiter(State);
+ Other -> Other
+ end,
+ LimiterPid2 =
+ case rabbit_limiter:set_credit(LimiterPid1, CTag, Credit, Count, Drain) of
+ ok -> limit_queues(LimiterPid1, State),
+ LimiterPid1;
+ stopped -> unlimit_queues(State)
+ end,
+ State1 = State#ch{limiter_pid = LimiterPid2},
+ return_ok(State1, false, #'basic.credit_ok'{available = Available});
+
+ %% TODO port this bit ?
+ %% case consumer_queues(Consumers) of
+ %% [] -> {reply, #'channel.flow_ok'{active = false}, State1};
+ %% QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
+ %% QPid <- QPids],
+ %% ok = rabbit_amqqueue:flush_all(QPids, self()),
+ %% {noreply, State1#ch{blocking = dict:from_list(Queues)}}
+ %% end;
+
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
@@ -1239,12 +1281,12 @@ consumer_queues(Consumers) ->
notify_limiter(undefined, _Acked) ->
ok;
notify_limiter(LimiterPid, Acked) ->
- case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, Acked) of
- 0 -> ok;
- Count -> rabbit_limiter:ack(LimiterPid, Count)
- end.
+ %% TODO this could be faster, group the acks
+ rabbit_misc:queue_fold(
+ fun ({_, none, _}, Acc) -> Acc;
+ ({_, CTag, _}, Acc) -> rabbit_limiter:ack(LimiterPid, CTag),
+ Acc
+ end, ok, Acked).
is_message_persistent(Content) ->
case rabbit_basic:is_message_persistent(Content) of
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 1b72dd76..bf4ca797 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -15,14 +15,17 @@
%%
-module(rabbit_limiter).
+-include("rabbit_framing.hrl").
-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
-export([start_link/2]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1, block/1, unblock/1, is_blocked/1]).
+-export([limit/2, can_send/5, ack/2, register/2, unregister/2]).
+-export([get_limit/1, block/1, unblock/1, set_credit/5, is_blocked/1]).
+
+-import(rabbit_misc, [serial_add/2, serial_diff/2]).
%%----------------------------------------------------------------------------
@@ -33,8 +36,8 @@
-spec(start_link/2 :: (pid(), non_neg_integer()) ->
rabbit_types:ok_pid_or_error()).
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
--spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
--spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(can_send/5 :: (maybe_pid(), pid(), boolean(), binary(), non_neg_integer()) -> boolean()).
+-spec(ack/2 :: (maybe_pid(), binary()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
@@ -49,8 +52,12 @@
-record(lim, {prefetch_count = 0,
ch_pid,
blocked = false,
+ credits = dict:new(),
queues = dict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
+
+-record(credit, {count = 0, credit = 0, drain = false}).
+
%% 'Notify' is a boolean that indicates whether a queue should be
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
@@ -69,18 +76,19 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid, _AckRequired) ->
+can_send(undefined, _QPid, _AckRequired, _CTag, _Len) ->
true;
-can_send(LimiterPid, QPid, AckRequired) ->
+can_send(LimiterPid, QPid, AckRequired, CTag, Len) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired},
+ fun () -> gen_server2:call(LimiterPid,
+ {can_send, QPid, AckRequired, CTag, Len},
infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
-ack(undefined, _Count) -> ok;
-ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}).
+ack(undefined, _CTag) -> ok;
+ack(LimiterPid, CTag) -> gen_server2:cast(LimiterPid, {ack, CTag}).
register(undefined, _QPid) -> ok;
register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}).
@@ -105,6 +113,11 @@ unblock(undefined) ->
unblock(LimiterPid) ->
gen_server2:call(LimiterPid, unblock, infinity).
+set_credit(undefined, _, _, _, _) ->
+ ok;
+set_credit(LimiterPid, CTag, Credit, Count, Drain) ->
+ gen_server2:call(LimiterPid, {set_credit, CTag, Credit, Count, Drain}, infinity).
+
is_blocked(undefined) ->
false;
is_blocked(LimiterPid) ->
@@ -120,23 +133,26 @@ init([ChPid, UnackedMsgCount]) ->
prioritise_call(get_limit, _From, _State) -> 9;
prioritise_call(_Msg, _From, _State) -> 0.
-handle_call({can_send, _QPid, _AckRequired}, _From,
+handle_call({can_send, _QPid, _AckRequired, _CTag, _Len}, _From,
State = #lim{blocked = true}) ->
{reply, false, State};
-handle_call({can_send, QPid, AckRequired}, _From,
+handle_call({can_send, QPid, AckRequired, CTag, Len}, _From,
State = #lim{volume = Volume}) ->
- case limit_reached(State) of
+ case limit_reached(CTag, State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end}}
+ false -> {reply, true,
+ decr_credit(CTag, Len,
+ State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end})}
end;
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
handle_call({limit, PrefetchCount}, _From, State) ->
- case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
+ case maybe_notify(irrelevant,
+ State, State#lim{prefetch_count = PrefetchCount}) of
{cont, State1} -> {reply, ok, State1};
{stop, State1} -> {stop, normal, stopped, State1}
end;
@@ -145,19 +161,19 @@ handle_call(block, _From, State) ->
{reply, ok, State#lim{blocked = true}};
handle_call(unblock, _From, State) ->
- case maybe_notify(State, State#lim{blocked = false}) of
- {cont, State1} -> {reply, ok, State1};
- {stop, State1} -> {stop, normal, stopped, State1}
- end;
+ maybe_notify_reply(irrelevant, State, State#lim{blocked = false});
+
+handle_call({set_credit, CTag, Credit, Count, Drain}, _From, State) ->
+ maybe_notify_reply(CTag, State, reset_credit(CTag, Credit, Count, Drain, State));
handle_call(is_blocked, _From, State) ->
{reply, blocked(State), State}.
-handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
+handle_cast({ack, CTag}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
- true -> Volume - Count
+ true -> Volume - 1
end,
- {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
+ {cont, State1} = maybe_notify(CTag, State, State#lim{volume = NewVolume}),
{noreply, State1};
handle_cast({register, QPid}, State) ->
@@ -179,19 +195,83 @@ code_change(_, State, _) ->
%% Internal plumbing
%%----------------------------------------------------------------------------
-maybe_notify(OldState, NewState) ->
- case (limit_reached(OldState) orelse blocked(OldState)) andalso
- not (limit_reached(NewState) orelse blocked(NewState)) of
+maybe_notify_reply(CTag, OldState, NewState) ->
+ case maybe_notify(CTag, OldState, NewState) of
+ {cont, State} -> {reply, ok, State};
+ {stop, State} -> {stop, normal, stopped, State}
+ end.
+
+maybe_notify(CTag, OldState, NewState) ->
+ case (limit_reached(CTag, OldState) orelse blocked(OldState)) andalso
+ not (limit_reached(CTag, NewState) orelse blocked(NewState)) of
true -> NewState1 = notify_queues(NewState),
- {case NewState1#lim.prefetch_count of
- 0 -> stop;
- _ -> cont
+ {case {NewState1#lim.prefetch_count,
+ dict:size(NewState1#lim.credits)} of
+ {0, 0} -> stop;
+ _ -> cont
end, NewState1};
false -> {cont, NewState}
end.
-limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
- Limit =/= 0 andalso Volume >= Limit.
+limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume,
+ credits = Credits}) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{ credit = 0 }} -> true;
+ _ -> false
+ end orelse (Limit =/= 0 andalso Volume >= Limit).
+
+decr_credit(CTag, Len, State = #lim{ credits = Credits,
+ ch_pid = ChPid } ) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{ credit = Credit, count = Count, drain = Drain }} ->
+ {NewCredit, NewCount} =
+ case {Credit, Len, Drain} of
+ {1, _, _} -> {0, serial_add(Count, 1)};
+ {_, 1, true} ->
+ %% Drain, so advance til credit = 0
+ NewCount0 = serial_add(Count, (Credit - 1)),
+ send_drained(ChPid, CTag, NewCount0),
+ {0, NewCount0}; %% Magic reduction to 0
+ {_, _, _} -> {Credit - 1, serial_add(Count, 1)}
+ end,
+ update_credit(CTag, NewCredit, NewCount, Drain, State);
+ error ->
+ State
+ end.
+
+send_drained(ChPid, CTag, Count) ->
+ rabbit_channel:send_command(ChPid,
+ #'basic.credit_state'{consumer_tag = CTag,
+ credit = 0,
+ count = Count,
+ available = 0,
+ drain = true}).
+
+%% Assert the credit state. The count may not match ours, in which
+%% case we must rebase the credit.
+%% TODO Edge case: if the queue has nothing in it, and drain is set,
+%% we want to send a basic.credit back.
+reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) ->
+ Count =
+ case dict:find(CTag, Credits) of
+ {ok, #credit{ count = LocalCount }} ->
+ LocalCount;
+ _ -> Count0
+ end,
+ %% Our credit may have been reduced while messages are in flight,
+ %% so we bottom out at 0.
+ Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)),
+ update_credit(CTag, Credit, Count, Drain, State).
+
+%% Store the credit
+update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) ->
+ State#lim{credits = dict:erase(CTag, Credits)};
+
+update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) ->
+ State#lim{credits = dict:store(CTag,
+ #credit{credit = Credit,
+ count = Count,
+ drain = Drain}, Credits)}.
blocked(#lim{blocked = Blocked}) -> Blocked.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index abc27c5f..cdf20071 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([lock_file/1]).
-export([const_ok/1, const/1]).
-export([ntoa/1, ntoab/1]).
+-export([serial_add/2, serial_compare/2, serial_diff/2]).
-export([is_process_alive/1]).
%%----------------------------------------------------------------------------
@@ -63,6 +64,7 @@
-ifdef(use_specs).
-export_type([resource_name/0, thunk/1, const/1]).
+-export_type([serial_number/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -76,6 +78,8 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(serial_number() :: non_neg_integer()).
+-type(serial_compare_result() :: 'equal' | 'less' | 'greater').
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -196,6 +200,12 @@
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
-spec(is_process_alive/1 :: (pid()) -> boolean()).
+-spec(serial_add/2 :: (serial_number(), non_neg_integer()) ->
+ serial_number()).
+-spec(serial_compare/2 :: (serial_number(), serial_number()) ->
+ serial_compare_result()).
+-spec(serial_diff/2 :: (serial_number(), serial_number()) ->
+ integer()).
-endif.
@@ -864,6 +874,47 @@ ntoab(IP) ->
_ -> "[" ++ Str ++ "]"
end.
+%% Serial arithmetic for unsigned ints.
+%% http://www.faqs.org/rfcs/rfc1982.html
+%% SERIAL_BITS = 32
+
+%% 2 ^ SERIAL_BITS
+-define(SERIAL_MAX, 16#100000000).
+%% 2 ^ (SERIAL_BITS - 1) - 1
+-define(SERIAL_MAX_ADDEND, 16#7fffffff).
+
+serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND ->
+ (S + N) rem ?SERIAL_MAX;
+serial_add(S, N) ->
+ exit({out_of_bound_serial_addition, S, N}).
+
+serial_compare(A, B) ->
+ if A =:= B ->
+ equal;
+ (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse
+ (A > B andalso A - B > ?SERIAL_MAX_ADDEND) ->
+ less;
+ (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse
+ (A > B andalso B - A < ?SERIAL_MAX_ADDEND) ->
+ greater;
+ true -> exit({indeterminate_serial_comparison, A, B})
+ end.
+
+-define(SERIAL_DIFF_BOUND, 16#80000000).
+
+serial_diff(A, B) ->
+ Diff = A - B,
+ if Diff > (?SERIAL_DIFF_BOUND) ->
+ %% B is actually greater than A
+ - (?SERIAL_MAX - Diff);
+ Diff < - (?SERIAL_DIFF_BOUND) ->
+ ?SERIAL_MAX + Diff;
+ Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND ->
+ Diff;
+ true ->
+ exit({indeterminate_serial_diff, A, B})
+ end.
+
is_process_alive(Pid) when node(Pid) =:= node() ->
erlang:is_process_alive(Pid);
is_process_alive(Pid) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2015170a..ea0a19ac 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -18,7 +18,7 @@
-compile([export_all]).
--export([all_tests/0, test_parsing/0]).
+-export([all_tests/0, test_parsing/0, test_serial_arithmetic/0]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -47,6 +47,7 @@ all_tests() ->
passed = test_parsing(),
passed = test_content_framing(),
passed = test_content_transcoding(),
+ passed = test_serial_arithmetic(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
@@ -585,6 +586,29 @@ sequence_with_content(Sequence) ->
rabbit_framing_amqp_0_9_1),
Sequence).
+test_serial_arithmetic() ->
+ 1 = rabbit_misc:serial_add(0, 1),
+ 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff),
+ 0 = rabbit_misc:serial_add(16#ffffffff, 1),
+ %% Cannot add more than 2 ^ 31 - 1
+ case catch rabbit_misc:serial_add(200, 16#80000000) of
+ {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok;
+ _ -> exit(fail_out_of_bound_serial_addition)
+ end,
+
+ 1 = rabbit_misc:serial_diff(1, 0),
+ 2 = rabbit_misc:serial_diff(1, 16#ffffffff),
+ -2 = rabbit_misc:serial_diff(16#ffffffff, 1),
+ case catch rabbit_misc:serial_diff(0, 16#80000000) of
+ {'EXIT', {indeterminate_serial_diff, _, _}} -> ok;
+ _ -> exit(fail_indeterminate_serial_difference)
+ end,
+ case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of
+ {'EXIT', {indeterminate_serial_diff, _, _}} -> ok;
+ _ -> exit(fail_indeterminate_serial_difference)
+ end,
+ passed.
+
test_topic_match(P, R) ->
test_topic_match(P, R, true).