summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-12-23 20:47:16 +0000
committerMatthias Radestock <matthias@lshift.net>2008-12-23 20:47:16 +0000
commitcd234a9d8c95c5ef501554935d879b020a9678fa (patch)
tree23c59906cff7bed5746a1e39f3aec7cc93969a39
parentef5e395acb15c1ffd2ecd5b82cc3ee7208890ea5 (diff)
downloadrabbitmq-server-cd234a9d8c95c5ef501554935d879b020a9678fa.tar.gz
deal with limiting after consumer subscription
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_channel.erl38
3 files changed, 43 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 24ded98c..a345f5ab 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,7 +40,7 @@
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2]).
-export([unblock/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
@@ -92,6 +92,7 @@
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: ([pid()], pid(), pid()) -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
@@ -261,6 +262,12 @@ notify_down_all(QPids, ChPid) ->
fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
QPids).
+limit_all(QPids, ChPid, LimiterPid) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server:cast(QPid, {limit, ChPid, LimiterPid}) end,
+ QPids).
+
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
gen_server:call(QPid, {claim_queue, ReaderPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c01f08df..c6bb0502 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -781,7 +781,20 @@ handle_cast({notify_sent, ChPid}, State) ->
possibly_unblock(State, ChPid,
fun (C = #cr{unsent_message_count = Count}) ->
C#cr{unsent_message_count = Count - 1}
- end)).
+ end));
+
+handle_cast({limit, ChPid, LimiterPid}, State) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ ok;
+ C = #cr{consumers = Consumers} ->
+ if Consumers =/= [] ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true -> ok
+ end,
+ store_ch_record(C#cr{limiter_pid = LimiterPid})
+ end,
+ noreply(State).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 001fa4af..51e550ed 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -439,11 +439,11 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
%% TODO: terminate limiter when transitioning to 'unlimited'
NewLimiter = case Limiter of
undefined ->
- %% TODO: tell queues with subscribers about
- %% the limiter
- rabbit_limiter:start_link(ProxyPid);
- Pid ->
- Pid
+ LPid = rabbit_limiter:start_link(ProxyPid),
+ ok = limit_queues(LPid, State),
+ LPid;
+ LPid ->
+ LPid
end,
ok = rabbit_limiter:limit(NewLimiter, PrefetchCount),
{reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}};
@@ -832,18 +832,22 @@ fold_per_queue(F, Acc0, UAQ) ->
Acc0, D).
notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
- rabbit_amqqueue:notify_down_all(
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end],
- ProxyPid).
+ rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), ProxyPid).
+
+limit_queues(LPid, #ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:limit_all(consumer_queues(Consumers), ProxyPid, LPid).
+
+consumer_queues(Consumers) ->
+ [QPid || QueueName <-
+ sets:to_list(
+ dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ sets:add_element(QueueName, S)
+ end, sets:new(), Consumers)),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, Q} -> QPid = Q#amqqueue.pid, true;
+ %% queue has been deleted in the meantime
+ {error, not_found} -> QPid = none, false
+ end].
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, rather than those