diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-12-23 20:47:16 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-12-23 20:47:16 +0000 |
commit | cd234a9d8c95c5ef501554935d879b020a9678fa (patch) | |
tree | 23c59906cff7bed5746a1e39f3aec7cc93969a39 | |
parent | ef5e395acb15c1ffd2ecd5b82cc3ee7208890ea5 (diff) | |
download | rabbitmq-server-cd234a9d8c95c5ef501554935d879b020a9678fa.tar.gz |
deal with limiting after consumer subscription
-rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 38 |
3 files changed, 43 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 24ded98c..a345f5ab 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,7 +40,7 @@ -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2]). -export([unblock/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). @@ -92,6 +92,7 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), pid()) -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). @@ -261,6 +262,12 @@ notify_down_all(QPids, ChPid) -> fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). +limit_all(QPids, ChPid, LimiterPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server:cast(QPid, {limit, ChPid, LimiterPid}) end, + QPids). + claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server:call(QPid, {claim_queue, ReaderPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c01f08df..c6bb0502 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -781,7 +781,20 @@ handle_cast({notify_sent, ChPid}, State) -> possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> C#cr{unsent_message_count = Count - 1} - end)). + end)); + +handle_cast({limit, ChPid, LimiterPid}, State) -> + case lookup_ch(ChPid) of + not_found -> + ok; + C = #cr{consumers = Consumers} -> + if Consumers =/= [] -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> ok + end, + store_ch_record(C#cr{limiter_pid = LimiterPid}) + end, + noreply(State). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 001fa4af..51e550ed 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -439,11 +439,11 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, %% TODO: terminate limiter when transitioning to 'unlimited' NewLimiter = case Limiter of undefined -> - %% TODO: tell queues with subscribers about - %% the limiter - rabbit_limiter:start_link(ProxyPid); - Pid -> - Pid + LPid = rabbit_limiter:start_link(ProxyPid), + ok = limit_queues(LPid, State), + LPid; + LPid -> + LPid end, ok = rabbit_limiter:limit(NewLimiter, PrefetchCount), {reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}}; @@ -832,18 +832,22 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all( - [QPid || QueueName <- - sets:to_list( - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)), - case rabbit_amqqueue:lookup(QueueName) of - {ok, Q} -> QPid = Q#amqqueue.pid, true; - %% queue has been deleted in the meantime - {error, not_found} -> QPid = none, false - end], - ProxyPid). + rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), ProxyPid). + +limit_queues(LPid, #ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), ProxyPid, LPid). + +consumer_queues(Consumers) -> + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end]. %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, rather than those |