diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-08-03 14:36:12 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-08-03 14:36:12 +0100 |
commit | 030b28a5c43ff1bf6b40700a3f4b64fd4bf948ee (patch) | |
tree | 80317cb3744c94685038dcf67041c4c32ec4e2ae | |
parent | 3383b7ccaa079ebeca4b7ab701abed7d7324ca6e (diff) | |
parent | 5a4b32e1ccf37570973097086d4a07b6958207d1 (diff) | |
download | rabbitmq-server-030b28a5c43ff1bf6b40700a3f4b64fd4bf948ee.tar.gz |
Merge in default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 43 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 146 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 51 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 26 |
5 files changed, 235 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 05de48d6..43c90327 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -364,7 +364,9 @@ ch_record_state_transition(OldCR, NewCR) -> deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> + blocked_consumers = BlockedConsumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -374,7 +376,8 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, acktags = ChAckTags} = ch_record(ChPid), IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso - rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of + rabbit_limiter:can_send( LimiterPid, self(), AckRequired, + ConsumerTag, BQ:len(BQS) )) of true -> {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), @@ -1070,7 +1073,8 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> true -> ok end, - NewLimited = Limited andalso LimiterPid =/= undefined, + NewLimited = Limited andalso LimiterPid =/= undefined + andalso rabbit_limiter:is_blocked(LimiterPid), C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} end)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 45f0032d..4d1712c4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1098,6 +1098,37 @@ handle_method(#'channel.flow'{active = false}, _, {noreply, State1#ch{blocking = dict:from_list(Queues)}} end; +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + count = Count, + drain = Drain}, _, + State = #ch{limiter_pid = LimiterPid, + consumer_mapping = Consumers}) -> + %% We get Available first because it's likely that as soon as we set + %% the credit msgs will get consumed and it'll be out of date. Why do we + %% want that? Because at least then it's consistent with the credit value + %% we return. And Available is always going to be racy. + Available = case dict:find(CTag, Consumers) of + {ok, {Q, _}} -> case rabbit_amqqueue:stat(Q) of + {ok, Len, _} -> Len; + _ -> -1 + end; + error -> -1 %% TODO these -1s smell very iffy! + end, + LimiterPid1 = case LimiterPid of + undefined -> start_limiter(State); + Other -> Other + end, + LimiterPid2 = + case rabbit_limiter:set_credit( + LimiterPid1, CTag, Credit, Count, Drain) of + ok -> limit_queues(LimiterPid1, State), + LimiterPid1; + stopped -> unlimit_queues(State) + end, + State1 = State#ch{limiter_pid = LimiterPid2}, + return_ok(State1, false, #'basic.credit_ok'{available = Available}); + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). @@ -1304,12 +1335,12 @@ consumer_queues(Consumers) -> notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> - case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of - 0 -> ok; - Count -> rabbit_limiter:ack(LimiterPid, Count) - end. + %% TODO this could be faster, group the acks + rabbit_misc:queue_fold( + fun ({_, none, _}, Acc) -> Acc; + ({_, CTag, _}, Acc) -> rabbit_limiter:ack(LimiterPid, CTag), + Acc + end, ok, Acked). deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 8f9ab032..33e654d2 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -15,14 +15,17 @@ %% -module(rabbit_limiter). +-include("rabbit_framing.hrl"). -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). -export([start_link/2]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1, is_blocked/1]). +-export([limit/2, can_send/5, ack/2, register/2, unregister/2]). +-export([get_limit/1, block/1, unblock/1, set_credit/5, is_blocked/1]). + +-import(rabbit_misc, [serial_add/2, serial_diff/2]). %%---------------------------------------------------------------------------- @@ -33,8 +36,10 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok_pid_or_error()). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). --spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). --spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(can_send/5 :: + (maybe_pid(), pid(), boolean(), binary(), non_neg_integer()) + -> boolean()). +-spec(ack/2 :: (maybe_pid(), binary()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). @@ -49,8 +54,12 @@ -record(lim, {prefetch_count = 0, ch_pid, blocked = false, + credits = dict:new(), queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). + +-record(credit, {count = 0, credit = 0, drain = false}). + %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. @@ -69,18 +78,19 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid, _AckRequired) -> +can_send(undefined, _QPid, _AckRequired, _CTag, _Len) -> true; -can_send(LimiterPid, QPid, AckRequired) -> +can_send(LimiterPid, QPid, AckRequired, CTag, Len) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, + fun () -> gen_server2:call(LimiterPid, + {can_send, QPid, AckRequired, CTag, Len}, infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer -ack(undefined, _Count) -> ok; -ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}). +ack(undefined, _CTag) -> ok; +ack(LimiterPid, CTag) -> gen_server2:cast(LimiterPid, {ack, CTag}). register(undefined, _QPid) -> ok; register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). @@ -105,6 +115,12 @@ unblock(undefined) -> unblock(LimiterPid) -> gen_server2:call(LimiterPid, unblock, infinity). +set_credit(undefined, _, _, _, _) -> + ok; +set_credit(LimiterPid, CTag, Credit, Count, Drain) -> + gen_server2:call( + LimiterPid, {set_credit, CTag, Credit, Count, Drain}, infinity). + is_blocked(undefined) -> false; is_blocked(LimiterPid) -> @@ -120,23 +136,26 @@ init([ChPid, UnackedMsgCount]) -> prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. -handle_call({can_send, QPid, _AckRequired}, _From, +handle_call({can_send, QPid, _AckRequired, _CTag, _Len}, _From, State = #lim{blocked = true}) -> {reply, false, limit_queue(QPid, State)}; -handle_call({can_send, QPid, AckRequired}, _From, +handle_call({can_send, QPid, AckRequired, CTag, Len}, _From, State = #lim{volume = Volume}) -> - case limit_reached(State) of + case limit_reached(CTag, State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end}} + false -> {reply, true, + decr_credit(CTag, Len, + State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end})} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; handle_call({limit, PrefetchCount}, _From, State) -> - case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of + case maybe_notify(irrelevant, + State, State#lim{prefetch_count = PrefetchCount}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> {stop, normal, stopped, State1} end; @@ -145,19 +164,20 @@ handle_call(block, _From, State) -> {reply, ok, State#lim{blocked = true}}; handle_call(unblock, _From, State) -> - case maybe_notify(State, State#lim{blocked = false}) of - {cont, State1} -> {reply, ok, State1}; - {stop, State1} -> {stop, normal, stopped, State1} - end; + maybe_notify_reply(irrelevant, State, State#lim{blocked = false}); + +handle_call({set_credit, CTag, Credit, Count, Drain}, _From, State) -> + maybe_notify_reply(CTag, State, + reset_credit(CTag, Credit, Count, Drain, State)); handle_call(is_blocked, _From, State) -> {reply, blocked(State), State}. -handle_cast({ack, Count}, State = #lim{volume = Volume}) -> +handle_cast({ack, CTag}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; - true -> Volume - Count + true -> Volume - 1 end, - {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), + {cont, State1} = maybe_notify(CTag, State, State#lim{volume = NewVolume}), {noreply, State1}; handle_cast({register, QPid}, State) -> @@ -179,19 +199,83 @@ code_change(_, State, _) -> %% Internal plumbing %%---------------------------------------------------------------------------- -maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse blocked(OldState)) andalso - not (limit_reached(NewState) orelse blocked(NewState)) of +maybe_notify_reply(CTag, OldState, NewState) -> + case maybe_notify(CTag, OldState, NewState) of + {cont, State} -> {reply, ok, State}; + {stop, State} -> {stop, normal, stopped, State} + end. + +maybe_notify(CTag, OldState, NewState) -> + case (limit_reached(CTag, OldState) orelse blocked(OldState)) andalso + not (limit_reached(CTag, NewState) orelse blocked(NewState)) of true -> NewState1 = notify_queues(NewState), - {case NewState1#lim.prefetch_count of - 0 -> stop; - _ -> cont + {case {NewState1#lim.prefetch_count, + dict:size(NewState1#lim.credits)} of + {0, 0} -> stop; + _ -> cont end, NewState1}; false -> {cont, NewState} end. -limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> - Limit =/= 0 andalso Volume >= Limit. +limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume, + credits = Credits}) -> + case dict:find(CTag, Credits) of + {ok, #credit{ credit = 0 }} -> true; + _ -> false + end orelse (Limit =/= 0 andalso Volume >= Limit). + +decr_credit(CTag, Len, State = #lim{ credits = Credits, + ch_pid = ChPid } ) -> + case dict:find(CTag, Credits) of + {ok, #credit{ credit = Credit, count = Count, drain = Drain }} -> + {NewCredit, NewCount} = + case {Credit, Len, Drain} of + {1, _, _} -> {0, serial_add(Count, 1)}; + {_, 1, true} -> + %% Drain, so advance til credit = 0 + NewCount0 = serial_add(Count, (Credit - 1)), + send_drained(ChPid, CTag, NewCount0), + {0, NewCount0}; %% Magic reduction to 0 + {_, _, _} -> {Credit - 1, serial_add(Count, 1)} + end, + update_credit(CTag, NewCredit, NewCount, Drain, State); + error -> + State + end. + +send_drained(ChPid, CTag, Count) -> + rabbit_channel:send_command(ChPid, + #'basic.credit_state'{consumer_tag = CTag, + credit = 0, + count = Count, + available = 0, + drain = true}). + +%% Assert the credit state. The count may not match ours, in which +%% case we must rebase the credit. +%% TODO Edge case: if the queue has nothing in it, and drain is set, +%% we want to send a basic.credit back. +reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) -> + Count = + case dict:find(CTag, Credits) of + {ok, #credit{ count = LocalCount }} -> + LocalCount; + _ -> Count0 + end, + %% Our credit may have been reduced while messages are in flight, + %% so we bottom out at 0. + Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)), + update_credit(CTag, Credit, Count, Drain, State). + +%% Store the credit +update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) -> + State#lim{credits = dict:erase(CTag, Credits)}; + +update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) -> + State#lim{credits = dict:store(CTag, + #credit{credit = Credit, + count = Count, + drain = Drain}, Credits)}. blocked(#lim{blocked = Blocked}) -> Blocked. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3bbfb1d7..0c4ca186 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -55,6 +55,7 @@ -export([lock_file/1]). -export([const_ok/0, const/1]). -export([ntoa/1, ntoab/1]). +-export([serial_add/2, serial_compare/2, serial_diff/2]). -export([is_process_alive/1]). -export([pget/2, pget/3, pget_or_die/2]). -export([format_message_queue/2]). @@ -64,6 +65,7 @@ -ifdef(use_specs). -export_type([resource_name/0, thunk/1]). +-export_type([serial_number/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -76,6 +78,8 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(serial_number() :: non_neg_integer()). +-type(serial_compare_result() :: 'equal' | 'less' | 'greater'). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -207,6 +211,12 @@ -spec(pget/3 :: (term(), [term()], term()) -> term()). -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). +-spec(serial_add/2 :: (serial_number(), non_neg_integer()) -> + serial_number()). +-spec(serial_compare/2 :: (serial_number(), serial_number()) -> + serial_compare_result()). +-spec(serial_diff/2 :: (serial_number(), serial_number()) -> + integer()). -endif. @@ -905,6 +915,47 @@ ntoab(IP) -> _ -> "[" ++ Str ++ "]" end. +%% Serial arithmetic for unsigned ints. +%% http://www.faqs.org/rfcs/rfc1982.html +%% SERIAL_BITS = 32 + +%% 2 ^ SERIAL_BITS +-define(SERIAL_MAX, 16#100000000). +%% 2 ^ (SERIAL_BITS - 1) - 1 +-define(SERIAL_MAX_ADDEND, 16#7fffffff). + +serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND -> + (S + N) rem ?SERIAL_MAX; +serial_add(S, N) -> + exit({out_of_bound_serial_addition, S, N}). + +serial_compare(A, B) -> + if A =:= B -> + equal; + (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse + (A > B andalso A - B > ?SERIAL_MAX_ADDEND) -> + less; + (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse + (A > B andalso B - A < ?SERIAL_MAX_ADDEND) -> + greater; + true -> exit({indeterminate_serial_comparison, A, B}) + end. + +-define(SERIAL_DIFF_BOUND, 16#80000000). + +serial_diff(A, B) -> + Diff = A - B, + if Diff > (?SERIAL_DIFF_BOUND) -> + %% B is actually greater than A + - (?SERIAL_MAX - Diff); + Diff < - (?SERIAL_DIFF_BOUND) -> + ?SERIAL_MAX + Diff; + Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND -> + Diff; + true -> + exit({indeterminate_serial_diff, A, B}) + end. + is_process_alive(Pid) when node(Pid) =:= node() -> erlang:is_process_alive(Pid); is_process_alive(Pid) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ed4efb47..f9fbd5f6 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -18,7 +18,7 @@ -compile([export_all]). --export([all_tests/0, test_parsing/0]). +-export([all_tests/0, test_parsing/0, test_serial_arithmetic/0]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -48,6 +48,7 @@ all_tests() -> passed = test_parsing(), passed = test_content_framing(), passed = test_content_transcoding(), + passed = test_serial_arithmetic(), passed = test_topic_matching(), passed = test_log_management(), passed = test_app_management(), @@ -623,6 +624,29 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). +test_serial_arithmetic() -> + 1 = rabbit_misc:serial_add(0, 1), + 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff), + 0 = rabbit_misc:serial_add(16#ffffffff, 1), + %% Cannot add more than 2 ^ 31 - 1 + case catch rabbit_misc:serial_add(200, 16#80000000) of + {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok; + _ -> exit(fail_out_of_bound_serial_addition) + end, + + 1 = rabbit_misc:serial_diff(1, 0), + 2 = rabbit_misc:serial_diff(1, 16#ffffffff), + -2 = rabbit_misc:serial_diff(16#ffffffff, 1), + case catch rabbit_misc:serial_diff(0, 16#80000000) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + passed. + test_topic_matching() -> XName = #resource{virtual_host = <<"/">>, kind = exchange, |