summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-20 18:05:11 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-20 18:05:11 +0000
commita4ad123c9c39921465a26326c561c73ef50e1431 (patch)
tree30e406efa8d0aa4d7a5159c14d59bb71f41603d2
parentb9e643bb45c32d214125ba06f3c95b97147a3f6c (diff)
downloadrabbitmq-server-a4ad123c9c39921465a26326c561c73ef50e1431.tar.gz
First hack. Note that whilst this does the "right" thing, there is no attempt made to avoid deadlock. The Java tests (in particular the QoS Tests) all pass. Basically, I take the longest queue length, double it. The probability of that queue being unblocked is 50% (i.e. len >= 2*Len). That 2*Len is then used against all the queues, with their own length. Thus the shorter they are, the lower their probability of being unblocked. We stop work when we guarantee that either we've woken everyone up, or we've woken up enough to guarantee that we're now reblocked.
In general, this is pretty rough and ready and more like a proof of concept. Needs refinement.
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl53
-rw-r--r--src/rabbit_limiter.erl118
3 files changed, 119 insertions, 56 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1a5e82d7..169c7c6d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -100,7 +100,7 @@
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
--spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock/2 :: (pid(), pid()) -> boolean()).
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -306,7 +306,7 @@ notify_sent(QPid, ChPid) ->
gen_server2:pcast(QPid, 8, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:pcast(QPid, 8, {unblock, ChPid}).
+ gen_server2:pcall(QPid, 8, {unblock, ChPid}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 80b7a92c..ca14c3dd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -171,7 +171,8 @@ deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers,
- next_msg_id = NextId}) ->
+ next_msg_id = NextId,
+ message_buffer = MessageBuffer}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
@@ -180,7 +181,8 @@ deliver_immediately(Message, Delivered,
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
+ case rabbit_limiter:can_send(LimiterPid, self(), AckRequired,
+ queue:len(MessageBuffer)) of
true ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
@@ -202,7 +204,7 @@ deliver_immediately(Message, Delivered,
ActiveConsumersTail,
BlockedConsumers),
{ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
+ queue:in_r(QEntry, BlockedConsumers1)}
end,
{offered, AckRequired,
State#q{active_consumers = NewActiveConsumers,
@@ -270,24 +272,29 @@ remove_consumers(ChPid, Queue) ->
move_consumers(ChPid, From, To) ->
{Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
queue:to_list(From)),
- {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
+ {queue:from_list(Kept), queue:join(queue:from_list(Removed), To)}.
-possibly_unblock(State, ChPid, Update) ->
+possibly_unblock(State, ChPid, Update, Result) ->
case lookup_ch(ChPid) of
not_found ->
+ Result(false, State),
State;
C ->
NewC = Update(C),
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
- ok -> State;
- unblock -> {NewBlockedConsumers, NewActiveConsumers} =
- move_consumers(ChPid,
- State#q.blocked_consumers,
- State#q.active_consumers),
- run_poke_burst(
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
+ ok ->
+ Result(false, State),
+ State;
+ unblock ->
+ Result(true, State),
+ {NewBlockedConsumers, NewActiveConsumers} =
+ move_consumers(ChPid,
+ State#q.blocked_consumers,
+ State#q.active_consumers),
+ run_poke_burst(
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers})
end
end.
@@ -733,7 +740,16 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
reply(ok, State);
_ ->
reply(locked, State)
- end.
+ end;
+
+handle_call({unblock, ChPid}, From, State) ->
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end,
+ fun (Unblocked, #q{message_buffer = MessageBuffer}) ->
+ gen_server2:reply(From, Unblocked andalso not
+ queue:is_empty(MessageBuffer))
+ end)).
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -777,17 +793,12 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
-handle_cast({unblock, ChPid}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C) -> C#cr{is_limit_active = false} end));
-
handle_cast({notify_sent, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
fun (C = #cr{unsent_message_count = Count}) ->
C#cr{unsent_message_count = Count - 1}
- end));
+ end, fun (_, _) -> ok end));
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
@@ -803,7 +814,7 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end, fun (_, _) -> ok end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 087a9f64..4e95f37d 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -36,7 +36,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1, shutdown/1]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/4, ack/2, register/2, unregister/2]).
%%----------------------------------------------------------------------------
@@ -47,7 +47,8 @@
-spec(start_link/1 :: (pid()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
+-spec(can_send/4 :: (maybe_pid(), pid(), boolean(), non_neg_integer()) ->
+ boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
@@ -58,7 +59,7 @@
-record(lim, {prefetch_count = 0,
ch_pid,
- queues = dict:new(), % QPid -> {MonitorRef, Notify}
+ queues = dict:new(), % QPid -> {MonitorRef, Notify, Length}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
%% notified of a change in the limit or volume that may allow it to
@@ -85,13 +86,13 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid, _AckRequired) ->
+can_send(undefined, _QPid, _AckRequired, _Length) ->
true;
-can_send(LimiterPid, QPid, AckRequired) ->
+can_send(LimiterPid, QPid, AckRequired, Length) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired},
- infinity) end).
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired,
+ Length}, infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
@@ -111,13 +112,17 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid})
init([ChPid]) ->
{ok, #lim{ch_pid = ChPid} }.
-handle_call({can_send, QPid, AckRequired}, _From,
+handle_call({can_send, QPid, AckRequired, Length}, _From,
State = #lim{volume = Volume}) ->
case limit_reached(State) of
- true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end}}
+ true ->
+ {reply, false, limit_queue(QPid, Length, State)};
+ false ->
+ {reply, true,
+ update_length(QPid, Length,
+ State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end})}
end.
handle_cast(shutdown, State) ->
@@ -130,6 +135,7 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
+ io:format("~p OldVolume ~p ; Count ~p ; NewVolume ~p~n", [self(), Volume, Count, NewVolume]),
{noreply, maybe_notify(State, State#lim{volume = NewVolume})};
handle_cast({register, QPid}, State) ->
@@ -163,37 +169,83 @@ limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
- State#lim{queues = dict:store(QPid, {MRef, false}, Queues)};
+ State#lim{queues = dict:store(QPid, {MRef, false, 0}, Queues)};
true -> State
end.
forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
case dict:find(QPid, Queues) of
- {ok, {MRef, _}} ->
+ {ok, {MRef, _, _}} ->
true = erlang:demonitor(MRef),
- ok = rabbit_amqqueue:unblock(QPid, ChPid),
+ unblock(QPid, ChPid),
State#lim{queues = dict:erase(QPid, Queues)};
error -> State
end.
-limit_queue(QPid, State = #lim{queues = Queues}) ->
- UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
+limit_queue(QPid, Length, State = #lim{queues = Queues}) ->
+ UpdateFun = fun ({MRef, _, _}) -> {MRef, true, Length} end,
State#lim{queues = dict:update(QPid, UpdateFun, Queues)}.
-notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
- {QList, NewQueues} =
- dict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
- (QPid, {MRef, true}, {L, D}) ->
- {[QPid | L], dict:store(QPid, {MRef, false}, D)}
- end, {[], Queues}, Queues),
- case length(QList) of
- 0 -> ok;
- L ->
- %% We randomly vary the position of queues in the list,
- %% thus ensuring that each queue has an equal chance of
- %% being notified first.
- {L1, L2} = lists:split(random:uniform(L), QList),
- [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1],
- ok
- end,
+update_length(QPid, Length, State = #lim{queues = Queues}) ->
+ UpdateFun = fun ({MRef, Notify, _}) -> {MRef, Notify, Length} end,
+ State#lim{queues = dict:update(QPid, UpdateFun, Queues)}.
+
+notify_queues(State = #lim{ch_pid = ChPid, queues = Queues,
+ prefetch_count = PrefetchCount, volume = Volume}) ->
+ QList =
+ dict:fold(fun (_QPid, {_, false, _}, Acc) -> Acc;
+ (QPid, {_MRef, true, Length}, L) ->
+ gb_trees:enter(Length, QPid, L)
+ end, gb_trees:empty(), Queues),
+ NewQueues =
+ case gb_trees:size(QList) of
+ 0 -> Queues;
+ QCount ->
+ Capacity = PrefetchCount - Volume,
+ {BiggestLength, _QPid} = gb_trees:largest(QList),
+ BiggestLength1 = lists:max([2*BiggestLength, 1]),
+ %% try to tell enough queues that we guarantee we'll get
+ %% blocked again
+ {Capacity1, NewQueues1} =
+ unblock_queue(ChPid, BiggestLength1, Capacity, QList,
+ Queues),
+ case 0 == Capacity1 of
+ true -> NewQueues1;
+ false -> %% just tell everyone
+ {_Capacity2, NewQueues2} =
+ unblock_queue(ChPid, 1, QCount, QList, NewQueues1),
+ NewQueues2
+ end
+ end,
State#lim{queues = NewQueues}.
+
+unblock_queue(_ChPid, _L, 0, _QList, Queues) ->
+ {0, Queues};
+unblock_queue(ChPid, L, QueueCount, QList, Queues) ->
+ UpdateFunUnBlock = fun ({MRef, _, Length}) -> {MRef, false, Length} end,
+ {Length, QPid, QList1} = gb_trees:take_largest(QList),
+ {_MRef, Blocked, Length} = dict:fetch(QPid, Queues),
+ {QueueCount1, Queues1} =
+ case Blocked of
+ false ->
+ {QueueCount, Queues};
+ true ->
+ %% if 0 == Length, and L == 1, we still need to inform the q
+ case Length + 1 >= random:uniform(L) of
+ true -> case unblock(QPid, ChPid) of
+ true -> {QueueCount - 1,
+ dict:update(QPid, UpdateFunUnBlock, Queues)};
+ false -> {QueueCount, Queues}
+ end;
+ false -> {QueueCount, Queues}
+ end
+ end,
+ case gb_trees:is_empty(QList1) of
+ true -> {QueueCount1, Queues1};
+ false -> unblock_queue(ChPid, L, QueueCount1, QList1, Queues1)
+ end.
+
+unblock(QPid, ChPid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> false end,
+ fun () -> rabbit_amqqueue:unblock(QPid, ChPid) end).