diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-12-23 15:02:32 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-12-23 15:02:32 +0000 |
commit | 0d160f405915259c1b7aa64a8f8c1ecf07978434 (patch) | |
tree | 738f04023d34bb9a85de835a37d0fe0f3616a5e7 | |
parent | cf1272653c67dd02824a416999f6cfb1891f829c (diff) | |
parent | 05fa30a345228631d8c7002252ad133ef38e0e9e (diff) | |
download | rabbitmq-server-0d160f405915259c1b7aa64a8f8c1ecf07978434.tar.gz |
merge default into bug18557
-rw-r--r-- | ebin/rabbit.app | 1 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 155 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 40 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 145 |
5 files changed, 286 insertions, 72 deletions
diff --git a/ebin/rabbit.app b/ebin/rabbit.app index 70a13208..e377a33a 100644 --- a/ebin/rabbit.app +++ b/ebin/rabbit.app @@ -19,6 +19,7 @@ rabbit_framing_channel, rabbit_framing, rabbit_heartbeat, + rabbit_limiter, rabbit_load, rabbit_log, rabbit_memsup_linux, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2b9abb29..24ded98c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,8 +37,9 @@ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2]). +-export([unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). @@ -94,12 +95,13 @@ -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/7 :: - (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/8 :: + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -265,10 +267,10 @@ claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> gen_server:call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). @@ -276,6 +278,9 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server:cast(QPid, {notify_sent, ChPid}). +unblock(QPid, ChPid) -> + gen_server:cast(QPid, {unblock, ChPid}). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 709e355e..53b569b4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -62,9 +62,10 @@ %% These are held in our process dictionary -record(cr, {consumers, ch_pid, + limiter_pid, monitor_ref, unacked_messages, - is_overload_protection_active, + is_limit_active, unsent_message_count}). -define(INFO_KEYS, @@ -131,7 +132,7 @@ ch_record(ChPid) -> ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), - is_overload_protection_active = false, + is_limit_active = false, unsent_message_count = 0}, put(Key, C), C; @@ -144,20 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -update_store_and_maybe_block_ch( - C = #cr{is_overload_protection_active = Active, - unsent_message_count = Count}) -> - {Result, NewActive} = - if - not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) -> - {block_ch, true}; - Active and (Count == 0) -> - {unblock_ch, false}; - true -> - {ok, Active} - end, - store_ch_record(C#cr{is_overload_protection_active = NewActive}), - Result. +is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> + Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. + +ch_record_state_transition(OldCR, NewCR) -> + BlockedOld = is_ch_blocked(OldCR), + BlockedNew = is_ch_blocked(NewCR), + if BlockedOld andalso not(BlockedNew) -> unblock; + BlockedNew andalso not(BlockedOld) -> block; + true -> ok + end. deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, @@ -165,31 +162,59 @@ deliver_immediately(Message, Delivered, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(RoundRobin) of - {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}}, + {{value, QEntry = {ChPid, + #consumer{tag = ConsumerTag, + ack_required = AckRequired = true}}}, RoundRobinTail} -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}; + % Use Qos Limits if an ack is required + % Query the limiter to find out if a limit has been breached + C = #cr{limiter_pid = LimiterPid} = ch_record(ChPid), + case rabbit_limiter:can_send(LimiterPid, self()) of + true -> + really_deliver(AckRequired, ChPid, ConsumerTag, + Delivered, Message, NextId, QName, + QEntry, RoundRobinTail, State); + false -> + % Have another go by cycling through the consumer + % queue + store_ch_record(C#cr{is_limit_active = true}), + NewConsumers = block_consumers(ChPid, RoundRobinTail), + deliver_immediately(Message, Delivered, + State#q{round_robin = NewConsumers}) + end; + {{value, QEntry = {ChPid, + #consumer{tag = ConsumerTag, + ack_required = AckRequired = false}}}, + RoundRobinTail} -> + really_deliver(AckRequired, ChPid, ConsumerTag, + Delivered, Message, NextId, QName, + QEntry, RoundRobinTail, State); {empty, _} -> - not_offered + {not_offered, State} end. +% TODO The arity of this function seems a bit large :-( +really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId, + QName, QEntry, RoundRobinTail, State) -> + rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + C = #cr{unsent_message_count = Count, + unacked_messages = UAM} = ch_record(ChPid), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), + NewConsumers = + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId +1}}. + attempt_delivery(none, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> @@ -198,8 +223,8 @@ attempt_delivery(none, Message, State) -> persist_message(none, qname(State), Message), persist_delivery(qname(State), Message, false), {true, State1}; - not_offered -> - {false, State} + {not_offered, State1} -> + {false, State1} end; attempt_delivery(Txn, Message, State) -> persist_message(Txn, qname(State), Message), @@ -237,16 +262,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) -> (CP /= ChPid) or (CT /= ConsumerTag) end, queue:to_list(RoundRobin))). -possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid}, - State = #q{round_robin = RoundRobin}) -> - case update_store_and_maybe_block_ch(C) of - ok -> +possibly_unblock(State, ChPid, Update) -> + case lookup_ch(ChPid) of + not_found -> State; - unblock_ch -> - run_poke_burst(State#q{round_robin = - unblock_consumers(ChPid, Consumers, RoundRobin)}) + C -> + NewC = Update(C), + store_ch_record(NewC), + case ch_record_state_transition(C, NewC) of + ok -> State; + unblock -> NewRR = unblock_consumers(ChPid, + NewC#cr.consumers, + State#q.round_robin), + run_poke_burst(State#q{round_robin = NewRR}) + end end. - + check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> {continue, State}; check_auto_delete(State = #q{has_had_consumers = false}) -> @@ -301,7 +332,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, {stop, normal, NewState} end end. - + cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> none; cancel_holder(_ChPid, _ConsumerTag, Holder) -> @@ -334,8 +365,8 @@ run_poke_burst(MessageBuffer, State) -> {offered, false, NewState} -> persist_auto_ack(qname(State), Message), run_poke_burst(BufferTail, NewState); - not_offered -> - State#q{message_buffer = MessageBuffer} + {not_offered, NewState} -> + NewState#q{message_buffer = MessageBuffer} end; {empty, _} -> State#q{message_buffer = MessageBuffer} @@ -585,8 +616,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply(empty, State) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, - ExclusiveConsume, OkMsg}, +handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, + ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, exclusive_consumer = ExistingHolder, round_robin = RoundRobin}) -> @@ -600,7 +631,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - C1 = C#cr{consumers = [Consumer | Consumers]}, + C1 = C#cr{consumers = [Consumer | Consumers], + limiter_pid = LimiterPid}, store_ch_record(C1), State1 = State#q{has_had_consumers = true, exclusive_consumer = @@ -729,14 +761,17 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; +handle_cast({unblock, ChPid}, State) -> + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end)); + handle_cast({notify_sent, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> noreply(State); - T = #cr{unsent_message_count =Count} -> - noreply(possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)) - end. + noreply( + possibly_unblock(State, ChPid, + fun (C = #cr{unsent_message_count = Count}) -> + C#cr{unsent_message_count = Count - 1} + end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 19104bcb..af1923a7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -42,7 +42,7 @@ -record(ch, {state, proxy_pid, reader_pid, writer_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, + username, virtual_host, limiter, most_recently_declared_queue, consumer_mapping}). %%---------------------------------------------------------------------------- @@ -108,6 +108,8 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, + % TODO See point 3.1.1 of the design - start the limiter lazily + limiter = rabbit_limiter:start_link(ProxyPid), consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> @@ -290,7 +292,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of - none -> State#ch{unacked_message_q = Remaining}; + none -> ok = notify_limiter(State#ch.limiter, Acked), + State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), add_tx_participants( @@ -334,6 +337,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ proxy_pid = ProxyPid, reader_pid = ReaderPid, + limiter = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -351,7 +355,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, + Q, NoAck, ReaderPid, ProxyPid, LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -416,8 +420,19 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{}, _, State) -> - %% FIXME: Need to implement QOS +handle_method(#'basic.qos'{global = Flag = true}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, + "Global flag (~s) for basic.qos not implementented", [Flag]); + +handle_method(#'basic.qos'{prefetch_size = Size}, + _, _State) when Size /= 0 -> + rabbit_misc:protocol_error(not_implemented, + "Pre-fetch size (~s) for basic.qos not implementented", + [Size]); + +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter}) -> + ok = rabbit_limiter:limit(Limiter, PrefetchCount), {reply, #'basic.qos_ok'{}, State}; handle_method(#'basic.recover'{requeue = true}, @@ -764,7 +779,9 @@ internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of - ok -> new_tx(State); + ok -> ok = notify_limiter(State#ch.limiter, + State#ch.uncommitted_ack_q), + new_tx(State); {error, Errors} -> rabbit_misc:protocol_error( internal_error, "commit failed: ~w", [Errors]) end. @@ -815,6 +832,17 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> end], ProxyPid). +%% tell the limiter about the number of acks that have been received +%% for messages delivered to subscribed consumers, rather than those +%% for messages sent in a response to a basic.get +notify_limiter(Limiter, Acked) -> + case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, queue:to_list(Acked)) of + 0 -> ok; + Count -> rabbit_limiter:ack(Limiter, Count) + end. + is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> case Mode of diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl new file mode 100644 index 00000000..12632625 --- /dev/null +++ b/src/rabbit_limiter.erl @@ -0,0 +1,145 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_limiter). + +-behaviour(gen_server). + +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2]). +-export([start_link/1]). +-export([limit/2, can_send/2, ack/2]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(can_send/2 :: (pid(), pid()) -> bool()). +-spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +-record(lim, {prefetch_count = 0, + ch_pid, + queues = dict:new(), + in_use = 0}). + +%%---------------------------------------------------------------------------- +%% API +%%---------------------------------------------------------------------------- + +start_link(ChPid) -> + {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), + Pid. + +limit(LimiterPid, PrefetchCount) -> + gen_server:cast(LimiterPid, {limit, PrefetchCount}). + +%% Ask the limiter whether the queue can deliver a message without +%% breaching a limit +can_send(LimiterPid, QPid) -> + gen_server:call(LimiterPid, {can_send, QPid}). + +%% Let the limiter know that the channel has received some acks from a +%% consumer +ack(LimiterPid, Count) -> + gen_server:cast(LimiterPid, {ack, Count}). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +init([ChPid]) -> + {ok, #lim{ch_pid = ChPid} }. + +handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> + case limit_reached(State) of + true -> {reply, false, remember_queue(QPid, State)}; + false -> {reply, true, State#lim{in_use = InUse + 1}} + end. + +handle_cast({limit, PrefetchCount}, State) -> + {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; + +handle_cast({ack, Count}, State = #lim{in_use = InUse}) -> + NewInUse = if InUse == 0 -> 0; + true -> InUse - Count + end, + {noreply, maybe_notify(State, State#lim{in_use = NewInUse})}. + +handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, + State = #lim{queues = Queues}) -> + {noreply, State#lim{queues = dict:erase(QPid, Queues)}}. + +terminate(_, _) -> + ok. + +code_change(_, State, _) -> + State. + +%%---------------------------------------------------------------------------- +%% Internal plumbing +%%---------------------------------------------------------------------------- + +maybe_notify(OldState, NewState) -> + case limit_reached(OldState) andalso not(limit_reached(NewState)) of + true -> forget_queues(NewState); + false -> NewState + end. + +limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> + Limit =/= 0 andalso InUse >= Limit. + +remember_queue(QPid, State = #lim{queues = Queues}) -> + case dict:is_key(QPid, Queues) of + false -> MonitorRef = erlang:monitor(process, QPid), + State#lim{queues = dict:store(QPid, MonitorRef, Queues)}; + true -> State + end. + +forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> + QList = dict:to_list(Queues), + case length(QList) of + 0 -> ok; + L -> + %% We randomly vary the position in which each queue + %% appears in the list, thus ensuring that each queue has + %% an equal chance of being notified first. + {L1, L2} = lists:split(random:uniform(L), QList), + [begin + true = erlang:demonitor(Ref), + ok = rabbit_amqqueue:unblock(Q, ChPid) + end || {Q, Ref} <- L2 ++ L1] + end, + State#lim{queues = dict:new()}. |