diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-11-20 18:05:11 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-11-20 18:05:11 +0000 |
commit | a4ad123c9c39921465a26326c561c73ef50e1431 (patch) | |
tree | 30e406efa8d0aa4d7a5159c14d59bb71f41603d2 | |
parent | b9e643bb45c32d214125ba06f3c95b97147a3f6c (diff) | |
download | rabbitmq-server-a4ad123c9c39921465a26326c561c73ef50e1431.tar.gz |
First hack. Note that whilst this does the "right" thing, there is no attempt made to avoid deadlock. The Java tests (in particular the QoS Tests) all pass. Basically, I take the longest queue length, double it. The probability of that queue being unblocked is 50% (i.e. len >= 2*Len). That 2*Len is then used against all the queues, with their own length. Thus the shorter they are, the lower their probability of being unblocked. We stop work when we guarantee that either we've woken everyone up, or we've woken up enough to guarantee that we're now reblocked.
In general, this is pretty rough and ready and more like a proof of concept. Needs refinement.
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 53 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 118 |
3 files changed, 119 insertions, 56 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1a5e82d7..169c7c6d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -100,7 +100,7 @@ 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). --spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> boolean()). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -306,7 +306,7 @@ notify_sent(QPid, ChPid) -> gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 8, {unblock, ChPid}). + gen_server2:pcall(QPid, 8, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 80b7a92c..ca14c3dd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -171,7 +171,8 @@ deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers, - next_msg_id = NextId}) -> + next_msg_id = NextId, + message_buffer = MessageBuffer}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, @@ -180,7 +181,8 @@ deliver_immediately(Message, Delivered, C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of + case rabbit_limiter:can_send(LimiterPid, self(), AckRequired, + queue:len(MessageBuffer)) of true -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, @@ -202,7 +204,7 @@ deliver_immediately(Message, Delivered, ActiveConsumersTail, BlockedConsumers), {ActiveConsumers1, - queue:in(QEntry, BlockedConsumers1)} + queue:in_r(QEntry, BlockedConsumers1)} end, {offered, AckRequired, State#q{active_consumers = NewActiveConsumers, @@ -270,24 +272,29 @@ remove_consumers(ChPid, Queue) -> move_consumers(ChPid, From, To) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, queue:to_list(From)), - {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. + {queue:from_list(Kept), queue:join(queue:from_list(Removed), To)}. -possibly_unblock(State, ChPid, Update) -> +possibly_unblock(State, ChPid, Update, Result) -> case lookup_ch(ChPid) of not_found -> + Result(false, State), State; C -> NewC = Update(C), store_ch_record(NewC), case ch_record_state_transition(C, NewC) of - ok -> State; - unblock -> {NewBlockedConsumers, NewActiveConsumers} = - move_consumers(ChPid, - State#q.blocked_consumers, - State#q.active_consumers), - run_poke_burst( - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) + ok -> + Result(false, State), + State; + unblock -> + Result(true, State), + {NewBlockedConsumers, NewActiveConsumers} = + move_consumers(ChPid, + State#q.blocked_consumers, + State#q.active_consumers), + run_poke_burst( + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers}) end end. @@ -733,7 +740,16 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, reply(ok, State); _ -> reply(locked, State) - end. + end; + +handle_call({unblock, ChPid}, From, State) -> + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end, + fun (Unblocked, #q{message_buffer = MessageBuffer}) -> + gen_server2:reply(From, Unblocked andalso not + queue:is_empty(MessageBuffer)) + end)). handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -777,17 +793,12 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; -handle_cast({unblock, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C) -> C#cr{is_limit_active = false} end)); - handle_cast({notify_sent, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> C#cr{unsent_message_count = Count - 1} - end)); + end, fun (_, _) -> ok end)); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( @@ -803,7 +814,7 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end, fun (_, _) -> ok end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 087a9f64..4e95f37d 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -36,7 +36,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1, shutdown/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/4, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -47,7 +47,8 @@ -spec(start_link/1 :: (pid()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). +-spec(can_send/4 :: (maybe_pid(), pid(), boolean(), non_neg_integer()) -> + boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). @@ -58,7 +59,7 @@ -record(lim, {prefetch_count = 0, ch_pid, - queues = dict:new(), % QPid -> {MonitorRef, Notify} + queues = dict:new(), % QPid -> {MonitorRef, Notify, Length} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to @@ -85,13 +86,13 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid, _AckRequired) -> +can_send(undefined, _QPid, _AckRequired, _Length) -> true; -can_send(LimiterPid, QPid, AckRequired) -> +can_send(LimiterPid, QPid, AckRequired, Length) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, - infinity) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired, + Length}, infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer @@ -111,13 +112,17 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}) init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, QPid, AckRequired}, _From, +handle_call({can_send, QPid, AckRequired, Length}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of - true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end}} + true -> + {reply, false, limit_queue(QPid, Length, State)}; + false -> + {reply, true, + update_length(QPid, Length, + State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end})} end. handle_cast(shutdown, State) -> @@ -130,6 +135,7 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, + io:format("~p OldVolume ~p ; Count ~p ; NewVolume ~p~n", [self(), Volume, Count, NewVolume]), {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; handle_cast({register, QPid}, State) -> @@ -163,37 +169,83 @@ limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), - State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; + State#lim{queues = dict:store(QPid, {MRef, false, 0}, Queues)}; true -> State end. forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> case dict:find(QPid, Queues) of - {ok, {MRef, _}} -> + {ok, {MRef, _, _}} -> true = erlang:demonitor(MRef), - ok = rabbit_amqqueue:unblock(QPid, ChPid), + unblock(QPid, ChPid), State#lim{queues = dict:erase(QPid, Queues)}; error -> State end. -limit_queue(QPid, State = #lim{queues = Queues}) -> - UpdateFun = fun ({MRef, _}) -> {MRef, true} end, +limit_queue(QPid, Length, State = #lim{queues = Queues}) -> + UpdateFun = fun ({MRef, _, _}) -> {MRef, true, Length} end, State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. -notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> - {QList, NewQueues} = - dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; - (QPid, {MRef, true}, {L, D}) -> - {[QPid | L], dict:store(QPid, {MRef, false}, D)} - end, {[], Queues}, Queues), - case length(QList) of - 0 -> ok; - L -> - %% We randomly vary the position of queues in the list, - %% thus ensuring that each queue has an equal chance of - %% being notified first. - {L1, L2} = lists:split(random:uniform(L), QList), - [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], - ok - end, +update_length(QPid, Length, State = #lim{queues = Queues}) -> + UpdateFun = fun ({MRef, Notify, _}) -> {MRef, Notify, Length} end, + State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. + +notify_queues(State = #lim{ch_pid = ChPid, queues = Queues, + prefetch_count = PrefetchCount, volume = Volume}) -> + QList = + dict:fold(fun (_QPid, {_, false, _}, Acc) -> Acc; + (QPid, {_MRef, true, Length}, L) -> + gb_trees:enter(Length, QPid, L) + end, gb_trees:empty(), Queues), + NewQueues = + case gb_trees:size(QList) of + 0 -> Queues; + QCount -> + Capacity = PrefetchCount - Volume, + {BiggestLength, _QPid} = gb_trees:largest(QList), + BiggestLength1 = lists:max([2*BiggestLength, 1]), + %% try to tell enough queues that we guarantee we'll get + %% blocked again + {Capacity1, NewQueues1} = + unblock_queue(ChPid, BiggestLength1, Capacity, QList, + Queues), + case 0 == Capacity1 of + true -> NewQueues1; + false -> %% just tell everyone + {_Capacity2, NewQueues2} = + unblock_queue(ChPid, 1, QCount, QList, NewQueues1), + NewQueues2 + end + end, State#lim{queues = NewQueues}. + +unblock_queue(_ChPid, _L, 0, _QList, Queues) -> + {0, Queues}; +unblock_queue(ChPid, L, QueueCount, QList, Queues) -> + UpdateFunUnBlock = fun ({MRef, _, Length}) -> {MRef, false, Length} end, + {Length, QPid, QList1} = gb_trees:take_largest(QList), + {_MRef, Blocked, Length} = dict:fetch(QPid, Queues), + {QueueCount1, Queues1} = + case Blocked of + false -> + {QueueCount, Queues}; + true -> + %% if 0 == Length, and L == 1, we still need to inform the q + case Length + 1 >= random:uniform(L) of + true -> case unblock(QPid, ChPid) of + true -> {QueueCount - 1, + dict:update(QPid, UpdateFunUnBlock, Queues)}; + false -> {QueueCount, Queues} + end; + false -> {QueueCount, Queues} + end + end, + case gb_trees:is_empty(QList1) of + true -> {QueueCount1, Queues1}; + false -> unblock_queue(ChPid, L, QueueCount1, QList1, Queues1) + end. + +unblock(QPid, ChPid) -> + rabbit_misc:with_exit_handler( + fun () -> false end, + fun () -> rabbit_amqqueue:unblock(QPid, ChPid) end). |