diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-12-23 16:28:22 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-12-23 16:28:22 +0000 |
commit | ef5e395acb15c1ffd2ecd5b82cc3ee7208890ea5 (patch) | |
tree | 2b7a497e23759e73b38018f3bd0970ad7378e368 | |
parent | 62874e89b1cd969e12ffc288fb43f46f1b72e05d (diff) | |
download | rabbitmq-server-ef5e395acb15c1ffd2ecd5b82cc3ee7208890ea5.tar.gz |
create limiter lazily
This makes an 'unlimited' channel as efficient as it used to be
-rw-r--r-- | src/rabbit_channel.erl | 25 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 2 |
2 files changed, 22 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index af1923a7..001fa4af 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -108,8 +108,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - % TODO See point 3.1.1 of the design - start the limiter lazily - limiter = rabbit_limiter:start_link(ProxyPid), + limiter = undefined, consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> @@ -430,11 +429,25 @@ handle_method(#'basic.qos'{prefetch_size = Size}, "Pre-fetch size (~s) for basic.qos not implementented", [Size]); -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{limiter = Limiter}) -> - ok = rabbit_limiter:limit(Limiter, PrefetchCount), +handle_method(#'basic.qos'{prefetch_count = 0}, + _, State = #ch{ limiter = undefined }) -> {reply, #'basic.qos_ok'{}, State}; +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{ limiter = Limiter, + proxy_pid = ProxyPid }) -> + %% TODO: terminate limiter when transitioning to 'unlimited' + NewLimiter = case Limiter of + undefined -> + %% TODO: tell queues with subscribers about + %% the limiter + rabbit_limiter:start_link(ProxyPid); + Pid -> + Pid + end, + ok = rabbit_limiter:limit(NewLimiter, PrefetchCount), + {reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}}; + handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, proxy_pid = ProxyPid, @@ -835,6 +848,8 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, rather than those %% for messages sent in a response to a basic.get +notify_limiter(undefined, _Acked) -> + ok; notify_limiter(Limiter, Acked) -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index f1a45415..824de072 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -70,6 +70,8 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit +can_send(undefined, _QPid) -> + true; can_send(LimiterPid, QPid) -> gen_server:call(LimiterPid, {can_send, QPid}). |