summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-12-23 16:28:22 +0000
committerMatthias Radestock <matthias@lshift.net>2008-12-23 16:28:22 +0000
commitef5e395acb15c1ffd2ecd5b82cc3ee7208890ea5 (patch)
tree2b7a497e23759e73b38018f3bd0970ad7378e368
parent62874e89b1cd969e12ffc288fb43f46f1b72e05d (diff)
downloadrabbitmq-server-ef5e395acb15c1ffd2ecd5b82cc3ee7208890ea5.tar.gz
create limiter lazily
This makes an 'unlimited' channel as efficient as it used to be
-rw-r--r--src/rabbit_channel.erl25
-rw-r--r--src/rabbit_limiter.erl2
2 files changed, 22 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index af1923a7..001fa4af 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -108,8 +108,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- % TODO See point 3.1.1 of the design - start the limiter lazily
- limiter = rabbit_limiter:start_link(ProxyPid),
+ limiter = undefined,
consumer_mapping = dict:new()}.
handle_message({method, Method, Content}, State) ->
@@ -430,11 +429,25 @@ handle_method(#'basic.qos'{prefetch_size = Size},
"Pre-fetch size (~s) for basic.qos not implementented",
[Size]);
-handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{limiter = Limiter}) ->
- ok = rabbit_limiter:limit(Limiter, PrefetchCount),
+handle_method(#'basic.qos'{prefetch_count = 0},
+ _, State = #ch{ limiter = undefined }) ->
{reply, #'basic.qos_ok'{}, State};
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{ limiter = Limiter,
+ proxy_pid = ProxyPid }) ->
+ %% TODO: terminate limiter when transitioning to 'unlimited'
+ NewLimiter = case Limiter of
+ undefined ->
+ %% TODO: tell queues with subscribers about
+ %% the limiter
+ rabbit_limiter:start_link(ProxyPid);
+ Pid ->
+ Pid
+ end,
+ ok = rabbit_limiter:limit(NewLimiter, PrefetchCount),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}};
+
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
proxy_pid = ProxyPid,
@@ -835,6 +848,8 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, rather than those
%% for messages sent in a response to a basic.get
+notify_limiter(undefined, _Acked) ->
+ ok;
notify_limiter(Limiter, Acked) ->
case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index f1a45415..824de072 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -70,6 +70,8 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
+can_send(undefined, _QPid) ->
+ true;
can_send(LimiterPid, QPid) ->
gen_server:call(LimiterPid, {can_send, QPid}).