diff options
author | Ben Hood <0x6e6562@gmail.com> | 2008-12-07 01:19:46 +0000 |
---|---|---|
committer | Ben Hood <0x6e6562@gmail.com> | 2008-12-07 01:19:46 +0000 |
commit | 54967f60e2ccf692e84c286582ede2f00fe365d8 (patch) | |
tree | ab2f728299a0cdfb9816b8125d39e81a4e33c709 | |
parent | 8bca4d5e133b088e5ab493211343c57116726177 (diff) | |
parent | c4f8ddb5cb914b0f825a5c8fc30df594f92c8703 (diff) | |
download | rabbitmq-server-54967f60e2ccf692e84c286582ede2f00fe365d8.tar.gz |
Merged default into 18557
-rw-r--r-- | src/rabbit_amqqueue.erl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 115 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 27 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 120 |
4 files changed, 236 insertions, 43 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4b318eeb..d142507d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,8 +30,9 @@ -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2]). +-export([unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). @@ -82,12 +83,13 @@ -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/7 :: - (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/8 :: + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -238,10 +240,10 @@ claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> gen_server:call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). @@ -249,6 +251,9 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server:cast(QPid, {notify_sent, ChPid}). +unblock(QPid, ChPid) -> + gen_server:cast(QPid, {unblock, ChPid}). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f8964e34..b4d0d52d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -56,8 +56,10 @@ %% These are held in our process dictionary -record(cr, {consumers, ch_pid, + limiter_pid, monitor_ref, unacked_messages, + is_limit_active, is_overload_protection_active, unsent_message_count}). @@ -124,18 +126,22 @@ all_ch_record() -> [C || {{ch, _}, C} <- get()]. update_store_and_maybe_block_ch( - C = #cr{is_overload_protection_active = Active, + C = #cr{is_overload_protection_active = Overloaded, + is_limit_active = Limited, unsent_message_count = Count}) -> - {Result, NewActive} = + {Result, NewOverloaded, NewLimited} = if - not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) -> - {block_ch, true}; - Active and (Count == 0) -> - {unblock_ch, false}; + not(Overloaded) and (Count > ?UNSENT_MESSAGE_LIMIT) -> + {block_ch, true, Limited}; + Overloaded and (Count == 0) -> + {unblock_ch, false, Limited}; + Limited and (Count < ?UNSENT_MESSAGE_LIMIT) -> + {unblock_ch, Overloaded, false}; true -> - {ok, Active} + {ok, Overloaded, Limited} end, - store_ch_record(C#cr{is_overload_protection_active = NewActive}), + store_ch_record(C#cr{is_overload_protection_active = NewOverloaded, + is_limit_active = NewLimited}), Result. deliver_immediately(Message, Delivered, @@ -144,31 +150,59 @@ deliver_immediately(Message, Delivered, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(RoundRobin) of - {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}}, + {{value, QEntry = {ChPid, + #consumer{tag = ConsumerTag, + ack_required = AckRequired = true}}}, RoundRobinTail} -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}; + % Use Qos Limits if an ack is required + % Query the limiter to find out if a limit has been breached + #cr{limiter_pid = LimiterPid} = ch_record(ChPid), + case rabbit_limiter:can_send(LimiterPid, self()) of + true -> + really_deliver(AckRequired, ChPid, ConsumerTag, + Delivered, Message, NextId, QName, + QEntry, RoundRobinTail, State); + false -> + % Have another go by cycling through the consumer + % queue + C = ch_record(ChPid), + store_ch_record(C#cr{is_limit_active = true}), + NewConsumers = block_consumers(ChPid, RoundRobinTail), + deliver_immediately(Message, Delivered, + State#q{round_robin = NewConsumers}) + end; + {{value, QEntry = {ChPid, + #consumer{tag = ConsumerTag, + ack_required = AckRequired = false}}}, + RoundRobinTail} -> + really_deliver(AckRequired, ChPid, ConsumerTag, + Delivered, Message, NextId, QName, + QEntry, RoundRobinTail, State); {empty, _} -> not_offered end. +% TODO The arity of this function seems a bit large :-( +really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId, + QName, QEntry, RoundRobinTail, State) -> + rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + C = #cr{unsent_message_count = Count, + unacked_messages = UAM} = ch_record(ChPid), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewConsumers = + case update_store_and_maybe_block_ch( + C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}) of + ok -> queue:in(QEntry, RoundRobinTail); + block_ch -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId +1}}. + attempt_delivery(none, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> @@ -280,7 +314,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, {stop, normal, NewState} end end. - + cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> none; cancel_holder(_ChPid, _ConsumerTag, Holder) -> @@ -519,8 +553,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply(empty, State) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, - ExclusiveConsume, OkMsg}, +handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, + ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, exclusive_consumer = ExistingHolder, round_robin = RoundRobin}) -> @@ -534,7 +568,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - C1 = C#cr{consumers = [Consumer | Consumers]}, + C1 = C#cr{consumers = [Consumer | Consumers], + limiter_pid = LimiterPid}, store_ch_record(C1), State1 = State#q{has_had_consumers = true, exclusive_consumer = @@ -630,7 +665,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); - C = #cr{unacked_messages = UAM} -> + C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} -> {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), case Txn of @@ -663,6 +698,20 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; +handle_cast({unblock, ChPid}, State) -> + % TODO Refactor the code duplication + % between this an the notify_sent cast handler + case lookup_ch(ChPid) of + not_found -> + noreply(State); + C = #cr{is_limit_active = true} -> + noreply(possibly_unblock(C, State)); + C -> + rabbit_log:warning("Ignoring unblock for an active ch: ~p~n", + [C]), + noreply(State) + end; + handle_cast({notify_sent, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1eb421ca..f9f92959 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -record(ch, {state, proxy_pid, reader_pid, writer_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, + username, virtual_host, limiter, most_recently_declared_queue, consumer_mapping}). %%---------------------------------------------------------------------------- @@ -102,6 +102,8 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, + % TODO See point 3.1.1 of the design - start the limiter lazily + limiter = rabbit_limiter:start_link(ProxyPid), consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> @@ -269,6 +271,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, + limiter = Limiter, next_tag = NextDeliveryTag, unacked_message_q = UAMQ}) -> if DeliveryTag >= NextDeliveryTag -> @@ -277,6 +280,20 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + % CC the limiter on the number of acks that have been received + % but don't include any acks from a basic.get bottom half + % (hence the differentiation between tags set to none and other tags) + % TODO - this is quite crude and is probably more expensive than it should + % be - according to the OTP documentation, len/1 runs in O(n), probably + % not so cool for a queuing system + NotBasicGet = queue:filter( + fun({_CurrentDeliveryTag, ConsumerTag, _Msg}) -> + case ConsumerTag of + none -> false; + _ -> true + end + end, Acked), + rabbit_limiter:decrement_capacity(Limiter, queue:len(NotBasicGet)), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of none -> State#ch{unacked_message_q = Remaining}; @@ -323,6 +340,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ proxy_pid = ProxyPid, reader_pid = ReaderPid, + limiter = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -340,7 +358,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, + Q, NoAck, ReaderPid, ProxyPid, LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -405,8 +423,9 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{}, _, State) -> - %% FIXME: Need to implement QOS +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter}) -> + rabbit_limiter:set_prefetch_count(Limiter, PrefetchCount), {reply, #'basic.qos_ok'{}, State}; handle_method(#'basic.recover'{requeue = true}, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl new file mode 100644 index 00000000..4e130ea0 --- /dev/null +++ b/src/rabbit_limiter.erl @@ -0,0 +1,120 @@ +%% TODO Decide what to do with the license statement now that Cohesive have +%% bailed. +-module(rabbit_limiter). + + +% I'm starting out with a gen_server because of the synchronous query +% that the queue process makes +-behaviour(gen_server). + +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2]). +-export([start_link/1]). +-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). + +-record(lim, {prefetch_count = 0, + ch_pid, + queues = sets:new(), + in_use = 0}). + +%--------------------------------------------------------------------------- +% API +%--------------------------------------------------------------------------- + +% Kicks this pig +start_link(ChPid) -> + {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), + Pid. + +set_prefetch_count(LimiterPid, PrefetchCount) -> + gen_server:cast(LimiterPid, {prefetch_count, PrefetchCount}). + +% Queries the limiter to ask whether the queue can deliver a message +% without breaching a limit +can_send(LimiterPid, QPid) -> + gen_server:call(LimiterPid, {can_send, QPid}). + +% Lets the limiter know that a queue has received an ack from a consumer +% and hence can reduce the in-use-by-that queue capcity information +decrement_capacity(LimiterPid, Magnitude) -> + gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}). + +%--------------------------------------------------------------------------- +% gen_server callbacks +%--------------------------------------------------------------------------- + +init([ChPid]) -> + {ok, #lim{ch_pid = ChPid} }. + +% This queuries the limiter to ask if it is possible to send a message without +% breaching a limit for this queue process +handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse, + queues = Queues}) -> + NewState = State#lim{queues = sets:add_element(QPid, Queues)}, + case limit_reached(NewState) of + true -> {reply, false, NewState}; + false -> + {reply, true, NewState#lim{in_use = InUse + 1}} + end. + +% When the new limit is larger than the existing limit, +% notify all queues and forget about queues with an in-use +% capcity of zero +handle_cast({prefetch_count, PrefetchCount}, + State = #lim{prefetch_count = CurrentLimit}) + when PrefetchCount > CurrentLimit -> + notify_queues(State), + {noreply, State#lim{prefetch_count = PrefetchCount, + queues = sets:new(), + in_use = 0}}; + +% Default setter of the prefetch count +handle_cast({prefetch_count, PrefetchCount}, State) -> + {noreply, State#lim{prefetch_count = PrefetchCount}}; + +% This is an asynchronous ack from a queue that it has received an ack from +% a queue. This allows the limiter to update the the in-use-by-that queue +% capacity infromation. +handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> + NewState = decrement_in_use(Magnitude, State), + ShouldNotify = limit_reached(State) and not(limit_reached(NewState)), + if + ShouldNotify -> + notify_queues(State), + {noreply, State#lim{queues = sets:new(), in_use = InUse - 1}}; + true -> + {noreply, NewState} + end. + +handle_info(_, State) -> + {noreply, State}. + +terminate(_, _) -> + ok. + +code_change(_, State, _) -> + State. + +%--------------------------------------------------------------------------- +% Internal plumbing +%--------------------------------------------------------------------------- + +% Reduces the in-use-count of the queue by a specific magnitude +decrement_in_use(_, State = #lim{in_use = 0}) -> + State#lim{in_use = 0}; + +decrement_in_use(Magnitude, State = #lim{in_use = InUse}) -> + State#lim{in_use = InUse - Magnitude}. + +% Unblocks every queue that this limiter knows about +notify_queues(#lim{ch_pid = ChPid, queues = Queues}) -> + sets:fold(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, [], Queues). + +% A prefetch limit of zero means unlimited +limit_reached(#lim{prefetch_count = 0}) -> + false; + +% Works out whether the limit is breached for the current limiter state +limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> + InUse == Limit. + |