diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-02-18 18:09:18 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-02-18 18:09:18 +0000 |
commit | 0c275d9b01f77e47baac84cac83bf02a883fa32b (patch) | |
tree | 6f1196e500bfccd368dd71a6ba21ca10c1c74c90 | |
parent | 1f3f3b0c6c9d5aa181b3bb9306e383b22ea1d9f1 (diff) | |
parent | 2970f6af9cc43ce2b9b44a3b116cb1dfe6b51493 (diff) | |
download | rabbitmq-server-0c275d9b01f77e47baac84cac83bf02a883fa32b.tar.gz |
Merge in default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 54 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 142 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 51 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 26 |
5 files changed, 242 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e794b4aa..053878e8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -336,7 +336,9 @@ ch_record_state_transition(OldCR, NewCR) -> deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> + blocked_consumers = BlockedConsumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -346,7 +348,8 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, acktags = ChAckTags} = ch_record(ChPid), IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso - rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of + rabbit_limiter:can_send( LimiterPid, self(), AckRequired, + ConsumerTag, BQ:len(BQS) )) of true -> {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), @@ -1078,7 +1081,8 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> true -> ok end, - NewLimited = Limited andalso LimiterPid =/= undefined, + NewLimited = Limited andalso LimiterPid =/= undefined + andalso rabbit_limiter:is_blocked(LimiterPid), C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} end)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 73c031de..7c57f568 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -469,6 +469,7 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. +%% TODO port this(?) queue_blocked(QPid, State = #ch{blocking = Blocking}) -> case dict:find(QPid, Blocking) of error -> State; @@ -1047,6 +1048,47 @@ handle_method(#'channel.flow'{active = false}, _, {noreply, State1#ch{blocking = dict:from_list(Queues)}} end; +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + count = Count, + drain = Drain}, _, + State = #ch{limiter_pid = LimiterPid, + consumer_mapping = Consumers}) -> + %% We get Available first because it's likely that as soon as we set + %% the credit msgs will get consumed and it'll be out of date. Why do we + %% want that? Because at least then it's consistent with the credit value + %% we return. And Available is always going to be racy. + Available = case dict:find(CTag, Consumers) of + {ok, QName} -> + case rabbit_amqqueue:with( + QName, fun (Q) -> rabbit_amqqueue:stat(Q) end) of + {ok, Len, _} -> Len; + _ -> -1 + end; + error -> -1 + end, + LimiterPid1 = case LimiterPid of + undefined -> start_limiter(State); + Other -> Other + end, + LimiterPid2 = + case rabbit_limiter:set_credit(LimiterPid1, CTag, Credit, Count, Drain) of + ok -> limit_queues(LimiterPid1, State), + LimiterPid1; + stopped -> unlimit_queues(State) + end, + State1 = State#ch{limiter_pid = LimiterPid2}, + return_ok(State1, false, #'basic.credit_ok'{available = Available}); + + %% TODO port this bit ? + %% case consumer_queues(Consumers) of + %% [] -> {reply, #'channel.flow_ok'{active = false}, State1}; + %% QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || + %% QPid <- QPids], + %% ok = rabbit_amqqueue:flush_all(QPids, self()), + %% {noreply, State1#ch{blocking = dict:from_list(Queues)}} + %% end; + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). @@ -1239,12 +1281,12 @@ consumer_queues(Consumers) -> notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> - case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of - 0 -> ok; - Count -> rabbit_limiter:ack(LimiterPid, Count) - end. + %% TODO this could be faster, group the acks + rabbit_misc:queue_fold( + fun ({_, none, _}, Acc) -> Acc; + ({_, CTag, _}, Acc) -> rabbit_limiter:ack(LimiterPid, CTag), + Acc + end, ok, Acked). is_message_persistent(Content) -> case rabbit_basic:is_message_persistent(Content) of diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 1b72dd76..bf4ca797 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -15,14 +15,17 @@ %% -module(rabbit_limiter). +-include("rabbit_framing.hrl"). -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). -export([start_link/2]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1, is_blocked/1]). +-export([limit/2, can_send/5, ack/2, register/2, unregister/2]). +-export([get_limit/1, block/1, unblock/1, set_credit/5, is_blocked/1]). + +-import(rabbit_misc, [serial_add/2, serial_diff/2]). %%---------------------------------------------------------------------------- @@ -33,8 +36,8 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok_pid_or_error()). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). --spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). --spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(can_send/5 :: (maybe_pid(), pid(), boolean(), binary(), non_neg_integer()) -> boolean()). +-spec(ack/2 :: (maybe_pid(), binary()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). @@ -49,8 +52,12 @@ -record(lim, {prefetch_count = 0, ch_pid, blocked = false, + credits = dict:new(), queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). + +-record(credit, {count = 0, credit = 0, drain = false}). + %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. @@ -69,18 +76,19 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid, _AckRequired) -> +can_send(undefined, _QPid, _AckRequired, _CTag, _Len) -> true; -can_send(LimiterPid, QPid, AckRequired) -> +can_send(LimiterPid, QPid, AckRequired, CTag, Len) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, + fun () -> gen_server2:call(LimiterPid, + {can_send, QPid, AckRequired, CTag, Len}, infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer -ack(undefined, _Count) -> ok; -ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}). +ack(undefined, _CTag) -> ok; +ack(LimiterPid, CTag) -> gen_server2:cast(LimiterPid, {ack, CTag}). register(undefined, _QPid) -> ok; register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). @@ -105,6 +113,11 @@ unblock(undefined) -> unblock(LimiterPid) -> gen_server2:call(LimiterPid, unblock, infinity). +set_credit(undefined, _, _, _, _) -> + ok; +set_credit(LimiterPid, CTag, Credit, Count, Drain) -> + gen_server2:call(LimiterPid, {set_credit, CTag, Credit, Count, Drain}, infinity). + is_blocked(undefined) -> false; is_blocked(LimiterPid) -> @@ -120,23 +133,26 @@ init([ChPid, UnackedMsgCount]) -> prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. -handle_call({can_send, _QPid, _AckRequired}, _From, +handle_call({can_send, _QPid, _AckRequired, _CTag, _Len}, _From, State = #lim{blocked = true}) -> {reply, false, State}; -handle_call({can_send, QPid, AckRequired}, _From, +handle_call({can_send, QPid, AckRequired, CTag, Len}, _From, State = #lim{volume = Volume}) -> - case limit_reached(State) of + case limit_reached(CTag, State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end}} + false -> {reply, true, + decr_credit(CTag, Len, + State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end})} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; handle_call({limit, PrefetchCount}, _From, State) -> - case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of + case maybe_notify(irrelevant, + State, State#lim{prefetch_count = PrefetchCount}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> {stop, normal, stopped, State1} end; @@ -145,19 +161,19 @@ handle_call(block, _From, State) -> {reply, ok, State#lim{blocked = true}}; handle_call(unblock, _From, State) -> - case maybe_notify(State, State#lim{blocked = false}) of - {cont, State1} -> {reply, ok, State1}; - {stop, State1} -> {stop, normal, stopped, State1} - end; + maybe_notify_reply(irrelevant, State, State#lim{blocked = false}); + +handle_call({set_credit, CTag, Credit, Count, Drain}, _From, State) -> + maybe_notify_reply(CTag, State, reset_credit(CTag, Credit, Count, Drain, State)); handle_call(is_blocked, _From, State) -> {reply, blocked(State), State}. -handle_cast({ack, Count}, State = #lim{volume = Volume}) -> +handle_cast({ack, CTag}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; - true -> Volume - Count + true -> Volume - 1 end, - {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), + {cont, State1} = maybe_notify(CTag, State, State#lim{volume = NewVolume}), {noreply, State1}; handle_cast({register, QPid}, State) -> @@ -179,19 +195,83 @@ code_change(_, State, _) -> %% Internal plumbing %%---------------------------------------------------------------------------- -maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse blocked(OldState)) andalso - not (limit_reached(NewState) orelse blocked(NewState)) of +maybe_notify_reply(CTag, OldState, NewState) -> + case maybe_notify(CTag, OldState, NewState) of + {cont, State} -> {reply, ok, State}; + {stop, State} -> {stop, normal, stopped, State} + end. + +maybe_notify(CTag, OldState, NewState) -> + case (limit_reached(CTag, OldState) orelse blocked(OldState)) andalso + not (limit_reached(CTag, NewState) orelse blocked(NewState)) of true -> NewState1 = notify_queues(NewState), - {case NewState1#lim.prefetch_count of - 0 -> stop; - _ -> cont + {case {NewState1#lim.prefetch_count, + dict:size(NewState1#lim.credits)} of + {0, 0} -> stop; + _ -> cont end, NewState1}; false -> {cont, NewState} end. -limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> - Limit =/= 0 andalso Volume >= Limit. +limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume, + credits = Credits}) -> + case dict:find(CTag, Credits) of + {ok, #credit{ credit = 0 }} -> true; + _ -> false + end orelse (Limit =/= 0 andalso Volume >= Limit). + +decr_credit(CTag, Len, State = #lim{ credits = Credits, + ch_pid = ChPid } ) -> + case dict:find(CTag, Credits) of + {ok, #credit{ credit = Credit, count = Count, drain = Drain }} -> + {NewCredit, NewCount} = + case {Credit, Len, Drain} of + {1, _, _} -> {0, serial_add(Count, 1)}; + {_, 1, true} -> + %% Drain, so advance til credit = 0 + NewCount0 = serial_add(Count, (Credit - 1)), + send_drained(ChPid, CTag, NewCount0), + {0, NewCount0}; %% Magic reduction to 0 + {_, _, _} -> {Credit - 1, serial_add(Count, 1)} + end, + update_credit(CTag, NewCredit, NewCount, Drain, State); + error -> + State + end. + +send_drained(ChPid, CTag, Count) -> + rabbit_channel:send_command(ChPid, + #'basic.credit_state'{consumer_tag = CTag, + credit = 0, + count = Count, + available = 0, + drain = true}). + +%% Assert the credit state. The count may not match ours, in which +%% case we must rebase the credit. +%% TODO Edge case: if the queue has nothing in it, and drain is set, +%% we want to send a basic.credit back. +reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) -> + Count = + case dict:find(CTag, Credits) of + {ok, #credit{ count = LocalCount }} -> + LocalCount; + _ -> Count0 + end, + %% Our credit may have been reduced while messages are in flight, + %% so we bottom out at 0. + Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)), + update_credit(CTag, Credit, Count, Drain, State). + +%% Store the credit +update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) -> + State#lim{credits = dict:erase(CTag, Credits)}; + +update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) -> + State#lim{credits = dict:store(CTag, + #credit{credit = Credit, + count = Count, + drain = Drain}, Credits)}. blocked(#lim{blocked = Blocked}) -> Blocked. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index abc27c5f..cdf20071 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,6 +56,7 @@ -export([lock_file/1]). -export([const_ok/1, const/1]). -export([ntoa/1, ntoab/1]). +-export([serial_add/2, serial_compare/2, serial_diff/2]). -export([is_process_alive/1]). %%---------------------------------------------------------------------------- @@ -63,6 +64,7 @@ -ifdef(use_specs). -export_type([resource_name/0, thunk/1, const/1]). +-export_type([serial_number/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -76,6 +78,8 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(serial_number() :: non_neg_integer()). +-type(serial_compare_result() :: 'equal' | 'less' | 'greater'). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -196,6 +200,12 @@ -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). -spec(is_process_alive/1 :: (pid()) -> boolean()). +-spec(serial_add/2 :: (serial_number(), non_neg_integer()) -> + serial_number()). +-spec(serial_compare/2 :: (serial_number(), serial_number()) -> + serial_compare_result()). +-spec(serial_diff/2 :: (serial_number(), serial_number()) -> + integer()). -endif. @@ -864,6 +874,47 @@ ntoab(IP) -> _ -> "[" ++ Str ++ "]" end. +%% Serial arithmetic for unsigned ints. +%% http://www.faqs.org/rfcs/rfc1982.html +%% SERIAL_BITS = 32 + +%% 2 ^ SERIAL_BITS +-define(SERIAL_MAX, 16#100000000). +%% 2 ^ (SERIAL_BITS - 1) - 1 +-define(SERIAL_MAX_ADDEND, 16#7fffffff). + +serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND -> + (S + N) rem ?SERIAL_MAX; +serial_add(S, N) -> + exit({out_of_bound_serial_addition, S, N}). + +serial_compare(A, B) -> + if A =:= B -> + equal; + (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse + (A > B andalso A - B > ?SERIAL_MAX_ADDEND) -> + less; + (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse + (A > B andalso B - A < ?SERIAL_MAX_ADDEND) -> + greater; + true -> exit({indeterminate_serial_comparison, A, B}) + end. + +-define(SERIAL_DIFF_BOUND, 16#80000000). + +serial_diff(A, B) -> + Diff = A - B, + if Diff > (?SERIAL_DIFF_BOUND) -> + %% B is actually greater than A + - (?SERIAL_MAX - Diff); + Diff < - (?SERIAL_DIFF_BOUND) -> + ?SERIAL_MAX + Diff; + Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND -> + Diff; + true -> + exit({indeterminate_serial_diff, A, B}) + end. + is_process_alive(Pid) when node(Pid) =:= node() -> erlang:is_process_alive(Pid); is_process_alive(Pid) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2015170a..ea0a19ac 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -18,7 +18,7 @@ -compile([export_all]). --export([all_tests/0, test_parsing/0]). +-export([all_tests/0, test_parsing/0, test_serial_arithmetic/0]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -47,6 +47,7 @@ all_tests() -> passed = test_parsing(), passed = test_content_framing(), passed = test_content_transcoding(), + passed = test_serial_arithmetic(), passed = test_topic_matching(), passed = test_log_management(), passed = test_app_management(), @@ -585,6 +586,29 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). +test_serial_arithmetic() -> + 1 = rabbit_misc:serial_add(0, 1), + 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff), + 0 = rabbit_misc:serial_add(16#ffffffff, 1), + %% Cannot add more than 2 ^ 31 - 1 + case catch rabbit_misc:serial_add(200, 16#80000000) of + {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok; + _ -> exit(fail_out_of_bound_serial_addition) + end, + + 1 = rabbit_misc:serial_diff(1, 0), + 2 = rabbit_misc:serial_diff(1, 16#ffffffff), + -2 = rabbit_misc:serial_diff(16#ffffffff, 1), + case catch rabbit_misc:serial_diff(0, 16#80000000) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + passed. + test_topic_match(P, R) -> test_topic_match(P, R, true). |