From 1fefb9ca7a1aa99da5c6a910fecc2bb0c901c51e Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 6 Aug 2008 10:16:03 +0100 Subject: Special-case global=true: we don't implement it --- src/rabbit_channel.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ec1d1fba..fa39ecf7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -444,6 +444,12 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; +handle_method(#'basic.qos'{global = true}, _, State) -> + rabbit_misc:protocol_error(not_implemented, + "Basic.Qos global (per-connection) setting not implemented", + [], + 'basic.qos'); + handle_method(#'basic.qos'{}, _, State) -> %% FIXME: Need to implement QOS {reply, #'basic.qos_ok'{}, State}; -- cgit v1.2.1 From f85a177a9b1c58076455b660660e07ca379d211d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 6 Aug 2008 12:00:21 +0100 Subject: Remove unnecessary method name from protocol_error --- src/rabbit_channel.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fa39ecf7..d4034358 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -447,8 +447,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, handle_method(#'basic.qos'{global = true}, _, State) -> rabbit_misc:protocol_error(not_implemented, "Basic.Qos global (per-connection) setting not implemented", - [], - 'basic.qos'); + []); handle_method(#'basic.qos'{}, _, State) -> %% FIXME: Need to implement QOS -- cgit v1.2.1 From b50e4f85ee18b5351e4d9c4cfe7305e8147c0239 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Tue, 18 Nov 2008 19:40:02 +0000 Subject: First stab at basic.qos --- src/rabbit_amqqueue.erl | 12 +++--- src/rabbit_amqqueue_process.erl | 82 ++++++++++++++++++++++++++++------------- src/rabbit_channel.erl | 12 ++++-- 3 files changed, 71 insertions(+), 35 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 56d2c35d..938182da 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,7 +30,7 @@ -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). @@ -82,8 +82,8 @@ -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/7 :: - (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/8 :: + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). @@ -238,10 +238,10 @@ claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> gen_server:call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f8964e34..43355a5a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -43,6 +43,7 @@ % Queue's state -record(q, {q, owner, + limiter_mapping, exclusive_consumer, has_had_consumers, next_msg_id, @@ -75,6 +76,7 @@ init(Q) -> exclusive_consumer = none, has_had_consumers = false, next_msg_id = 1, + limiter_mapping = dict:new(), message_buffer = queue:new(), round_robin = queue:new()}, ?HIBERNATE_AFTER}. @@ -141,34 +143,61 @@ update_store_and_maybe_block_ch( deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, round_robin = RoundRobin, + limiter_mapping = LimiterMapping, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(RoundRobin) of - {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}}, + {{value, QEntry = {ChPid, + #consumer{tag = ConsumerTag, + ack_required = AckRequired = true}}}, RoundRobinTail} -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}; + % Use Qos Limits if an ack is required + % Query the limiter to find out if a limit has been breached + LimiterPid = dict:fetch(ChPid, LimiterMapping), + case rabbit_limiter:can_send(LimiterPid, self()) of + true -> + really_deliver(AckRequired, ChPid, ConsumerTag, + Delivered, Message, NextId, QName, + QEntry, RoundRobinTail, State); + false -> + % Have another go by cycling through the consumer + % queue + NewConsumers = block_consumers(ChPid, RoundRobinTail), + deliver_immediately(Message, Delivered, + State#q{round_robin = NewConsumers}) + end; + {{value, QEntry = {ChPid, + #consumer{tag = ConsumerTag, + ack_required = AckRequired = false}}}, + RoundRobinTail} -> + really_deliver(AckRequired, ChPid, ConsumerTag, + Delivered, Message, NextId, QName, + QEntry, RoundRobinTail, State); {empty, _} -> not_offered end. +% TODO The arity of this function seems a bit large :-( +really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId, + QName, QEntry, RoundRobinTail, State) -> + rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + C = #cr{unsent_message_count = Count, + unacked_messages = UAM} = ch_record(ChPid), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewConsumers = + case update_store_and_maybe_block_ch( + C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}) of + ok -> queue:in(QEntry, RoundRobinTail); + block_ch -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId +1}}. + attempt_delivery(none, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> @@ -519,11 +548,14 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply(empty, State) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, - ExclusiveConsume, OkMsg}, - _From, State = #q{owner = Owner, - exclusive_consumer = ExistingHolder, - round_robin = RoundRobin}) -> +handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, + ConsumerTag, ExclusiveConsume, OkMsg}, + _From, _State = #q{owner = Owner, + limiter_mapping = Mapping, + exclusive_consumer = ExistingHolder, + round_robin = RoundRobin}) -> + % TODO Remove the underscore in front of the first State variable + State = _State#q{limiter_mapping = dict:store(ChPid, LimiterPid, Mapping)}, case check_queue_owner(Owner, ReaderPid) of mismatch -> reply({error, queue_owned_by_another_connection}, State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1eb421ca..7331a34b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -record(ch, {state, proxy_pid, reader_pid, writer_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, + username, virtual_host, limiter, most_recently_declared_queue, consumer_mapping}). %%---------------------------------------------------------------------------- @@ -102,6 +102,8 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, + % TODO See point 3.1.1 of the design - start the limiter lazily + limiter = rabbit_limiter:start_link(), consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> @@ -323,6 +325,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ proxy_pid = ProxyPid, reader_pid = ReaderPid, + limiter = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -340,7 +343,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, + Q, NoAck, ReaderPid, ProxyPid, LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -405,8 +408,9 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{}, _, State) -> - %% FIXME: Need to implement QOS +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter}) -> + Limiter ! {prefetch_count, PrefetchCount}, {reply, #'basic.qos_ok'{}, State}; handle_method(#'basic.recover'{requeue = true}, -- cgit v1.2.1 From f2243fc3cfa0560e13667a0c503901d94d7268b8 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Wed, 19 Nov 2008 11:33:35 +0000 Subject: Moved limiter out of the queue state into the process dict --- src/rabbit_amqqueue_process.erl | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 43355a5a..9bb41b6b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -43,7 +43,6 @@ % Queue's state -record(q, {q, owner, - limiter_mapping, exclusive_consumer, has_had_consumers, next_msg_id, @@ -57,6 +56,7 @@ %% These are held in our process dictionary -record(cr, {consumers, ch_pid, + limiter_pid, monitor_ref, unacked_messages, is_overload_protection_active, @@ -76,7 +76,6 @@ init(Q) -> exclusive_consumer = none, has_had_consumers = false, next_msg_id = 1, - limiter_mapping = dict:new(), message_buffer = queue:new(), round_robin = queue:new()}, ?HIBERNATE_AFTER}. @@ -143,7 +142,6 @@ update_store_and_maybe_block_ch( deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, round_robin = RoundRobin, - limiter_mapping = LimiterMapping, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(RoundRobin) of @@ -153,7 +151,7 @@ deliver_immediately(Message, Delivered, RoundRobinTail} -> % Use Qos Limits if an ack is required % Query the limiter to find out if a limit has been breached - LimiterPid = dict:fetch(ChPid, LimiterMapping), + #cr{limiter_pid = LimiterPid} = ch_record(ChPid), case rabbit_limiter:can_send(LimiterPid, self()) of true -> really_deliver(AckRequired, ChPid, ConsumerTag, @@ -550,12 +548,9 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, _State = #q{owner = Owner, - limiter_mapping = Mapping, - exclusive_consumer = ExistingHolder, - round_robin = RoundRobin}) -> - % TODO Remove the underscore in front of the first State variable - State = _State#q{limiter_mapping = dict:store(ChPid, LimiterPid, Mapping)}, + _From, State = #q{owner = Owner, + exclusive_consumer = ExistingHolder, + round_robin = RoundRobin}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> reply({error, queue_owned_by_another_connection}, State); @@ -566,7 +561,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - C1 = C#cr{consumers = [Consumer | Consumers]}, + C1 = C#cr{consumers = [Consumer | Consumers], + limiter_pid = LimiterPid}, store_ch_record(C1), State1 = State#q{has_had_consumers = true, exclusive_consumer = -- cgit v1.2.1 From 92bce20b237ff4bf96dc49077f83d300f4d8b125 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Wed, 19 Nov 2008 16:31:51 +0000 Subject: Savepoint --- src/rabbit_amqqueue_process.erl | 5 +- src/rabbit_channel.erl | 2 +- src/rabbit_limiter.erl | 155 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 159 insertions(+), 3 deletions(-) create mode 100644 src/rabbit_limiter.erl diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9bb41b6b..c5a6a343 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -307,7 +307,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, {stop, normal, NewState} end end. - + cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> none; cancel_holder(_ChPid, _ConsumerTag, Holder) -> @@ -658,7 +658,8 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); - C = #cr{unacked_messages = UAM} -> + C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} -> + rabbit_limiter:decrement_capacity(LimiterPid, qname(State)), {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), case Txn of diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7331a34b..ac186cfa 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -103,7 +103,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> virtual_host = VHost, most_recently_declared_queue = <<>>, % TODO See point 3.1.1 of the design - start the limiter lazily - limiter = rabbit_limiter:start_link(), + limiter = rabbit_limiter:start_link(self()), consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl new file mode 100644 index 00000000..3dfeb5fe --- /dev/null +++ b/src/rabbit_limiter.erl @@ -0,0 +1,155 @@ +-module(rabbit_limiter). + + +% I'm starting out with a gen_server because of the synchronous query +% that the queue process makes +-behviour(gen_server). + +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2]). +-export([start_link/1]). +-export([can_send/2, decrement_capacity/2]). + +-record(lim, {prefetch_count = 1, + ch_pid, + blocked = false, + in_use = dict:new()}). + +%--------------------------------------------------------------------------- +% API +%--------------------------------------------------------------------------- + +% Kicks this pig +start_link(ChPid) -> + {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), + Pid. + +% Queries the limiter to ask whether the queue can deliver a message +% without breaching a limit +can_send(LimiterPid, QPid) -> + gen_server:call(LimiterPid, {can_send, QPid}). + +% Lets the limiter know that a queue has received an ack from a consumer +% and hence can reduce the in-use-by-that queue capcity information +decrement_capacity(LimiterPid, QPid) -> + gen_server:cast(LimiterPid, {decrement_capacity, QPid}). + +%--------------------------------------------------------------------------- +% gen_server callbacks +%--------------------------------------------------------------------------- + +init([ChPid]) -> + {ok, #lim{ch_pid = ChPid} }. + +% This queuries the limiter to ask if it is possible to send a message without +% breaching a limit for this queue process +handle_call({can_send, QPid}, _From, State) -> + {CanSend, NewState} = maybe_can_send(QPid, State), + {reply, CanSend, NewState}. + +% This is an asynchronous ack from a queue that it has received an ack from +% a queue. This allows the limiter to update the the in-use-by-that queue +% capacity infromation. +handle_cast({decrement_capacity, QPid}, State) -> + NewState = decrement_in_use(QPid, State), + maybe_notify_queues(NewState), + {noreply, NewState}. + +% When the prefetch count has not been set, +% e.g. when the channel has not yet been issued a basic.qos +handle_info({prefetch_count, PrefetchCount}, + State = #lim{prefetch_count = 0}) -> + {noreply, State#lim{prefetch_count = PrefetchCount}}; + +% When the new limit is larger than the existing limit, +% notify all queues and forget about queues with an in-use +% capcity of zero +handle_info({prefetch_count, PrefetchCount}, + State = #lim{prefetch_count = CurrentLimit}) + when PrefetchCount > CurrentLimit -> + % TODO implement this requirement + {noreply, State#lim{prefetch_count = PrefetchCount}}; + +% Default setter of the prefetch count +handle_info({prefetch_count, PrefetchCount}, State) -> + {noreply, State#lim{prefetch_count = PrefetchCount}}. + +terminate(_, _) -> + ok. + +code_change(_, State, _) -> + State. + +%--------------------------------------------------------------------------- +% Internal plumbing +%--------------------------------------------------------------------------- + +decrement_in_use(QPid, State = #lim{in_use = InUse}) -> + case dict:find(QPid, InUse) of + {ok, Capacity} -> + io:format("capacity ~p~n",[Capacity]), + if + % Is there a lower bound on capacity? + % i.e. what is the zero mark, how much is unlimited? + Capacity > 0 -> + NewInUse = dict:store(QPid, Capacity - 1, InUse), + State#lim{in_use = NewInUse}; + true -> + % TODO How should this be handled? + State + end; + error -> + % TODO How should this case be handled? + State + end. + +maybe_notify_queues(State = #lim{ch_pid = ChPid, in_use = InUse}) -> + Capacity = current_capcity(State), + case should_notify(Capacity, State) of + true -> + dict:map(fun(Q,_) -> + rabbit_amqqueue:notify_sent(Q, ChPid) + end, InUse), + State#lim{blocked = false}; + false -> + ok + end. + +current_capcity(#lim{in_use = InUse}) -> + % TODO This *seems* expensive to compute this on the fly + dict:fold(fun(_, PerQ, Acc) -> PerQ + Acc end, 0, InUse). + + +% This is a very naive way of deciding wether to unblock or not, +% it *might* be better to wait for a time or volume threshold +% instead of broadcasting notifications +should_notify(Capacity, #lim{prefetch_count = Limit, blocked = true}) + when Capacity < Limit -> + true; + +should_notify(_,_) -> false. + +maybe_can_send(_, State = #lim{blocked = true}) -> + {false, State}; + +maybe_can_send(QPid, State = #lim{prefetch_count = Limit, + in_use = InUse, + blocked = false}) -> + Capacity = current_capcity(State), + io:format("Limit was ~p, capacity ~p~n",[Limit, Capacity]), + if + Capacity < Limit -> + NewInUse = update_in_use_capacity(QPid, InUse), + { true, State#lim{in_use = NewInUse} }; + true -> + { false, State#lim{blocked = true}} + end. + +update_in_use_capacity(QPid, InUse) -> + case dict:find(QPid, InUse) of + {ok, Capacity} -> + dict:store(QPid, Capacity + 1, InUse); + error -> + dict:store(QPid, 0, InUse) + end. + -- cgit v1.2.1 From d42a48dfea34f0cf3047ab1336216b95c6a1d586 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Thu, 20 Nov 2008 14:29:11 +0000 Subject: First cut with some actual load balancing working --- src/rabbit_amqqueue.erl | 5 +++++ src/rabbit_amqqueue_process.erl | 39 ++++++++++++++++++++++++++++++--------- src/rabbit_channel.erl | 2 +- src/rabbit_limiter.erl | 28 ++++++++++++++++++---------- 4 files changed, 54 insertions(+), 20 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 938182da..4e524e3c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,6 +32,7 @@ -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2]). +-export([unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). @@ -88,6 +89,7 @@ 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -249,6 +251,9 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server:cast(QPid, {notify_sent, ChPid}). +unblock(QPid, ChPid) -> + gen_server:cast(QPid, {unblock, ChPid}). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c5a6a343..6ef5f970 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -59,6 +59,7 @@ limiter_pid, monitor_ref, unacked_messages, + is_limit_active, is_overload_protection_active, unsent_message_count}). @@ -125,18 +126,22 @@ all_ch_record() -> [C || {{ch, _}, C} <- get()]. update_store_and_maybe_block_ch( - C = #cr{is_overload_protection_active = Active, + C = #cr{is_overload_protection_active = Overloaded, + is_limit_active = Limited, unsent_message_count = Count}) -> - {Result, NewActive} = + {Result, NewOverloaded, NewLimited} = if - not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) -> - {block_ch, true}; - Active and (Count == 0) -> - {unblock_ch, false}; + not(Overloaded) and (Count > ?UNSENT_MESSAGE_LIMIT) -> + {block_ch, true, Limited}; + Overloaded and (Count == 0) -> + {unblock_ch, false, Limited}; + Limited and (Count < ?UNSENT_MESSAGE_LIMIT) -> + {unblock_ch, Overloaded, false}; true -> - {ok, Active} + {ok, Overloaded, Limited} end, - store_ch_record(C#cr{is_overload_protection_active = NewActive}), + store_ch_record(C#cr{is_overload_protection_active = NewOverloaded, + is_limit_active = NewLimited}), Result. deliver_immediately(Message, Delivered, @@ -160,6 +165,8 @@ deliver_immediately(Message, Delivered, false -> % Have another go by cycling through the consumer % queue + C = ch_record(ChPid), + store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), deliver_immediately(Message, Delivered, State#q{round_robin = NewConsumers}) @@ -659,7 +666,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> not_found -> noreply(State); C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} -> - rabbit_limiter:decrement_capacity(LimiterPid, qname(State)), + rabbit_limiter:decrement_capacity(LimiterPid, self()), {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), case Txn of @@ -692,6 +699,20 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; +handle_cast({unblock, ChPid}, State) -> + % TODO Refactor the code duplication + % between this an the notify_sent cast handler + case lookup_ch(ChPid) of + not_found -> + noreply(State); + C = #cr{is_limit_active = true} -> + noreply(possibly_unblock(C, State)); + C -> + rabbit_log:warning("Ignoring unblock for an active ch: ~p~n", + [C]), + noreply(State) + end; + handle_cast({notify_sent, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ac186cfa..3306d6f6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -103,7 +103,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> virtual_host = VHost, most_recently_declared_queue = <<>>, % TODO See point 3.1.1 of the design - start the limiter lazily - limiter = rabbit_limiter:start_link(self()), + limiter = rabbit_limiter:start_link(ProxyPid), consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3dfeb5fe..1973d358 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -51,9 +51,9 @@ handle_call({can_send, QPid}, _From, State) -> % a queue. This allows the limiter to update the the in-use-by-that queue % capacity infromation. handle_cast({decrement_capacity, QPid}, State) -> - NewState = decrement_in_use(QPid, State), - maybe_notify_queues(NewState), - {noreply, NewState}. + State1 = decrement_in_use(QPid, State), + State2 = maybe_notify_queues(State1), + {noreply, State2}. % When the prefetch count has not been set, % e.g. when the channel has not yet been issued a basic.qos @@ -84,10 +84,10 @@ code_change(_, State, _) -> % Internal plumbing %--------------------------------------------------------------------------- +% Reduces the in-use-count of the queue by one decrement_in_use(QPid, State = #lim{in_use = InUse}) -> case dict:find(QPid, InUse) of {ok, Capacity} -> - io:format("capacity ~p~n",[Capacity]), if % Is there a lower bound on capacity? % i.e. what is the zero mark, how much is unlimited? @@ -96,26 +96,35 @@ decrement_in_use(QPid, State = #lim{in_use = InUse}) -> State#lim{in_use = NewInUse}; true -> % TODO How should this be handled? + rabbit_log:warning( + "Ignoring decrement for zero capacity: ~p~n", + [QPid]), State end; error -> % TODO How should this case be handled? + rabbit_log:warning("Ignoring decrement for unknown queue: ~p~n", + [QPid]), State end. +% Works out whether any queues should be notified +% If any notification is required, it propagates a transition +% of the blocked state maybe_notify_queues(State = #lim{ch_pid = ChPid, in_use = InUse}) -> - Capacity = current_capcity(State), + Capacity = current_capacity(State), case should_notify(Capacity, State) of true -> dict:map(fun(Q,_) -> - rabbit_amqqueue:notify_sent(Q, ChPid) + rabbit_amqqueue:unblock(Q, ChPid) end, InUse), State#lim{blocked = false}; false -> - ok + State end. -current_capcity(#lim{in_use = InUse}) -> +% Computes the current aggregrate capacity of all of the in-use queues +current_capacity(#lim{in_use = InUse}) -> % TODO This *seems* expensive to compute this on the fly dict:fold(fun(_, PerQ, Acc) -> PerQ + Acc end, 0, InUse). @@ -135,8 +144,7 @@ maybe_can_send(_, State = #lim{blocked = true}) -> maybe_can_send(QPid, State = #lim{prefetch_count = Limit, in_use = InUse, blocked = false}) -> - Capacity = current_capcity(State), - io:format("Limit was ~p, capacity ~p~n",[Limit, Capacity]), + Capacity = current_capacity(State), if Capacity < Limit -> NewInUse = update_in_use_capacity(QPid, InUse), -- cgit v1.2.1 From 77249f371e49a9bf21869d027d89c21ec5a7c554 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Thu, 20 Nov 2008 17:15:52 +0000 Subject: Refactored the internal structure of the limiter --- src/rabbit_limiter.erl | 112 +++++++++++++++---------------------------------- 1 file changed, 34 insertions(+), 78 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 1973d358..adf4cd4b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -1,3 +1,5 @@ +%% TODO Decide what to do with the license statement now that Cohesive have +%% bailed. -module(rabbit_limiter). @@ -10,9 +12,8 @@ -export([start_link/1]). -export([can_send/2, decrement_capacity/2]). --record(lim, {prefetch_count = 1, +-record(lim, {prefetch_count = 0, ch_pid, - blocked = false, in_use = dict:new()}). %--------------------------------------------------------------------------- @@ -44,22 +45,22 @@ init([ChPid]) -> % This queuries the limiter to ask if it is possible to send a message without % breaching a limit for this queue process handle_call({can_send, QPid}, _From, State) -> - {CanSend, NewState} = maybe_can_send(QPid, State), - {reply, CanSend, NewState}. + case limit_reached(State) of + true -> {reply, false, State}; + false -> {reply, true, update_in_use_capacity(QPid, State)} + end. % This is an asynchronous ack from a queue that it has received an ack from % a queue. This allows the limiter to update the the in-use-by-that queue % capacity infromation. handle_cast({decrement_capacity, QPid}, State) -> - State1 = decrement_in_use(QPid, State), - State2 = maybe_notify_queues(State1), - {noreply, State2}. - -% When the prefetch count has not been set, -% e.g. when the channel has not yet been issued a basic.qos -handle_info({prefetch_count, PrefetchCount}, - State = #lim{prefetch_count = 0}) -> - {noreply, State#lim{prefetch_count = PrefetchCount}}; + NewState = decrement_in_use(QPid, State), + ShouldNotify = limit_reached(State) and not(limit_reached(State)), + if + ShouldNotify -> notify_queues(NewState); + true -> ok + end, + {noreply, NewState}. % When the new limit is larger than the existing limit, % notify all queues and forget about queues with an in-use @@ -86,78 +87,33 @@ code_change(_, State, _) -> % Reduces the in-use-count of the queue by one decrement_in_use(QPid, State = #lim{in_use = InUse}) -> - case dict:find(QPid, InUse) of - {ok, Capacity} -> - if - % Is there a lower bound on capacity? - % i.e. what is the zero mark, how much is unlimited? - Capacity > 0 -> - NewInUse = dict:store(QPid, Capacity - 1, InUse), - State#lim{in_use = NewInUse}; - true -> - % TODO How should this be handled? - rabbit_log:warning( - "Ignoring decrement for zero capacity: ~p~n", - [QPid]), - State - end; - error -> - % TODO How should this case be handled? - rabbit_log:warning("Ignoring decrement for unknown queue: ~p~n", - [QPid]), - State + NewInUse = dict:update_counter(QPid, -1, InUse), + Count = dict:fetch(QPid, NewInUse), + if + Count < 1 -> + State#lim{in_use = dict:erase(QPid, NewInUse)}; + true -> + State#lim{in_use = NewInUse} end. -% Works out whether any queues should be notified -% If any notification is required, it propagates a transition -% of the blocked state -maybe_notify_queues(State = #lim{ch_pid = ChPid, in_use = InUse}) -> - Capacity = current_capacity(State), - case should_notify(Capacity, State) of - true -> - dict:map(fun(Q,_) -> - rabbit_amqqueue:unblock(Q, ChPid) - end, InUse), - State#lim{blocked = false}; - false -> - State - end. +% Unblocks every queue that this limiter knows about +notify_queues(#lim{ch_pid = ChPid, in_use = InUse}) -> + dict:map(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, InUse). % Computes the current aggregrate capacity of all of the in-use queues current_capacity(#lim{in_use = InUse}) -> - % TODO This *seems* expensive to compute this on the fly + % TODO It *seems* expensive to compute this on the fly dict:fold(fun(_, PerQ, Acc) -> PerQ + Acc end, 0, InUse). +% A prefetch limit of zero means unlimited +limit_reached(#lim{prefetch_count = 0}) -> + false; -% This is a very naive way of deciding wether to unblock or not, -% it *might* be better to wait for a time or volume threshold -% instead of broadcasting notifications -should_notify(Capacity, #lim{prefetch_count = Limit, blocked = true}) - when Capacity < Limit -> - true; +% Works out whether the limit is breached for the current limiter state +limit_reached(State = #lim{prefetch_count = Limit}) -> + current_capacity(State) == Limit. -should_notify(_,_) -> false. - -maybe_can_send(_, State = #lim{blocked = true}) -> - {false, State}; - -maybe_can_send(QPid, State = #lim{prefetch_count = Limit, - in_use = InUse, - blocked = false}) -> - Capacity = current_capacity(State), - if - Capacity < Limit -> - NewInUse = update_in_use_capacity(QPid, InUse), - { true, State#lim{in_use = NewInUse} }; - true -> - { false, State#lim{blocked = true}} - end. - -update_in_use_capacity(QPid, InUse) -> - case dict:find(QPid, InUse) of - {ok, Capacity} -> - dict:store(QPid, Capacity + 1, InUse); - error -> - dict:store(QPid, 0, InUse) - end. +% Increments the counter for the in-use-capacity of a particular queue +update_in_use_capacity(QPid, State = #lim{in_use = InUse}) -> + State#lim{in_use = dict:update_counter(QPid, 1, InUse)}. -- cgit v1.2.1 From d0bc61aef26fb7f1d9d97e1ca2e0e3d71fd6a4d7 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Fri, 21 Nov 2008 14:08:11 +0000 Subject: Fixed bug in limiter --- src/rabbit_limiter.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index adf4cd4b..abca7ce1 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -5,7 +5,7 @@ % I'm starting out with a gen_server because of the synchronous query % that the queue process makes --behviour(gen_server). +-behaviour(gen_server). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). @@ -55,9 +55,9 @@ handle_call({can_send, QPid}, _From, State) -> % capacity infromation. handle_cast({decrement_capacity, QPid}, State) -> NewState = decrement_in_use(QPid, State), - ShouldNotify = limit_reached(State) and not(limit_reached(State)), + ShouldNotify = limit_reached(State) and not(limit_reached(NewState)), if - ShouldNotify -> notify_queues(NewState); + ShouldNotify -> notify_queues(State); true -> ok end, {noreply, NewState}. -- cgit v1.2.1 From d990241d9b9bf2b492788a3b3d2b82c8fd0dd88d Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Fri, 21 Nov 2008 16:44:00 +0000 Subject: Made set_prefetch_count into a proper gen_server call --- src/rabbit_channel.erl | 2 +- src/rabbit_limiter.erl | 34 ++++++++++++++++++++-------------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3306d6f6..c6108489 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -410,7 +410,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter}) -> - Limiter ! {prefetch_count, PrefetchCount}, + rabbit_limiter:set_prefetch_count(Limiter, PrefetchCount), {reply, #'basic.qos_ok'{}, State}; handle_method(#'basic.recover'{requeue = true}, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index abca7ce1..fbce5ea4 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -10,7 +10,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1]). --export([can_send/2, decrement_capacity/2]). +-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). -record(lim, {prefetch_count = 0, ch_pid, @@ -25,6 +25,9 @@ start_link(ChPid) -> {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), Pid. +set_prefetch_count(LimiterPid, PrefetchCount) -> + gen_server:call(LimiterPid, {prefetch_count, PrefetchCount}). + % Queries the limiter to ask whether the queue can deliver a message % without breaching a limit can_send(LimiterPid, QPid) -> @@ -48,7 +51,20 @@ handle_call({can_send, QPid}, _From, State) -> case limit_reached(State) of true -> {reply, false, State}; false -> {reply, true, update_in_use_capacity(QPid, State)} - end. + end; + +% When the new limit is larger than the existing limit, +% notify all queues and forget about queues with an in-use +% capcity of zero +handle_call({prefetch_count, PrefetchCount}, _From, + State = #lim{prefetch_count = CurrentLimit}) + when PrefetchCount > CurrentLimit -> + % TODO implement this requirement + {reply, ok, State#lim{prefetch_count = PrefetchCount}}; + +% Default setter of the prefetch count +handle_call({prefetch_count, PrefetchCount}, _From, State) -> + {reply, ok, State#lim{prefetch_count = PrefetchCount}}. % This is an asynchronous ack from a queue that it has received an ack from % a queue. This allows the limiter to update the the in-use-by-that queue @@ -62,18 +78,8 @@ handle_cast({decrement_capacity, QPid}, State) -> end, {noreply, NewState}. -% When the new limit is larger than the existing limit, -% notify all queues and forget about queues with an in-use -% capcity of zero -handle_info({prefetch_count, PrefetchCount}, - State = #lim{prefetch_count = CurrentLimit}) - when PrefetchCount > CurrentLimit -> - % TODO implement this requirement - {noreply, State#lim{prefetch_count = PrefetchCount}}; - -% Default setter of the prefetch count -handle_info({prefetch_count, PrefetchCount}, State) -> - {noreply, State#lim{prefetch_count = PrefetchCount}}. +handle_info(_, State) -> + {noreply, State}. terminate(_, _) -> ok. -- cgit v1.2.1 From a826871c0142034ddbf48af719139e8af1516a26 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Fri, 21 Nov 2008 17:34:01 +0000 Subject: Got rid o the per-queue in-use capacity --- src/rabbit_amqqueue_process.erl | 2 +- src/rabbit_limiter.erl | 51 +++++++++++++++++------------------------ 2 files changed, 22 insertions(+), 31 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6ef5f970..c0f48ad1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -666,7 +666,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> not_found -> noreply(State); C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} -> - rabbit_limiter:decrement_capacity(LimiterPid, self()), + rabbit_limiter:decrement_capacity(LimiterPid), {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), case Txn of diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index fbce5ea4..3f194b31 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -10,11 +10,12 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1]). --export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). +-export([set_prefetch_count/2, can_send/2, decrement_capacity/1]). -record(lim, {prefetch_count = 0, ch_pid, - in_use = dict:new()}). + queues = sets:new(), + in_use = 0}). %--------------------------------------------------------------------------- % API @@ -35,8 +36,8 @@ can_send(LimiterPid, QPid) -> % Lets the limiter know that a queue has received an ack from a consumer % and hence can reduce the in-use-by-that queue capcity information -decrement_capacity(LimiterPid, QPid) -> - gen_server:cast(LimiterPid, {decrement_capacity, QPid}). +decrement_capacity(LimiterPid) -> + gen_server:cast(LimiterPid, decrement_capacity). %--------------------------------------------------------------------------- % gen_server callbacks @@ -47,10 +48,13 @@ init([ChPid]) -> % This queuries the limiter to ask if it is possible to send a message without % breaching a limit for this queue process -handle_call({can_send, QPid}, _From, State) -> +handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse, + queues = Queues}) -> case limit_reached(State) of true -> {reply, false, State}; - false -> {reply, true, update_in_use_capacity(QPid, State)} + false -> + NewQueues = sets:add_element(QPid, Queues), + {reply, true, State#lim{in_use = InUse + 1, queues = NewQueues}} end; % When the new limit is larger than the existing limit, @@ -69,8 +73,8 @@ handle_call({prefetch_count, PrefetchCount}, _From, State) -> % This is an asynchronous ack from a queue that it has received an ack from % a queue. This allows the limiter to update the the in-use-by-that queue % capacity infromation. -handle_cast({decrement_capacity, QPid}, State) -> - NewState = decrement_in_use(QPid, State), +handle_cast(decrement_capacity, State) -> + NewState = decrement_in_use(State), ShouldNotify = limit_reached(State) and not(limit_reached(NewState)), if ShouldNotify -> notify_queues(State); @@ -92,34 +96,21 @@ code_change(_, State, _) -> %--------------------------------------------------------------------------- % Reduces the in-use-count of the queue by one -decrement_in_use(QPid, State = #lim{in_use = InUse}) -> - NewInUse = dict:update_counter(QPid, -1, InUse), - Count = dict:fetch(QPid, NewInUse), - if - Count < 1 -> - State#lim{in_use = dict:erase(QPid, NewInUse)}; - true -> - State#lim{in_use = NewInUse} - end. +decrement_in_use(State = #lim{in_use = 0}) -> + State#lim{in_use = 0}; -% Unblocks every queue that this limiter knows about -notify_queues(#lim{ch_pid = ChPid, in_use = InUse}) -> - dict:map(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, InUse). +decrement_in_use(State = #lim{in_use = InUse}) -> + State#lim{in_use = InUse - 1}. -% Computes the current aggregrate capacity of all of the in-use queues -current_capacity(#lim{in_use = InUse}) -> - % TODO It *seems* expensive to compute this on the fly - dict:fold(fun(_, PerQ, Acc) -> PerQ + Acc end, 0, InUse). +% Unblocks every queue that this limiter knows about +notify_queues(#lim{ch_pid = ChPid, queues = Queues}) -> + sets:fold(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, [], Queues). % A prefetch limit of zero means unlimited limit_reached(#lim{prefetch_count = 0}) -> false; % Works out whether the limit is breached for the current limiter state -limit_reached(State = #lim{prefetch_count = Limit}) -> - current_capacity(State) == Limit. - -% Increments the counter for the in-use-capacity of a particular queue -update_in_use_capacity(QPid, State = #lim{in_use = InUse}) -> - State#lim{in_use = dict:update_counter(QPid, 1, InUse)}. +limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> + InUse == Limit. -- cgit v1.2.1 From 284ee2c626e3a55545496681997c395684d2c3f0 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Fri, 21 Nov 2008 17:40:25 +0000 Subject: Changed prefetch from call to cast --- src/rabbit_limiter.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3f194b31..e38843c9 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -27,7 +27,7 @@ start_link(ChPid) -> Pid. set_prefetch_count(LimiterPid, PrefetchCount) -> - gen_server:call(LimiterPid, {prefetch_count, PrefetchCount}). + gen_server:cast(LimiterPid, {prefetch_count, PrefetchCount}). % Queries the limiter to ask whether the queue can deliver a message % without breaching a limit @@ -55,20 +55,20 @@ handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse, false -> NewQueues = sets:add_element(QPid, Queues), {reply, true, State#lim{in_use = InUse + 1, queues = NewQueues}} - end; + end. % When the new limit is larger than the existing limit, % notify all queues and forget about queues with an in-use % capcity of zero -handle_call({prefetch_count, PrefetchCount}, _From, +handle_cast({prefetch_count, PrefetchCount}, State = #lim{prefetch_count = CurrentLimit}) when PrefetchCount > CurrentLimit -> % TODO implement this requirement - {reply, ok, State#lim{prefetch_count = PrefetchCount}}; + {noreply, State#lim{prefetch_count = PrefetchCount}}; % Default setter of the prefetch count -handle_call({prefetch_count, PrefetchCount}, _From, State) -> - {reply, ok, State#lim{prefetch_count = PrefetchCount}}. +handle_cast({prefetch_count, PrefetchCount}, State) -> + {noreply, State#lim{prefetch_count = PrefetchCount}}; % This is an asynchronous ack from a queue that it has received an ack from % a queue. This allows the limiter to update the the in-use-by-that queue -- cgit v1.2.1 From 3410d55dcbb1ad78ec2d7ca600bb0bda4c6cb502 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Fri, 21 Nov 2008 18:14:12 +0000 Subject: Minor fixes --- src/rabbit_limiter.erl | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index e38843c9..b83af0c9 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -50,11 +50,11 @@ init([ChPid]) -> % breaching a limit for this queue process handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse, queues = Queues}) -> - case limit_reached(State) of - true -> {reply, false, State}; + NewState = State#lim{queues = sets:add_element(QPid, Queues)}, + case limit_reached(NewState) of + true -> {reply, false, NewState}; false -> - NewQueues = sets:add_element(QPid, Queues), - {reply, true, State#lim{in_use = InUse + 1, queues = NewQueues}} + {reply, true, NewState#lim{in_use = InUse + 1}} end. % When the new limit is larger than the existing limit, @@ -63,8 +63,10 @@ handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse, handle_cast({prefetch_count, PrefetchCount}, State = #lim{prefetch_count = CurrentLimit}) when PrefetchCount > CurrentLimit -> - % TODO implement this requirement - {noreply, State#lim{prefetch_count = PrefetchCount}}; + notify_queues(State), + {noreply, State#lim{prefetch_count = PrefetchCount, + queues = sets:new(), + in_use = 0}}; % Default setter of the prefetch count handle_cast({prefetch_count, PrefetchCount}, State) -> @@ -73,14 +75,16 @@ handle_cast({prefetch_count, PrefetchCount}, State) -> % This is an asynchronous ack from a queue that it has received an ack from % a queue. This allows the limiter to update the the in-use-by-that queue % capacity infromation. -handle_cast(decrement_capacity, State) -> +handle_cast(decrement_capacity, State = #lim{in_use = InUse}) -> NewState = decrement_in_use(State), ShouldNotify = limit_reached(State) and not(limit_reached(NewState)), if - ShouldNotify -> notify_queues(State); - true -> ok - end, - {noreply, NewState}. + ShouldNotify -> + notify_queues(State), + {noreply, State#lim{queues = sets:new(), in_use = InUse - 1}}; + true -> + {noreply, NewState} + end. handle_info(_, State) -> {noreply, State}. -- cgit v1.2.1 From 1b8c0ff6a1f6a305945afacdb3e5d223ae1221f9 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Mon, 24 Nov 2008 00:52:28 +0000 Subject: Now the channel sends the ack directly to the limiter instead of via the queue --- src/rabbit_amqqueue_process.erl | 1 - src/rabbit_channel.erl | 3 +++ src/rabbit_limiter.erl | 18 +++++++++--------- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c0f48ad1..b4d0d52d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -666,7 +666,6 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> not_found -> noreply(State); C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} -> - rabbit_limiter:decrement_capacity(LimiterPid), {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), case Txn of diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c6108489..4abc3494 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -271,6 +271,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, + limiter = Limiter, next_tag = NextDeliveryTag, unacked_message_q = UAMQ}) -> if DeliveryTag >= NextDeliveryTag -> @@ -279,6 +280,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + % CC the limiter on the number of acks that have been received + rabbit_limiter:decrement_capacity(Limiter, queue:len(Acked)), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of none -> State#ch{unacked_message_q = Remaining}; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index b83af0c9..4e130ea0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -10,7 +10,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1]). --export([set_prefetch_count/2, can_send/2, decrement_capacity/1]). +-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). -record(lim, {prefetch_count = 0, ch_pid, @@ -36,8 +36,8 @@ can_send(LimiterPid, QPid) -> % Lets the limiter know that a queue has received an ack from a consumer % and hence can reduce the in-use-by-that queue capcity information -decrement_capacity(LimiterPid) -> - gen_server:cast(LimiterPid, decrement_capacity). +decrement_capacity(LimiterPid, Magnitude) -> + gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}). %--------------------------------------------------------------------------- % gen_server callbacks @@ -75,8 +75,8 @@ handle_cast({prefetch_count, PrefetchCount}, State) -> % This is an asynchronous ack from a queue that it has received an ack from % a queue. This allows the limiter to update the the in-use-by-that queue % capacity infromation. -handle_cast(decrement_capacity, State = #lim{in_use = InUse}) -> - NewState = decrement_in_use(State), +handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> + NewState = decrement_in_use(Magnitude, State), ShouldNotify = limit_reached(State) and not(limit_reached(NewState)), if ShouldNotify -> @@ -99,12 +99,12 @@ code_change(_, State, _) -> % Internal plumbing %--------------------------------------------------------------------------- -% Reduces the in-use-count of the queue by one -decrement_in_use(State = #lim{in_use = 0}) -> +% Reduces the in-use-count of the queue by a specific magnitude +decrement_in_use(_, State = #lim{in_use = 0}) -> State#lim{in_use = 0}; -decrement_in_use(State = #lim{in_use = InUse}) -> - State#lim{in_use = InUse - 1}. +decrement_in_use(Magnitude, State = #lim{in_use = InUse}) -> + State#lim{in_use = InUse - Magnitude}. % Unblocks every queue that this limiter knows about notify_queues(#lim{ch_pid = ChPid, queues = Queues}) -> -- cgit v1.2.1 From c4f8ddb5cb914b0f825a5c8fc30df594f92c8703 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Mon, 24 Nov 2008 01:14:38 +0000 Subject: Differentiate between acks for basic.get and basic.consume --- src/rabbit_channel.erl | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4abc3494..f9f92959 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -281,7 +281,19 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), % CC the limiter on the number of acks that have been received - rabbit_limiter:decrement_capacity(Limiter, queue:len(Acked)), + % but don't include any acks from a basic.get bottom half + % (hence the differentiation between tags set to none and other tags) + % TODO - this is quite crude and is probably more expensive than it should + % be - according to the OTP documentation, len/1 runs in O(n), probably + % not so cool for a queuing system + NotBasicGet = queue:filter( + fun({_CurrentDeliveryTag, ConsumerTag, _Msg}) -> + case ConsumerTag of + none -> false; + _ -> true + end + end, Acked), + rabbit_limiter:decrement_capacity(Limiter, queue:len(NotBasicGet)), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of none -> State#ch{unacked_message_q = Remaining}; -- cgit v1.2.1 From 36c7a93b0e05496c85d2fdbbfea178584feec9ac Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Sun, 7 Dec 2008 01:26:36 +0000 Subject: Dead queue informs limiter --- src/rabbit_amqqueue_process.erl | 5 ++++- src/rabbit_channel.erl | 1 + src/rabbit_limiter.erl | 11 +++++++++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b4d0d52d..2000a11c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -81,6 +81,9 @@ init(Q) -> round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> + %% Inform all limiters that we're dying + [ rabbit_limiter:unregister_queue(LimiterPid, self()) + || #cr{limiter_pid = LimiterPid} <- all_ch_record()], %% FIXME: How do we cancel active subscriptions? QName = qname(State), lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, @@ -665,7 +668,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); - C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} -> + C = #cr{unacked_messages = UAM} -> {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), case Txn of diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f9f92959..240ee3d3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -293,6 +293,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, _ -> true end end, Acked), + % TODO Optimization: Probably don't need to send this if len = 0 rabbit_limiter:decrement_capacity(Limiter, queue:len(NotBasicGet)), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 4e130ea0..adc2c721 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -11,6 +11,7 @@ handle_info/2]). -export([start_link/1]). -export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). +-export([unregister_queue/2]). -record(lim, {prefetch_count = 0, ch_pid, @@ -38,6 +39,11 @@ can_send(LimiterPid, QPid) -> % and hence can reduce the in-use-by-that queue capcity information decrement_capacity(LimiterPid, Magnitude) -> gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}). + +% This is called to tell the limiter that the queue is probably dead and +% it should be forgotten about +unregister_queue(LimiterPid, QPid) -> + gen_server:cast(LimiterPid, {unregister_queue, QPid}). %--------------------------------------------------------------------------- % gen_server callbacks @@ -68,6 +74,11 @@ handle_cast({prefetch_count, PrefetchCount}, queues = sets:new(), in_use = 0}}; +% Removes the queue process from the set of monitored queues +handle_cast({unregister_queue, QPid}, State= #lim{queues = Queues}) -> + NewState = decrement_in_use(1, State), + {noreply, NewState#lim{queues = sets:del_element(QPid, Queues)}}; + % Default setter of the prefetch count handle_cast({prefetch_count, PrefetchCount}, State) -> {noreply, State#lim{prefetch_count = PrefetchCount}}; -- cgit v1.2.1 From ee627645fcf03b9d562dcb73f61987d265dd2869 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Sun, 7 Dec 2008 01:34:42 +0000 Subject: Added match for setting the global flag --- src/rabbit_channel.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 240ee3d3..55617abb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -424,6 +424,10 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; +handle_method(#'basic.qos'{global = true}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, + "Global flag for basic.qos not implementented"); + handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter}) -> rabbit_limiter:set_prefetch_count(Limiter, PrefetchCount), -- cgit v1.2.1 From 9c8e90bde666981f5cc716abbeb7d55d4d10bdce Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Sun, 7 Dec 2008 01:40:09 +0000 Subject: Added catch for pre-fetch size --- src/rabbit_channel.erl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 55617abb..a8db5b3f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -428,6 +428,11 @@ handle_method(#'basic.qos'{global = true}, _, _State) -> rabbit_misc:protocol_error(not_implemented, "Global flag for basic.qos not implementented"); +handle_method(#'basic.qos'{prefetch_size = Size}, + _, _State) when Size /= 0 -> + rabbit_misc:protocol_error(not_implemented, + "Pre-fetch size for basic.qos not implementented"); + handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter}) -> rabbit_limiter:set_prefetch_count(Limiter, PrefetchCount), -- cgit v1.2.1 From ff2368fc2e3606248709ef3309eaf51245edcad2 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Sun, 7 Dec 2008 02:01:59 +0000 Subject: Shutting dialyzer up --- src/rabbit_channel.erl | 7 ++++--- src/rabbit_limiter.erl | 16 +++++++++++++--- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a8db5b3f..b17a2b06 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -424,14 +424,15 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{global = true}, _, _State) -> +handle_method(#'basic.qos'{global = Flag = true}, _, _State) -> rabbit_misc:protocol_error(not_implemented, - "Global flag for basic.qos not implementented"); + "Global flag (~s) for basic.qos not implementented", [Flag]); handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, - "Pre-fetch size for basic.qos not implementented"); + "Pre-fetch size (~s) for basic.qos not implementented", + [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter}) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index adc2c721..8509eab8 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -2,9 +2,6 @@ %% bailed. -module(rabbit_limiter). - -% I'm starting out with a gen_server because of the synchronous query -% that the queue process makes -behaviour(gen_server). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -13,6 +10,19 @@ -export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). -export([unregister_queue/2]). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(set_prefetch_count/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(can_send/2 :: (pid(), pid()) -> bool()). +-spec(decrement_capacity/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(unregister_queue/2 :: (pid(), pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + -record(lim, {prefetch_count = 0, ch_pid, queues = sets:new(), -- cgit v1.2.1 From 26ef7ae0cf92b5f95f8cd0ed0ec5d6e0560fd352 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 18 Dec 2008 14:10:10 +0000 Subject: add limiter to module list --- ebin/rabbit.app | 1 + 1 file changed, 1 insertion(+) diff --git a/ebin/rabbit.app b/ebin/rabbit.app index 93abd456..217bb27d 100644 --- a/ebin/rabbit.app +++ b/ebin/rabbit.app @@ -17,6 +17,7 @@ rabbit_framing, rabbit_framing_channel, rabbit_heartbeat, + rabbit_limiter, rabbit_load, rabbit_log, rabbit_misc, -- cgit v1.2.1 From 13f6e4555d9d919af78042a17695fd48c1ecfd1e Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Thu, 18 Dec 2008 17:26:56 +0000 Subject: Fix for multi ack bug --- src/rabbit_limiter.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d91893b0..20f54359 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -131,7 +131,7 @@ handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> if ShouldNotify -> notify_queues(State), - {noreply, State#lim{queues = sets:new(), in_use = InUse - 1}}; + {noreply, State#lim{queues = sets:new(), in_use = InUse - Magnitude}}; true -> {noreply, NewState} end. -- cgit v1.2.1 From ad32ba9a6e7f63beafdf997df473cf9b279c6209 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Thu, 18 Dec 2008 17:37:45 +0000 Subject: Got rid of superfluous PD read --- src/rabbit_amqqueue_process.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b2c619d3..d26b0bb4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -180,7 +180,7 @@ deliver_immediately(Message, Delivered, RoundRobinTail} -> % Use Qos Limits if an ack is required % Query the limiter to find out if a limit has been breached - #cr{limiter_pid = LimiterPid} = ch_record(ChPid), + C = #cr{limiter_pid = LimiterPid} = ch_record(ChPid), case rabbit_limiter:can_send(LimiterPid, self()) of true -> really_deliver(AckRequired, ChPid, ConsumerTag, @@ -189,7 +189,6 @@ deliver_immediately(Message, Delivered, false -> % Have another go by cycling through the consumer % queue - C = ch_record(ChPid), store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), deliver_immediately(Message, Delivered, -- cgit v1.2.1 From 7498967e007fdf8693aad6dd636e57aa5bf6d531 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Thu, 18 Dec 2008 18:25:26 +0000 Subject: Put some monitors in --- src/rabbit_amqqueue_process.erl | 3 --- src/rabbit_limiter.erl | 43 +++++++++++++++++++++++++---------------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d26b0bb4..702a8aee 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -102,9 +102,6 @@ init(Q) -> round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> - %% Inform all limiters that we're dying - [ rabbit_limiter:unregister_queue(LimiterPid, self()) - || #cr{limiter_pid = LimiterPid} <- all_ch_record()], %% FIXME: How do we cancel active subscriptions? QName = qname(State), lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 20f54359..6cc170f9 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,6 @@ handle_info/2]). -export([start_link/1]). -export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). --export([unregister_queue/2]). %%---------------------------------------------------------------------------- @@ -46,7 +45,6 @@ -spec(set_prefetch_count/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(can_send/2 :: (pid(), pid()) -> bool()). -spec(decrement_capacity/2 :: (pid(), non_neg_integer()) -> 'ok'). --spec(unregister_queue/2 :: (pid(), pid()) -> 'ok'). -endif. @@ -54,7 +52,7 @@ -record(lim, {prefetch_count = 0, ch_pid, - queues = sets:new(), + queues = dict:new(), in_use = 0}). %--------------------------------------------------------------------------- @@ -78,11 +76,6 @@ can_send(LimiterPid, QPid) -> % and hence can reduce the in-use-by-that queue capcity information decrement_capacity(LimiterPid, Magnitude) -> gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}). - -% This is called to tell the limiter that the queue is probably dead and -% it should be forgotten about -unregister_queue(LimiterPid, QPid) -> - gen_server:cast(LimiterPid, {unregister_queue, QPid}). %--------------------------------------------------------------------------- % gen_server callbacks @@ -93,9 +86,8 @@ init([ChPid]) -> % This queuries the limiter to ask if it is possible to send a message without % breaching a limit for this queue process -handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse, - queues = Queues}) -> - NewState = State#lim{queues = sets:add_element(QPid, Queues)}, +handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> + NewState = monitor_queue(QPid, State), case limit_reached(NewState) of true -> {reply, false, NewState}; false -> @@ -109,14 +101,14 @@ handle_cast({prefetch_count, PrefetchCount}, State = #lim{prefetch_count = CurrentLimit}) when PrefetchCount > CurrentLimit -> notify_queues(State), - {noreply, State#lim{prefetch_count = PrefetchCount, - queues = sets:new(), + NewState = demonitor_all(State), + {noreply, NewState#lim{prefetch_count = PrefetchCount, in_use = 0}}; % Removes the queue process from the set of monitored queues -handle_cast({unregister_queue, QPid}, State= #lim{queues = Queues}) -> +handle_cast({unregister_queue, QPid}, State = #lim{}) -> NewState = decrement_in_use(1, State), - {noreply, NewState#lim{queues = sets:del_element(QPid, Queues)}}; + {noreply, demonitor_queue(QPid, NewState)}; % Default setter of the prefetch count handle_cast({prefetch_count, PrefetchCount}, State) -> @@ -131,7 +123,8 @@ handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> if ShouldNotify -> notify_queues(State), - {noreply, State#lim{queues = sets:new(), in_use = InUse - Magnitude}}; + NextState = demonitor_all(State), + {noreply, NextState#lim{in_use = InUse - Magnitude}}; true -> {noreply, NewState} end. @@ -149,6 +142,22 @@ code_change(_, State, _) -> % Internal plumbing %--------------------------------------------------------------------------- +% Starts to monitor a particular queue +monitor_queue(QPid, State = #lim{queues = Queues}) -> + MonitorRef = erlang:monitor(process, QPid), + State#lim{queues = dict:store(QPid, MonitorRef, Queues)}. + +% Stops monitoring a particular queue +demonitor_queue(QPid, State = #lim{queues = Queues}) -> + MonitorRef = dict:fetch(QPid, Queues), + true = erlang:demonitor(MonitorRef), + State#lim{queues = dict:erase(QPid, Queues)}. + +% Stops monitoring all queues +demonitor_all(State = #lim{queues = Queues}) -> + dict:map(fun(_, Ref) -> true = erlang:demonitor(Ref) end, Queues), + State#lim{queues = dict:new()}. + % Reduces the in-use-count of the queue by a specific magnitude decrement_in_use(_, State = #lim{in_use = 0}) -> State#lim{in_use = 0}; @@ -158,7 +167,7 @@ decrement_in_use(Magnitude, State = #lim{in_use = InUse}) -> % Unblocks every queue that this limiter knows about notify_queues(#lim{ch_pid = ChPid, queues = Queues}) -> - sets:fold(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, [], Queues). + dict:map(fun(Q, _) -> rabbit_amqqueue:unblock(Q, ChPid) end, Queues). % A prefetch limit of zero means unlimited limit_reached(#lim{prefetch_count = 0}) -> -- cgit v1.2.1 From 6c25cdcea5b0ab405d3ab160446784221533e802 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Thu, 18 Dec 2008 18:29:58 +0000 Subject: Don't double monitor anything --- src/rabbit_limiter.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 6cc170f9..0d938580 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -144,8 +144,11 @@ code_change(_, State, _) -> % Starts to monitor a particular queue monitor_queue(QPid, State = #lim{queues = Queues}) -> - MonitorRef = erlang:monitor(process, QPid), - State#lim{queues = dict:store(QPid, MonitorRef, Queues)}. + case dict:is_key(QPid, Queues) of + false -> MonitorRef = erlang:monitor(process, QPid), + State#lim{queues = dict:store(QPid, MonitorRef, Queues)}; + true -> State + end. % Stops monitoring a particular queue demonitor_queue(QPid, State = #lim{queues = Queues}) -> -- cgit v1.2.1 From 742fcd66dd6b4a56a9481ba17f4d052389cc8386 Mon Sep 17 00:00:00 2001 From: Ben Hood <0x6e6562@gmail.com> Date: Thu, 18 Dec 2008 18:46:05 +0000 Subject: Added handler for monitor notifications --- src/rabbit_limiter.erl | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 0d938580..9f23724e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -95,20 +95,13 @@ handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> end. % When the new limit is larger than the existing limit, -% notify all queues and forget about queues with an in-use -% capcity of zero +% notify all queues and forget about all queues handle_cast({prefetch_count, PrefetchCount}, State = #lim{prefetch_count = CurrentLimit}) when PrefetchCount > CurrentLimit -> notify_queues(State), NewState = demonitor_all(State), - {noreply, NewState#lim{prefetch_count = PrefetchCount, - in_use = 0}}; - -% Removes the queue process from the set of monitored queues -handle_cast({unregister_queue, QPid}, State = #lim{}) -> - NewState = decrement_in_use(1, State), - {noreply, demonitor_queue(QPid, NewState)}; + {noreply, NewState#lim{prefetch_count = PrefetchCount, in_use = 0}}; % Default setter of the prefetch count handle_cast({prefetch_count, PrefetchCount}, State) -> @@ -129,8 +122,10 @@ handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> {noreply, NewState} end. -handle_info(_, State) -> - {noreply, State}. +%% This is received when a queue dies +handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, + State = #lim{queues = Queues}) -> + {noreply, State#lim{queues = dict:erase(QPid, Queues)}}. terminate(_, _) -> ok. @@ -150,12 +145,6 @@ monitor_queue(QPid, State = #lim{queues = Queues}) -> true -> State end. -% Stops monitoring a particular queue -demonitor_queue(QPid, State = #lim{queues = Queues}) -> - MonitorRef = dict:fetch(QPid, Queues), - true = erlang:demonitor(MonitorRef), - State#lim{queues = dict:erase(QPid, Queues)}. - % Stops monitoring all queues demonitor_all(State = #lim{queues = Queues}) -> dict:map(fun(_, Ref) -> true = erlang:demonitor(Ref) end, Queues), -- cgit v1.2.1 From 39478fac4b45ab2f710f68f66f5e656f903fda0f Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 18 Dec 2008 20:33:31 +0000 Subject: fix typo --- src/rabbit_limiter.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9f23724e..b939b4bb 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -84,8 +84,8 @@ decrement_capacity(LimiterPid, Magnitude) -> init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -% This queuries the limiter to ask if it is possible to send a message without -% breaching a limit for this queue process +% This queries the limiter to ask if it is possible to send a message +% without breaching a limit for this queue process handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> NewState = monitor_queue(QPid, State), case limit_reached(NewState) of -- cgit v1.2.1 From c2c4195b7d9c1261e0e7a775ae4e9e5782d85f39 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 18 Dec 2008 20:58:51 +0000 Subject: lots of tweaks and fixes - remove superfluous (or wrong) comments - notification and demonitoring always go together - don't change the in_use count when limit is altered - fix the limit_reached condition --- src/rabbit_limiter.erl | 68 +++++++++++++++----------------------------------- 1 file changed, 20 insertions(+), 48 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index b939b4bb..e5e54563 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -84,45 +84,26 @@ decrement_capacity(LimiterPid, Magnitude) -> init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -% This queries the limiter to ask if it is possible to send a message -% without breaching a limit for this queue process handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> - NewState = monitor_queue(QPid, State), - case limit_reached(NewState) of - true -> {reply, false, NewState}; - false -> - {reply, true, NewState#lim{in_use = InUse + 1}} + case limit_reached(State) of + true -> {reply, false, remember_queue(QPid, State)}; + false -> {reply, true, State#lim{in_use = InUse + 1}} end. -% When the new limit is larger than the existing limit, -% notify all queues and forget about all queues handle_cast({prefetch_count, PrefetchCount}, - State = #lim{prefetch_count = CurrentLimit}) - when PrefetchCount > CurrentLimit -> - notify_queues(State), - NewState = demonitor_all(State), - {noreply, NewState#lim{prefetch_count = PrefetchCount, in_use = 0}}; - -% Default setter of the prefetch count -handle_cast({prefetch_count, PrefetchCount}, State) -> - {noreply, State#lim{prefetch_count = PrefetchCount}}; - -% This is an asynchronous ack from a queue that it has received an ack from -% a queue. This allows the limiter to update the the in-use-by-that queue -% capacity infromation. -handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> + State = #lim{prefetch_count = CurrentLimit}) -> + NewState = State#lim{prefetch_count = PrefetchCount}, + {noreply, if PrefetchCount > CurrentLimit -> forget_queues(NewState); + true -> NewState + end}; + +handle_cast({decrement_capacity, Magnitude}, State) -> NewState = decrement_in_use(Magnitude, State), ShouldNotify = limit_reached(State) and not(limit_reached(NewState)), - if - ShouldNotify -> - notify_queues(State), - NextState = demonitor_all(State), - {noreply, NextState#lim{in_use = InUse - Magnitude}}; - true -> - {noreply, NewState} - end. + {noreply, if ShouldNotify -> forget_queues(NewState); + true -> NewState + end}. -%% This is received when a queue dies handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State = #lim{queues = Queues}) -> {noreply, State#lim{queues = dict:erase(QPid, Queues)}}. @@ -137,35 +118,26 @@ code_change(_, State, _) -> % Internal plumbing %--------------------------------------------------------------------------- -% Starts to monitor a particular queue -monitor_queue(QPid, State = #lim{queues = Queues}) -> +remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MonitorRef = erlang:monitor(process, QPid), State#lim{queues = dict:store(QPid, MonitorRef, Queues)}; true -> State end. -% Stops monitoring all queues -demonitor_all(State = #lim{queues = Queues}) -> - dict:map(fun(_, Ref) -> true = erlang:demonitor(Ref) end, Queues), +forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> + ok = dict:fold(fun(Q, Ref, ok) -> + true = erlang:demonitor(Ref), + rabbit_amqqueue:unblock(Q, ChPid) + end, ok, Queues), State#lim{queues = dict:new()}. -% Reduces the in-use-count of the queue by a specific magnitude decrement_in_use(_, State = #lim{in_use = 0}) -> State#lim{in_use = 0}; - decrement_in_use(Magnitude, State = #lim{in_use = InUse}) -> State#lim{in_use = InUse - Magnitude}. -% Unblocks every queue that this limiter knows about -notify_queues(#lim{ch_pid = ChPid, queues = Queues}) -> - dict:map(fun(Q, _) -> rabbit_amqqueue:unblock(Q, ChPid) end, Queues). - -% A prefetch limit of zero means unlimited limit_reached(#lim{prefetch_count = 0}) -> false; - -% Works out whether the limit is breached for the current limiter state limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> - InUse == Limit. - + InUse >= Limit. -- cgit v1.2.1 From fb718f3812c48d6021480d9ca578c9a47005e2eb Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 18 Dec 2008 21:05:20 +0000 Subject: tidy comments --- src/rabbit_limiter.erl | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index e5e54563..e225e827 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -55,11 +55,10 @@ queues = dict:new(), in_use = 0}). -%--------------------------------------------------------------------------- -% API -%--------------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% API +%-%--------------------------------------------------------------------------- -% Kicks this pig start_link(ChPid) -> {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), Pid. @@ -67,19 +66,19 @@ start_link(ChPid) -> set_prefetch_count(LimiterPid, PrefetchCount) -> gen_server:cast(LimiterPid, {prefetch_count, PrefetchCount}). -% Queries the limiter to ask whether the queue can deliver a message -% without breaching a limit +%% Ask the limiter whether the queue can deliver a message without +%% breaching a limit can_send(LimiterPid, QPid) -> gen_server:call(LimiterPid, {can_send, QPid}). -% Lets the limiter know that a queue has received an ack from a consumer -% and hence can reduce the in-use-by-that queue capcity information +%% Let the limiter know that the channel has received some acks from a +%% consumer decrement_capacity(LimiterPid, Magnitude) -> gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}). -%--------------------------------------------------------------------------- -% gen_server callbacks -%--------------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -- cgit v1.2.1 From 31b713c6de94dae6329fbbf12aa1f3d4af0f2ba4 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 18 Dec 2008 21:11:10 +0000 Subject: tidy some more comments --- src/rabbit_limiter.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index e225e827..cd8f7734 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -113,9 +113,9 @@ terminate(_, _) -> code_change(_, State, _) -> State. -%--------------------------------------------------------------------------- -% Internal plumbing -%--------------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% Internal plumbing +%%---------------------------------------------------------------------------- remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of -- cgit v1.2.1 From 13cc87c8935cdabf65fded6015f02b5991f37204 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 18 Dec 2008 21:28:57 +0000 Subject: refactoring --- src/rabbit_limiter.erl | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index cd8f7734..6388c360 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -89,19 +89,14 @@ handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> false -> {reply, true, State#lim{in_use = InUse + 1}} end. -handle_cast({prefetch_count, PrefetchCount}, - State = #lim{prefetch_count = CurrentLimit}) -> - NewState = State#lim{prefetch_count = PrefetchCount}, - {noreply, if PrefetchCount > CurrentLimit -> forget_queues(NewState); - true -> NewState - end}; - -handle_cast({decrement_capacity, Magnitude}, State) -> - NewState = decrement_in_use(Magnitude, State), - ShouldNotify = limit_reached(State) and not(limit_reached(NewState)), - {noreply, if ShouldNotify -> forget_queues(NewState); - true -> NewState - end}. +handle_cast({prefetch_count, PrefetchCount}, State) -> + {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; + +handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> + NewInUse = if InUse == 0 -> 0; + true -> InUse - Magnitude + end, + {noreply, maybe_notify(State, State#lim{in_use = NewInUse})}. handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State = #lim{queues = Queues}) -> @@ -117,6 +112,12 @@ code_change(_, State, _) -> %% Internal plumbing %%---------------------------------------------------------------------------- +maybe_notify(OldState, NewState) -> + case limit_reached(OldState) and not(limit_reached(NewState)) of + true -> forget_queues(NewState); + false -> NewState + end. + remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MonitorRef = erlang:monitor(process, QPid), @@ -131,11 +132,6 @@ forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> end, ok, Queues), State#lim{queues = dict:new()}. -decrement_in_use(_, State = #lim{in_use = 0}) -> - State#lim{in_use = 0}; -decrement_in_use(Magnitude, State = #lim{in_use = InUse}) -> - State#lim{in_use = InUse - Magnitude}. - limit_reached(#lim{prefetch_count = 0}) -> false; limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> -- cgit v1.2.1 From 6fe2881e4013895ef6b2f43d6b118a221f023a98 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 18 Dec 2008 21:34:13 +0000 Subject: oops --- src/rabbit_limiter.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 6388c360..6e9b10a2 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -57,7 +57,7 @@ %%---------------------------------------------------------------------------- %% API -%-%--------------------------------------------------------------------------- +%%---------------------------------------------------------------------------- start_link(ChPid) -> {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), -- cgit v1.2.1 From bb75417a9173744f32d5a312293cf47555d71c85 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 18 Dec 2008 21:43:36 +0000 Subject: minor simplifications --- src/rabbit_limiter.erl | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 6e9b10a2..257950b3 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -113,11 +113,14 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case limit_reached(OldState) and not(limit_reached(NewState)) of + case limit_reached(OldState) andalso not(limit_reached(NewState)) of true -> forget_queues(NewState); false -> NewState end. +limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> + Limit =/= 0 andalso InUse >= Limit. + remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MonitorRef = erlang:monitor(process, QPid), @@ -131,8 +134,3 @@ forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> rabbit_amqqueue:unblock(Q, ChPid) end, ok, Queues), State#lim{queues = dict:new()}. - -limit_reached(#lim{prefetch_count = 0}) -> - false; -limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> - InUse >= Limit. -- cgit v1.2.1 From 69d95872d3e87d0fdda7ed408d4bfdae4d37d9a9 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 19 Dec 2008 00:10:31 +0000 Subject: handle transactional acks also: - simplify and optimise non-basic-get ack counting - don't talk to the limiter when there is nothing to tell, i.e. the non-basic-get ack count is zero --- src/rabbit_channel.erl | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b77d26a0..c586df02 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -277,7 +277,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, - limiter = Limiter, next_tag = NextDeliveryTag, unacked_message_q = UAMQ}) -> if DeliveryTag >= NextDeliveryTag -> @@ -286,24 +285,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - % CC the limiter on the number of acks that have been received - % but don't include any acks from a basic.get bottom half - % (hence the differentiation between tags set to none and other tags) - % TODO - this is quite crude and is probably more expensive than it should - % be - according to the OTP documentation, len/1 runs in O(n), probably - % not so cool for a queuing system - NotBasicGet = queue:filter( - fun({_CurrentDeliveryTag, ConsumerTag, _Msg}) -> - case ConsumerTag of - none -> false; - _ -> true - end - end, Acked), - % TODO Optimization: Probably don't need to send this if len = 0 - rabbit_limiter:decrement_capacity(Limiter, queue:len(NotBasicGet)), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of - none -> State#ch{unacked_message_q = Remaining}; + none -> ok = notify_limiter(State#ch.limiter, Acked), + State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), add_tx_participants( @@ -789,7 +774,9 @@ internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of - ok -> new_tx(State); + ok -> ok = notify_limiter(State#ch.limiter, + State#ch.uncommitted_ack_q), + new_tx(State); {error, Errors} -> rabbit_misc:protocol_error( internal_error, "commit failed: ~w", [Errors]) end. @@ -840,6 +827,17 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> end], ProxyPid). +%% tell the limiter about the number of acks that have been received +%% for messages delivered to subscribed consumers, rather than those +%% for messages sent in a response to a basic.get +notify_limiter(Limiter, Acked) -> + case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, queue:to_list(Acked)) of + 0 -> ok; + Count -> rabbit_limiter:decrement_capacity(Limiter, Count) + end. + is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> case Mode of -- cgit v1.2.1 From 2bfbd6d7e52eff0c5fe65dcd467811904dc61107 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 19 Dec 2008 01:36:00 +0000 Subject: fix bug: make sure consumers blocked due to limit are dropped from State --- src/rabbit_amqqueue_process.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 702a8aee..b03887b8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -199,7 +199,7 @@ deliver_immediately(Message, Delivered, Delivered, Message, NextId, QName, QEntry, RoundRobinTail, State); {empty, _} -> - not_offered + {not_offered, State} end. % TODO The arity of this function seems a bit large :-( @@ -231,8 +231,8 @@ attempt_delivery(none, Message, State) -> persist_message(none, qname(State), Message), persist_delivery(qname(State), Message, false), {true, State1}; - not_offered -> - {false, State} + {not_offered, State1} -> + {false, State1} end; attempt_delivery(Txn, Message, State) -> persist_message(Txn, qname(State), Message), @@ -367,8 +367,8 @@ run_poke_burst(MessageBuffer, State) -> {offered, false, NewState} -> persist_auto_ack(qname(State), Message), run_poke_burst(BufferTail, NewState); - not_offered -> - State#q{message_buffer = MessageBuffer} + {not_offered, NewState} -> + NewState#q{message_buffer = MessageBuffer} end; {empty, _} -> State#q{message_buffer = MessageBuffer} -- cgit v1.2.1 From eab60583f79a3f689be30403b7f8a56aad99bd8b Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 19 Dec 2008 16:53:51 +0000 Subject: saner state transition handling and assorted bug fixes --- src/rabbit_amqqueue_process.erl | 91 ++++++++++++++++++----------------------- 1 file changed, 39 insertions(+), 52 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b03887b8..53b569b4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -66,7 +66,6 @@ monitor_ref, unacked_messages, is_limit_active, - is_overload_protection_active, unsent_message_count}). -define(INFO_KEYS, @@ -133,7 +132,7 @@ ch_record(ChPid) -> ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), - is_overload_protection_active = false, + is_limit_active = false, unsent_message_count = 0}, put(Key, C), C; @@ -146,24 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -update_store_and_maybe_block_ch( - C = #cr{is_overload_protection_active = Overloaded, - is_limit_active = Limited, - unsent_message_count = Count}) -> - {Result, NewOverloaded, NewLimited} = - if - not(Overloaded) and (Count > ?UNSENT_MESSAGE_LIMIT) -> - {block_ch, true, Limited}; - Overloaded and (Count == 0) -> - {unblock_ch, false, Limited}; - Limited and (Count < ?UNSENT_MESSAGE_LIMIT) -> - {unblock_ch, Overloaded, false}; - true -> - {ok, Overloaded, Limited} - end, - store_ch_record(C#cr{is_overload_protection_active = NewOverloaded, - is_limit_active = NewLimited}), - Result. +is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> + Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. + +ch_record_state_transition(OldCR, NewCR) -> + BlockedOld = is_ch_blocked(OldCR), + BlockedNew = is_ch_blocked(NewCR), + if BlockedOld andalso not(BlockedNew) -> unblock; + BlockedNew andalso not(BlockedOld) -> block; + true -> ok + end. deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, @@ -213,12 +204,13 @@ really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId, true -> dict:store(NextId, Message, UAM); false -> UAM end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) end, {offered, AckRequired, State#q{round_robin = NewConsumers, next_msg_id = NextId +1}}. @@ -270,16 +262,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) -> (CP /= ChPid) or (CT /= ConsumerTag) end, queue:to_list(RoundRobin))). -possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid}, - State = #q{round_robin = RoundRobin}) -> - case update_store_and_maybe_block_ch(C) of - ok -> +possibly_unblock(State, ChPid, Update) -> + case lookup_ch(ChPid) of + not_found -> State; - unblock_ch -> - run_poke_burst(State#q{round_robin = - unblock_consumers(ChPid, Consumers, RoundRobin)}) + C -> + NewC = Update(C), + store_ch_record(NewC), + case ch_record_state_transition(C, NewC) of + ok -> State; + unblock -> NewRR = unblock_consumers(ChPid, + NewC#cr.consumers, + State#q.round_robin), + run_poke_burst(State#q{round_robin = NewRR}) + end end. - + check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> {continue, State}; check_auto_delete(State = #q{has_had_consumers = false}) -> @@ -764,27 +762,16 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> end; handle_cast({unblock, ChPid}, State) -> - % TODO Refactor the code duplication - % between this an the notify_sent cast handler - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{is_limit_active = true} -> - noreply(possibly_unblock(C, State)); - C -> - rabbit_log:warning("Ignoring unblock for an active ch: ~p~n", - [C]), - noreply(State) - end; + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end)); handle_cast({notify_sent, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> noreply(State); - T = #cr{unsent_message_count =Count} -> - noreply(possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)) - end. + noreply( + possibly_unblock(State, ChPid, + fun (C = #cr{unsent_message_count = Count}) -> + C#cr{unsent_message_count = Count - 1} + end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> -- cgit v1.2.1 From 8154121e282bab4951a11daacb0428ad76c2db3e Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 23 Dec 2008 10:26:23 +0000 Subject: ensure fairness --- src/rabbit_limiter.erl | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 257950b3..7ca9772b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -129,8 +129,17 @@ remember_queue(QPid, State = #lim{queues = Queues}) -> end. forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> - ok = dict:fold(fun(Q, Ref, ok) -> - true = erlang:demonitor(Ref), - rabbit_amqqueue:unblock(Q, ChPid) - end, ok, Queues), + QList = dict:to_list(Queues), + case length(QList) of + 0 -> ok; + L -> + %% We randomly vary the position in which each queue + %% appears in the list, thus ensuring that each queue has + %% an equal chance of being notified first. + {L1, L2} = lists:split(random:uniform(L), QList), + [begin + true = erlang:demonitor(Ref), + ok = rabbit_amqqueue:unblock(Q, ChPid) + end || {Q, Ref} <- L2 ++ L1] + end, State#lim{queues = dict:new()}. -- cgit v1.2.1 From 05fa30a345228631d8c7002252ad133ef38e0e9e Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 23 Dec 2008 15:02:07 +0000 Subject: cosmetic --- src/rabbit_channel.erl | 4 ++-- src/rabbit_limiter.erl | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4b0cf6d5..36888f33 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -430,7 +430,7 @@ handle_method(#'basic.qos'{prefetch_size = Size}, handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter}) -> - rabbit_limiter:set_prefetch_count(Limiter, PrefetchCount), + ok = rabbit_limiter:limit(Limiter, PrefetchCount), {reply, #'basic.qos_ok'{}, State}; handle_method(#'basic.recover'{requeue = true}, @@ -838,7 +838,7 @@ notify_limiter(Limiter, Acked) -> ({_, _, _}, Acc) -> Acc + 1 end, 0, queue:to_list(Acked)) of 0 -> ok; - Count -> rabbit_limiter:decrement_capacity(Limiter, Count) + Count -> rabbit_limiter:ack(Limiter, Count) end. is_message_persistent(#content{properties = #'P_basic'{ diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 7ca9772b..12632625 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -36,15 +36,15 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1]). --export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). +-export([limit/2, can_send/2, ack/2]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(set_prefetch_count/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(can_send/2 :: (pid(), pid()) -> bool()). --spec(decrement_capacity/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok'). -endif. @@ -63,8 +63,8 @@ start_link(ChPid) -> {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), Pid. -set_prefetch_count(LimiterPid, PrefetchCount) -> - gen_server:cast(LimiterPid, {prefetch_count, PrefetchCount}). +limit(LimiterPid, PrefetchCount) -> + gen_server:cast(LimiterPid, {limit, PrefetchCount}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -73,8 +73,8 @@ can_send(LimiterPid, QPid) -> %% Let the limiter know that the channel has received some acks from a %% consumer -decrement_capacity(LimiterPid, Magnitude) -> - gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}). +ack(LimiterPid, Count) -> + gen_server:cast(LimiterPid, {ack, Count}). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -89,12 +89,12 @@ handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> false -> {reply, true, State#lim{in_use = InUse + 1}} end. -handle_cast({prefetch_count, PrefetchCount}, State) -> +handle_cast({limit, PrefetchCount}, State) -> {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; -handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> +handle_cast({ack, Count}, State = #lim{in_use = InUse}) -> NewInUse = if InUse == 0 -> 0; - true -> InUse - Magnitude + true -> InUse - Count end, {noreply, maybe_notify(State, State#lim{in_use = NewInUse})}. -- cgit v1.2.1 From 62874e89b1cd969e12ffc288fb43f46f1b72e05d Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 23 Dec 2008 16:00:34 +0000 Subject: make limiter keep track of all queues with subscriptions This is more efficient since it avoids the repeated (de)monitoring and updates to the limiter state. --- src/rabbit_amqqueue_process.erl | 12 ++++++++- src/rabbit_limiter.erl | 58 +++++++++++++++++++++++++++++------------ 2 files changed, 52 insertions(+), 18 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 53b569b4..c01f08df 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -634,6 +634,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, C1 = C#cr{consumers = [Consumer | Consumers], limiter_pid = LimiterPid}, store_ch_record(C1), + if Consumers == [] -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, State1 = State#q{has_had_consumers = true, exclusive_consumer = if @@ -653,12 +658,17 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumers = Consumers} -> + C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, Consumers), C1 = C#cr{consumers = NewConsumers}, store_ch_record(C1), + if NewConsumers == [] -> + ok = rabbit_limiter:unregister(LimiterPid, self()); + true -> + ok + end, ok = maybe_send_reply(ChPid, OkMsg), case check_auto_delete( State#q{exclusive_consumer = cancel_holder(ChPid, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 12632625..f1a45415 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -36,7 +36,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1]). --export([limit/2, can_send/2, ack/2]). +-export([limit/2, can_send/2, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -45,6 +45,8 @@ -spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(can_send/2 :: (pid(), pid()) -> bool()). -spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(register/2 :: (pid(), pid()) -> 'ok'). +-spec(unregister/2 :: (pid(), pid()) -> 'ok'). -endif. @@ -76,6 +78,12 @@ can_send(LimiterPid, QPid) -> ack(LimiterPid, Count) -> gen_server:cast(LimiterPid, {ack, Count}). +register(LimiterPid, QPid) -> + gen_server:cast(LimiterPid, {register, QPid}). + +unregister(LimiterPid, QPid) -> + gen_server:cast(LimiterPid, {unregister, QPid}). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -83,9 +91,13 @@ ack(LimiterPid, Count) -> init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> +handle_call({can_send, _QPid}, _From, State = #lim{in_use = InUse}) -> case limit_reached(State) of - true -> {reply, false, remember_queue(QPid, State)}; + true -> + %% TODO: keep track of the fact that the specific QPid has + %% had a can_send request rejected, so we can restrict the + %% notifications to these QPids only. + {reply, false, State}; false -> {reply, true, State#lim{in_use = InUse + 1}} end. @@ -96,11 +108,16 @@ handle_cast({ack, Count}, State = #lim{in_use = InUse}) -> NewInUse = if InUse == 0 -> 0; true -> InUse - Count end, - {noreply, maybe_notify(State, State#lim{in_use = NewInUse})}. + {noreply, maybe_notify(State, State#lim{in_use = NewInUse})}; + +handle_cast({register, QPid}, State) -> + {noreply, remember_queue(QPid, State)}; -handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, - State = #lim{queues = Queues}) -> - {noreply, State#lim{queues = dict:erase(QPid, Queues)}}. +handle_cast({unregister, QPid}, State) -> + {noreply, forget_queue(QPid, State)}. + +handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State) -> + {noreply, forget_queue(QPid, State)}. terminate(_, _) -> ok. @@ -114,9 +131,10 @@ code_change(_, State, _) -> maybe_notify(OldState, NewState) -> case limit_reached(OldState) andalso not(limit_reached(NewState)) of - true -> forget_queues(NewState); - false -> NewState - end. + true -> ok = notify_queues(NewState#lim.ch_pid, NewState#lim.queues); + false -> ok + end, + NewState. limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> Limit =/= 0 andalso InUse >= Limit. @@ -128,7 +146,16 @@ remember_queue(QPid, State = #lim{queues = Queues}) -> true -> State end. -forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> +forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> + case dict:find(QPid, Queues) of + {ok, MonitorRef} -> + true = erlang:demonitor(MonitorRef), + ok = rabbit_amqqueue:unblock(QPid, ChPid), + State#lim{queues = dict:erase(QPid, Queues)}; + error -> State + end. + +notify_queues(ChPid, Queues) -> QList = dict:to_list(Queues), case length(QList) of 0 -> ok; @@ -137,9 +164,6 @@ forget_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> %% appears in the list, thus ensuring that each queue has %% an equal chance of being notified first. {L1, L2} = lists:split(random:uniform(L), QList), - [begin - true = erlang:demonitor(Ref), - ok = rabbit_amqqueue:unblock(Q, ChPid) - end || {Q, Ref} <- L2 ++ L1] - end, - State#lim{queues = dict:new()}. + [ok = rabbit_amqqueue:unblock(Q, ChPid) || {Q, _} <- L2 ++ L1], + ok + end. -- cgit v1.2.1 From ef5e395acb15c1ffd2ecd5b82cc3ee7208890ea5 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 23 Dec 2008 16:28:22 +0000 Subject: create limiter lazily This makes an 'unlimited' channel as efficient as it used to be --- src/rabbit_channel.erl | 25 ++++++++++++++++++++----- src/rabbit_limiter.erl | 2 ++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index af1923a7..001fa4af 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -108,8 +108,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - % TODO See point 3.1.1 of the design - start the limiter lazily - limiter = rabbit_limiter:start_link(ProxyPid), + limiter = undefined, consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> @@ -430,11 +429,25 @@ handle_method(#'basic.qos'{prefetch_size = Size}, "Pre-fetch size (~s) for basic.qos not implementented", [Size]); -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{limiter = Limiter}) -> - ok = rabbit_limiter:limit(Limiter, PrefetchCount), +handle_method(#'basic.qos'{prefetch_count = 0}, + _, State = #ch{ limiter = undefined }) -> {reply, #'basic.qos_ok'{}, State}; +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{ limiter = Limiter, + proxy_pid = ProxyPid }) -> + %% TODO: terminate limiter when transitioning to 'unlimited' + NewLimiter = case Limiter of + undefined -> + %% TODO: tell queues with subscribers about + %% the limiter + rabbit_limiter:start_link(ProxyPid); + Pid -> + Pid + end, + ok = rabbit_limiter:limit(NewLimiter, PrefetchCount), + {reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}}; + handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, proxy_pid = ProxyPid, @@ -835,6 +848,8 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, rather than those %% for messages sent in a response to a basic.get +notify_limiter(undefined, _Acked) -> + ok; notify_limiter(Limiter, Acked) -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index f1a45415..824de072 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -70,6 +70,8 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit +can_send(undefined, _QPid) -> + true; can_send(LimiterPid, QPid) -> gen_server:call(LimiterPid, {can_send, QPid}). -- cgit v1.2.1 From cd234a9d8c95c5ef501554935d879b020a9678fa Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 23 Dec 2008 20:47:16 +0000 Subject: deal with limiting after consumer subscription --- src/rabbit_amqqueue.erl | 9 ++++++++- src/rabbit_amqqueue_process.erl | 15 ++++++++++++++- src/rabbit_channel.erl | 38 +++++++++++++++++++++----------------- 3 files changed, 43 insertions(+), 19 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 24ded98c..a345f5ab 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,7 +40,7 @@ -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2]). -export([unblock/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). @@ -92,6 +92,7 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), pid()) -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). @@ -261,6 +262,12 @@ notify_down_all(QPids, ChPid) -> fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). +limit_all(QPids, ChPid, LimiterPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server:cast(QPid, {limit, ChPid, LimiterPid}) end, + QPids). + claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server:call(QPid, {claim_queue, ReaderPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c01f08df..c6bb0502 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -781,7 +781,20 @@ handle_cast({notify_sent, ChPid}, State) -> possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> C#cr{unsent_message_count = Count - 1} - end)). + end)); + +handle_cast({limit, ChPid, LimiterPid}, State) -> + case lookup_ch(ChPid) of + not_found -> + ok; + C = #cr{consumers = Consumers} -> + if Consumers =/= [] -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> ok + end, + store_ch_record(C#cr{limiter_pid = LimiterPid}) + end, + noreply(State). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 001fa4af..51e550ed 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -439,11 +439,11 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, %% TODO: terminate limiter when transitioning to 'unlimited' NewLimiter = case Limiter of undefined -> - %% TODO: tell queues with subscribers about - %% the limiter - rabbit_limiter:start_link(ProxyPid); - Pid -> - Pid + LPid = rabbit_limiter:start_link(ProxyPid), + ok = limit_queues(LPid, State), + LPid; + LPid -> + LPid end, ok = rabbit_limiter:limit(NewLimiter, PrefetchCount), {reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}}; @@ -832,18 +832,22 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all( - [QPid || QueueName <- - sets:to_list( - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)), - case rabbit_amqqueue:lookup(QueueName) of - {ok, Q} -> QPid = Q#amqqueue.pid, true; - %% queue has been deleted in the meantime - {error, not_found} -> QPid = none, false - end], - ProxyPid). + rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), ProxyPid). + +limit_queues(LPid, #ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), ProxyPid, LPid). + +consumer_queues(Consumers) -> + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end]. %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, rather than those -- cgit v1.2.1 From 7b6e96533e2d572740818c801cef6f69a1824d9f Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 23 Dec 2008 21:14:46 +0000 Subject: cosmetic --- src/rabbit_channel.erl | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 51e550ed..71009747 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -39,10 +39,10 @@ %% callbacks -export([init/2, handle_message/2]). --record(ch, {state, proxy_pid, reader_pid, writer_pid, +-record(ch, {state, proxy_pid, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, limiter, + username, virtual_host, most_recently_declared_queue, consumer_mapping}). %%---------------------------------------------------------------------------- @@ -100,6 +100,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> proxy_pid = ProxyPid, reader_pid = ReaderPid, writer_pid = WriterPid, + limiter_pid = undefined, transaction_id = none, tx_participants = sets:new(), next_tag = 1, @@ -108,7 +109,6 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - limiter = undefined, consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> @@ -291,7 +291,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of - none -> ok = notify_limiter(State#ch.limiter, Acked), + none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), @@ -336,7 +336,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ proxy_pid = ProxyPid, reader_pid = ReaderPid, - limiter = LimiterPid, + limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -430,23 +430,23 @@ handle_method(#'basic.qos'{prefetch_size = Size}, [Size]); handle_method(#'basic.qos'{prefetch_count = 0}, - _, State = #ch{ limiter = undefined }) -> + _, State = #ch{ limiter_pid = undefined }) -> {reply, #'basic.qos_ok'{}, State}; handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter = Limiter, + _, State = #ch{ limiter_pid = LimiterPid, proxy_pid = ProxyPid }) -> %% TODO: terminate limiter when transitioning to 'unlimited' - NewLimiter = case Limiter of - undefined -> - LPid = rabbit_limiter:start_link(ProxyPid), - ok = limit_queues(LPid, State), - LPid; - LPid -> - LPid - end, - ok = rabbit_limiter:limit(NewLimiter, PrefetchCount), - {reply, #'basic.qos_ok'{}, State#ch{limiter = NewLimiter}}; + NewLimiterPid = case LimiterPid of + undefined -> + LPid = rabbit_limiter:start_link(ProxyPid), + ok = limit_queues(LPid, State), + LPid; + LPid -> + LPid + end, + ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, @@ -792,7 +792,7 @@ internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of - ok -> ok = notify_limiter(State#ch.limiter, + ok -> ok = notify_limiter(State#ch.limiter_pid, State#ch.uncommitted_ack_q), new_tx(State); {error, Errors} -> rabbit_misc:protocol_error( @@ -854,12 +854,12 @@ consumer_queues(Consumers) -> %% for messages sent in a response to a basic.get notify_limiter(undefined, _Acked) -> ok; -notify_limiter(Limiter, Acked) -> +notify_limiter(LimiterPid, Acked) -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 end, 0, queue:to_list(Acked)) of 0 -> ok; - Count -> rabbit_limiter:ack(Limiter, Count) + Count -> rabbit_limiter:ack(LimiterPid, Count) end. is_message_persistent(#content{properties = #'P_basic'{ -- cgit v1.2.1 From 20c86095dd1440f27f51763c932c71034d3fdde4 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 23 Dec 2008 21:39:11 +0000 Subject: handle the "no limiter" case more obviously This is no semantic change since gen_server:cast(undefined, ...) returns 'ok'. However, it only does so because it catches the 'badarg' error thrown by erlang:send. It is probably more efficient to not attempt the send in the first place. Plus for documentation purposes, and to keep dialyzer happy, it is useful to state explicitly which functions are expected to be called on an 'undefined' limiter. --- src/rabbit_limiter.erl | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 824de072..e02f77b1 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -70,21 +70,19 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid) -> - true; -can_send(LimiterPid, QPid) -> - gen_server:call(LimiterPid, {can_send, QPid}). +can_send(undefined, _QPid) -> true; +can_send(LimiterPid, QPid) -> gen_server:call(LimiterPid, {can_send, QPid}). %% Let the limiter know that the channel has received some acks from a %% consumer -ack(LimiterPid, Count) -> - gen_server:cast(LimiterPid, {ack, Count}). +ack(undefined, _Count) -> ok; +ack(LimiterPid, Count) -> gen_server:cast(LimiterPid, {ack, Count}). -register(LimiterPid, QPid) -> - gen_server:cast(LimiterPid, {register, QPid}). +register(undefined, _QPid) -> ok; +register(LimiterPid, QPid) -> gen_server:cast(LimiterPid, {register, QPid}). -unregister(LimiterPid, QPid) -> - gen_server:cast(LimiterPid, {unregister, QPid}). +unregister(undefined, _QPid) -> ok; +unregister(LimiterPid, QPid) -> gen_server:cast(LimiterPid, {unregister, QPid}). %%---------------------------------------------------------------------------- %% gen_server callbacks -- cgit v1.2.1 From 01b464088eaf595b0c1a84d289f470a67ea99071 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 23 Dec 2008 21:44:52 +0000 Subject: add type spec for start_link --- src/rabbit_limiter.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index e02f77b1..a9ec9e10 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -42,6 +42,7 @@ -ifdef(use_specs). +-spec(start_link/1 :: (pid()) -> pid()). -spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(can_send/2 :: (pid(), pid()) -> bool()). -spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok'). -- cgit v1.2.1 From 8d7ad7391c0edc4052b193b696b7dce449ae0c39 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 23 Dec 2008 21:54:26 +0000 Subject: don't leave the limiter behind when the channel terminates --- src/rabbit_channel.erl | 4 +++- src/rabbit_limiter.erl | 11 ++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 71009747..e7678cdf 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -162,13 +162,15 @@ handle_message(Other, State) -> %%--------------------------------------------------------------------------- -terminate(Reason, State = #ch{writer_pid = WriterPid}) -> +terminate(Reason, State = #ch{writer_pid = WriterPid, + limiter_pid = LimiterPid}) -> Res = notify_queues(internal_rollback(State)), case Reason of normal -> ok = Res; _ -> ok end, rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid), exit(Reason). return_ok(State, true, _Msg) -> {noreply, State}; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index a9ec9e10..6ffa8c23 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -35,7 +35,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([start_link/1]). +-export([start_link/1, shutdown/1]). -export([limit/2, can_send/2, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -43,6 +43,7 @@ -ifdef(use_specs). -spec(start_link/1 :: (pid()) -> pid()). +-spec(shutdown/1 :: (pid()) -> 'ok'). -spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(can_send/2 :: (pid(), pid()) -> bool()). -spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -66,6 +67,11 @@ start_link(ChPid) -> {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), Pid. +shutdown(undefined) -> + ok; +shutdown(LimiterPid) -> + gen_server:cast(LimiterPid, shutdown). + limit(LimiterPid, PrefetchCount) -> gen_server:cast(LimiterPid, {limit, PrefetchCount}). @@ -102,6 +108,9 @@ handle_call({can_send, _QPid}, _From, State = #lim{in_use = InUse}) -> false -> {reply, true, State#lim{in_use = InUse + 1}} end. +handle_cast(shutdown, State) -> + {stop, normal, State}; + handle_cast({limit, PrefetchCount}, State) -> {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; -- cgit v1.2.1 From 7de6e196f14ca8d83815b7e8e023535dda98647e Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Wed, 24 Dec 2008 14:57:47 +0000 Subject: destroy limiter when a channel becomes unlimited which results in far more efficient handling of subsequent deliveries --- src/rabbit_amqqueue_process.erl | 25 ++++++++++++++----------- src/rabbit_channel.erl | 25 ++++++++++++++++--------- src/rabbit_limiter.erl | 10 ++++++++-- 3 files changed, 38 insertions(+), 22 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c6bb0502..c49b06e5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -784,17 +784,20 @@ handle_cast({notify_sent, ChPid}, State) -> end)); handle_cast({limit, ChPid, LimiterPid}, State) -> - case lookup_ch(ChPid) of - not_found -> - ok; - C = #cr{consumers = Consumers} -> - if Consumers =/= [] -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> ok - end, - store_ch_record(C#cr{limiter_pid = LimiterPid}) - end, - noreply(State). + noreply( + possibly_unblock( + State, ChPid, + fun (C = #cr{consumers = Consumers, + limiter_pid = OldLimiterPid, + is_limit_active = Limited}) -> + if Consumers =/= [] andalso OldLimiterPid == undefined -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, + NewLimited = Limited andalso LimiterPid =/= undefined, + C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} + end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e7678cdf..a4bfacbb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -154,6 +154,12 @@ handle_message({conserve_memory, Conserve}, State) -> State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), State; +handle_message({'EXIT', Pid, Reason}, State = #ch{proxy_pid = Pid}) -> + terminate(Reason, State); + +handle_message({'EXIT', _Pid, normal}, State) -> + State; + handle_message({'EXIT', _Pid, Reason}, State) -> terminate(Reason, State); @@ -431,21 +437,22 @@ handle_method(#'basic.qos'{prefetch_size = Size}, "Pre-fetch size (~s) for basic.qos not implementented", [Size]); -handle_method(#'basic.qos'{prefetch_count = 0}, - _, State = #ch{ limiter_pid = undefined }) -> - {reply, #'basic.qos_ok'{}, State}; - handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{ limiter_pid = LimiterPid, proxy_pid = ProxyPid }) -> - %% TODO: terminate limiter when transitioning to 'unlimited' - NewLimiterPid = case LimiterPid of - undefined -> + NewLimiterPid = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> + undefined; + {undefined, _} -> LPid = rabbit_limiter:start_link(ProxyPid), ok = limit_queues(LPid, State), LPid; - LPid -> - LPid + {_, 0} -> + ok = rabbit_limiter:shutdown(LimiterPid), + ok = limit_queues(undefined, State), + undefined; + {_, _} -> + LimiterPid end, ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 6ffa8c23..3e09bb37 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -72,13 +72,19 @@ shutdown(undefined) -> shutdown(LimiterPid) -> gen_server:cast(LimiterPid, shutdown). +limit(undefined, 0) -> + ok; limit(LimiterPid, PrefetchCount) -> gen_server:cast(LimiterPid, {limit, PrefetchCount}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid) -> true; -can_send(LimiterPid, QPid) -> gen_server:call(LimiterPid, {can_send, QPid}). +can_send(undefined, _QPid) -> + true; +can_send(LimiterPid, QPid) -> + rabbit_misc:with_exit_handler( + fun () -> true end, + fun () -> gen_server:call(LimiterPid, {can_send, QPid}) end). %% Let the limiter know that the channel has received some acks from a %% consumer -- cgit v1.2.1 From 4882f8c367da4a7f608d1b7be5ab941a4f592441 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 25 Dec 2008 19:46:49 +0000 Subject: optimisation: only notify queues that have had can_send requests rejected --- src/rabbit_limiter.erl | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3e09bb37..38bf4cd4 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -104,13 +104,9 @@ unregister(LimiterPid, QPid) -> gen_server:cast(LimiterPid, {unregister, QPid}). init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, _QPid}, _From, State = #lim{in_use = InUse}) -> +handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> case limit_reached(State) of - true -> - %% TODO: keep track of the fact that the specific QPid has - %% had a can_send request rejected, so we can restrict the - %% notifications to these QPids only. - {reply, false, State}; + true -> {reply, false, limit_queue(QPid, State)}; false -> {reply, true, State#lim{in_use = InUse + 1}} end. @@ -147,32 +143,39 @@ code_change(_, State, _) -> maybe_notify(OldState, NewState) -> case limit_reached(OldState) andalso not(limit_reached(NewState)) of - true -> ok = notify_queues(NewState#lim.ch_pid, NewState#lim.queues); - false -> ok - end, - NewState. + true -> notify_queues(NewState); + false -> NewState + end. limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> Limit =/= 0 andalso InUse >= Limit. remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of - false -> MonitorRef = erlang:monitor(process, QPid), - State#lim{queues = dict:store(QPid, MonitorRef, Queues)}; + false -> MRef = erlang:monitor(process, QPid), + State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; true -> State end. forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> case dict:find(QPid, Queues) of - {ok, MonitorRef} -> - true = erlang:demonitor(MonitorRef), + {ok, {MRef, _}} -> + true = erlang:demonitor(MRef), ok = rabbit_amqqueue:unblock(QPid, ChPid), State#lim{queues = dict:erase(QPid, Queues)}; error -> State end. -notify_queues(ChPid, Queues) -> - QList = dict:to_list(Queues), +limit_queue(QPid, State = #lim{queues = Queues}) -> + UpdateFun = fun ({MRef, _}) -> {MRef, true} end, + State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. + +notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> + {QList, NewQueues} = + dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + (QPid, {MRef, true}, {L, D}) -> + {[QPid | L], dict:store(QPid, {MRef, false}, D)} + end, {[], Queues}, Queues), case length(QList) of 0 -> ok; L -> @@ -180,6 +183,7 @@ notify_queues(ChPid, Queues) -> %% appears in the list, thus ensuring that each queue has %% an equal chance of being notified first. {L1, L2} = lists:split(random:uniform(L), QList), - [ok = rabbit_amqqueue:unblock(Q, ChPid) || {Q, _} <- L2 ++ L1], + [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], ok - end. + end, + State#lim{queues = NewQueues}. -- cgit v1.2.1 From 0d6e4923d6d5f88ba6742983065b39ef296b6c8b Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 25 Dec 2008 20:10:34 +0000 Subject: cosmetic: rename 'in_use' to 'volume' --- src/rabbit_limiter.erl | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 38bf4cd4..62c6c73c 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -57,7 +57,7 @@ -record(lim, {prefetch_count = 0, ch_pid, queues = dict:new(), - in_use = 0}). + volume = 0}). %%---------------------------------------------------------------------------- %% API @@ -104,10 +104,10 @@ unregister(LimiterPid, QPid) -> gen_server:cast(LimiterPid, {unregister, QPid}). init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> +handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{in_use = InUse + 1}} + false -> {reply, true, State#lim{volume = Volume + 1}} end. handle_cast(shutdown, State) -> @@ -116,11 +116,11 @@ handle_cast(shutdown, State) -> handle_cast({limit, PrefetchCount}, State) -> {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; -handle_cast({ack, Count}, State = #lim{in_use = InUse}) -> - NewInUse = if InUse == 0 -> 0; - true -> InUse - Count - end, - {noreply, maybe_notify(State, State#lim{in_use = NewInUse})}; +handle_cast({ack, Count}, State = #lim{volume = Volume}) -> + NewVolume = if Volume == 0 -> 0; + true -> Volume - Count + end, + {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -147,8 +147,8 @@ maybe_notify(OldState, NewState) -> false -> NewState end. -limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> - Limit =/= 0 andalso InUse >= Limit. +limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> + Limit =/= 0 andalso Volume >= Limit. remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of -- cgit v1.2.1 From 6d7792714815e494cb63aec8ee86894e12f5e4d7 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 26 Dec 2008 09:09:25 +0000 Subject: tidying up, refactoring and some cosmetic changes --- src/rabbit_amqqueue.erl | 5 ++- src/rabbit_amqqueue_process.erl | 73 +++++++++++++++-------------------------- src/rabbit_channel.erl | 15 ++++----- src/rabbit_limiter.erl | 6 ++-- 4 files changed, 38 insertions(+), 61 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a345f5ab..3c8bd99e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -38,8 +38,7 @@ -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2]). --export([unblock/2]). +-export([notify_sent/2, unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -97,7 +96,7 @@ -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: - (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c49b06e5..5199fb87 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -162,59 +162,42 @@ deliver_immediately(Message, Delivered, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(RoundRobin) of - {{value, QEntry = {ChPid, - #consumer{tag = ConsumerTag, - ack_required = AckRequired = true}}}, + {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}}, RoundRobinTail} -> - % Use Qos Limits if an ack is required - % Query the limiter to find out if a limit has been breached - C = #cr{limiter_pid = LimiterPid} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self()) of + C = #cr{limiter_pid = LimiterPid, + unsent_message_count = Count, + unacked_messages = UAM} = ch_record(ChPid), + case not(AckRequired) orelse rabbit_limiter:can_send( + LimiterPid, self()) of true -> - really_deliver(AckRequired, ChPid, ConsumerTag, - Delivered, Message, NextId, QName, - QEntry, RoundRobinTail, State); + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), + NewConsumers = + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId + 1}}; false -> - % Have another go by cycling through the consumer - % queue store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), deliver_immediately(Message, Delivered, State#q{round_robin = NewConsumers}) end; - {{value, QEntry = {ChPid, - #consumer{tag = ConsumerTag, - ack_required = AckRequired = false}}}, - RoundRobinTail} -> - really_deliver(AckRequired, ChPid, ConsumerTag, - Delivered, Message, NextId, QName, - QEntry, RoundRobinTail, State); {empty, _} -> {not_offered, State} end. -% TODO The arity of this function seems a bit large :-( -really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId, - QName, QEntry, RoundRobinTail, State) -> - rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewC = C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}, - store_ch_record(NewC), - NewConsumers = - case ch_record_state_transition(C, NewC) of - ok -> queue:in(QEntry, RoundRobinTail); - block -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}. - attempt_delivery(none, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> @@ -631,9 +614,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - C1 = C#cr{consumers = [Consumer | Consumers], - limiter_pid = LimiterPid}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = [Consumer | Consumers], + limiter_pid = LimiterPid}), if Consumers == [] -> ok = rabbit_limiter:register(LimiterPid, self()); true -> @@ -662,8 +644,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, Consumers), - C1 = C#cr{consumers = NewConsumers}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = NewConsumers}), if NewConsumers == [] -> ok = rabbit_limiter:unregister(LimiterPid, self()); true -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a4bfacbb..304275c4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -427,15 +427,12 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{global = Flag = true}, _, _State) -> - rabbit_misc:protocol_error(not_implemented, - "Global flag (~s) for basic.qos not implementented", [Flag]); +handle_method(#'basic.qos'{global = true}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "global=true", []); -handle_method(#'basic.qos'{prefetch_size = Size}, - _, _State) when Size /= 0 -> +handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, - "Pre-fetch size (~s) for basic.qos not implementented", - [Size]); + "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{ limiter_pid = LimiterPid, @@ -859,8 +856,8 @@ consumer_queues(Consumers) -> end]. %% tell the limiter about the number of acks that have been received -%% for messages delivered to subscribed consumers, rather than those -%% for messages sent in a response to a basic.get +%% for messages delivered to subscribed consumers, but not acks for +%% messages sent in a response to a basic.get. notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 62c6c73c..3776edd0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -179,9 +179,9 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> case length(QList) of 0 -> ok; L -> - %% We randomly vary the position in which each queue - %% appears in the list, thus ensuring that each queue has - %% an equal chance of being notified first. + %% We randomly vary the position of queues in the list, + %% thus ensuring that each queue has an equal chance of + %% being notified first. {L1, L2} = lists:split(random:uniform(L), QList), [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], ok -- cgit v1.2.1 From 1d296abdabc9f8f2a677a02859b02e40b345c457 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Tue, 6 Jan 2009 12:34:17 +0100 Subject: RPM %files section only lists files that are not under /usr/lib/erlang/lib/rabbitmq-server-$VERSION$, /usr/lib/rabbitmq and /etc. The first two are handled by only listing the directories and rpms checks them recursively. --- packaging/RPMS/Fedora/rabbitmq-server.spec | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 13cfb037..aac28003 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -31,10 +31,10 @@ scalable implementation of an AMQP broker. %define _defaultlibdir /usr/lib %endif -%define _erllibdir %{_defaultlibdir}/erlang/lib -%define _rabbitbindir %{_defaultlibdir}/rabbitmq/bin +%define _rabbit_erllibdir %{_defaultlibdir}/erlang/lib/rabbitmq_server-%{version} +%define _rabbit_libdir %{_defaultlibdir}/rabbitmq -%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version} +%define _maindir %{buildroot}%{_rabbit_erllibdir} %pre if [ $1 -gt 1 ]; then @@ -53,7 +53,7 @@ make rm -rf %{buildroot} make install TARGET_DIR=%{_maindir} \ - SBIN_DIR=%{buildroot}%{_rabbitbindir} \ + SBIN_DIR=%{buildroot}%{_rabbit_libdir}/bin \ MAN_DIR=%{buildroot}%{_mandir} mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia @@ -81,8 +81,10 @@ rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL #Build the list of files rm -f %{_builddir}/filelist.%{name}.rpm echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm -(cd %{buildroot}; find . ! -regex '\./etc.*' \ - -type f | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) +(cd %{buildroot}; \ + find . -type f ! -regex '\./etc.*' \ + ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \ + | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) %post # create rabbitmq group @@ -116,6 +118,8 @@ fi %defattr(-,root,root,-) %dir /var/lib/rabbitmq %dir /var/log/rabbitmq +%{_rabbit_erllibdir} +%{_rabbit_libdir} /etc/rc.d/init.d/rabbitmq-server %config(noreplace) /etc/logrotate.d/rabbitmq-server %doc LICENSE LICENSE-MPL-RabbitMQ INSTALL -- cgit v1.2.1 From c75ad65a8a6ea345fd3ccdeae48490ed8ebfff03 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Tue, 6 Jan 2009 13:28:03 +0100 Subject: Define standard libdir macro in the makefile instead of overwriting it directly in the spec file for RPMS. This is to fix the wrong libdir macro under debian for x86_64 architectures. --- packaging/RPMS/Fedora/Makefile | 4 ++-- packaging/RPMS/Fedora/rabbitmq-server.spec | 10 ++-------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index c05f14a7..aa8c93c6 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -20,8 +20,8 @@ prepare: server: prepare rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target i386 - rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --define '_arch x86_64' \ - --define '_defaultdocdir /usr/share/doc' --target x86_64 + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --define '_libdir /usr/lib64' \ + --define '_arch x86_64' --define '_defaultdocdir /usr/share/doc' --target x86_64 clean: rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 13cfb037..36ad864b 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -25,14 +25,8 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%ifarch x86_64 - %define _defaultlibdir /usr/lib64 -%else - %define _defaultlibdir /usr/lib -%endif - -%define _erllibdir %{_defaultlibdir}/erlang/lib -%define _rabbitbindir %{_defaultlibdir}/rabbitmq/bin +%define _erllibdir %{_libdir}/erlang/lib +%define _rabbitbindir %{_libdir}/rabbitmq/bin %define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version} -- cgit v1.2.1 From 513a626ceeb9b139b137bb0b5b9d859890c057ea Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Tue, 6 Jan 2009 13:33:04 +0100 Subject: Fix spaces --- packaging/RPMS/Fedora/rabbitmq-server.spec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index aac28003..5efff331 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -83,7 +83,7 @@ rm -f %{_builddir}/filelist.%{name}.rpm echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm (cd %{buildroot}; \ find . -type f ! -regex '\./etc.*' \ - ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \ + ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \ | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) %post -- cgit v1.2.1 From ca08e04356d218077d495562c42eaa242fcf06ff Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Tue, 6 Jan 2009 14:42:08 +0100 Subject: Use default macros in sed instead of using hardcoded path. --- packaging/RPMS/Fedora/rabbitmq-server.spec | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 36ad864b..d5961258 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -58,13 +58,13 @@ mkdir -p %{buildroot}/etc/rc.d/init.d/ install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server %ifarch x86_64 - sed -i 's/\/usr\/lib\//\/usr\/lib64\//' %{buildroot}/etc/rc.d/init.d/rabbitmq-server + sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}/etc/rc.d/init.d/rabbitmq-server %endif mkdir -p %{buildroot}%{_sbindir} install -m 0755 %SOURCE2 %{buildroot}%{_sbindir}/rabbitmqctl %ifarch x86_64 - sed -i 's/\/usr\/lib\//\/usr\/lib64\//' %{buildroot}%{_sbindir}/rabbitmqctl + sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_sbindir}/rabbitmqctl %endif mkdir -p %{buildroot}/etc/logrotate.d -- cgit v1.2.1 From 3518362dc686291c3bff99163e77fbf3638067c9 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Tue, 6 Jan 2009 17:11:17 +0100 Subject: Generalize the rpm spec so that some values are substituted by 'sed' in the makefile. Default rpm release is for fedora. The only difference between suse and fedora are the dependencies (used files instead of packages in the former), location of init.d and final rpm name. --- packaging/RPMS/Fedora/Makefile | 25 +++++++++++++++++++++---- packaging/RPMS/Fedora/init.d | 3 --- packaging/RPMS/Fedora/rabbitmq-server.spec | 16 ++++++++-------- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index aa8c93c6..cf3a93df 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -6,22 +6,39 @@ TOP_DIR=$(shell pwd) #only checks build-dependencies using rpms, not debs DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1' +ifndef RPM_OS +RPM_OS=fedora +endif + +ifeq "x$(RPM_OS)" "xsuse" +REQUIRES=/sbin/chkconfig /sbin/service +OS_DEFINES=--define '_initrddir /etc/init.d' +RELEASE_OS=.suse +else +REQUIRES=chkconfig initscripts +OS_DEFINES=--define '_initrddir /etc/rc.d/init.d' +RELEASE_OS= +endif + rpms: clean server prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp cp $(TOP_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS - sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec + sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \ + SPECS/rabbitmq-server.spec cp init.d SOURCES/rabbitmq-server.init cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare - rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target i386 - rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --define '_libdir /usr/lib64' \ - --define '_arch x86_64' --define '_defaultdocdir /usr/share/doc' --target x86_64 + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ + --target i386 + rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ + --define '_libdir /usr/lib64' --define '_arch x86_64' \ + --define '_defaultdocdir /usr/share/doc' --target x86_64 clean: rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d index 27f150f9..ffcd11ac 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/RPMS/Fedora/init.d @@ -29,9 +29,6 @@ LOCK_FILE=/var/lock/subsys/$NAME test -x $DAEMON || exit 0 -# source function library -. /etc/rc.d/init.d/functions - # Include rabbitmq defaults if available if [ -f /etc/default/rabbitmq ] ; then . /etc/default/rabbitmq diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index d5961258..460ee286 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -1,6 +1,6 @@ Name: rabbitmq-server Version: %%VERSION%% -Release: 1 +Release: 1%%RELEASE_OS%% License: MPLv1.1 Group: Development/Libraries Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz @@ -17,8 +17,8 @@ Requires: erlang, logrotate Packager: Hubert Plociniczak BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root Summary: The RabbitMQ server -Requires(post): chkconfig -Requires(pre): chkconfig initscripts +Requires(post): %%REQUIRES%% +Requires(pre): %%REQUIRES%% %description RabbitMQ is an implementation of AMQP, the emerging standard for high @@ -52,13 +52,13 @@ make install TARGET_DIR=%{_maindir} \ mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia mkdir -p %{buildroot}/var/log/rabbitmq -mkdir -p %{buildroot}/etc/rc.d/init.d/ +mkdir -p %{buildroot}%{_initrddir} #Copy all necessary lib files etc. -install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server -chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server +install -m 0755 %SOURCE1 %{buildroot}/%{_initrddir}/rabbitmq-server +chmod 0755 %{buildroot}%{_initrddir}/rabbitmq-server %ifarch x86_64 - sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}/etc/rc.d/init.d/rabbitmq-server + sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_initrddir}/rabbitmq-server %endif mkdir -p %{buildroot}%{_sbindir} @@ -110,7 +110,7 @@ fi %defattr(-,root,root,-) %dir /var/lib/rabbitmq %dir /var/log/rabbitmq -/etc/rc.d/init.d/rabbitmq-server +%{_initrddir}/rabbitmq-server %config(noreplace) /etc/logrotate.d/rabbitmq-server %doc LICENSE LICENSE-MPL-RabbitMQ INSTALL -- cgit v1.2.1 From a9007563d0ae9753b58f3ed06bedebd58f52df8e Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Tue, 6 Jan 2009 17:48:02 +0100 Subject: Removed arch conditional on sed. --- packaging/RPMS/Fedora/rabbitmq-server.spec | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index d5961258..62eee7dc 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -57,15 +57,11 @@ mkdir -p %{buildroot}/etc/rc.d/init.d/ #Copy all necessary lib files etc. install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server -%ifarch x86_64 - sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}/etc/rc.d/init.d/rabbitmq-server -%endif +sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}/etc/rc.d/init.d/rabbitmq-server mkdir -p %{buildroot}%{_sbindir} install -m 0755 %SOURCE2 %{buildroot}%{_sbindir}/rabbitmqctl -%ifarch x86_64 - sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_sbindir}/rabbitmqctl -%endif +sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_sbindir}/rabbitmqctl mkdir -p %{buildroot}/etc/logrotate.d install -m 0644 %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server -- cgit v1.2.1 From ff6707dee89f97b03e36810b46253a9442322659 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Tue, 6 Jan 2009 18:27:42 +0100 Subject: Typo --- packaging/RPMS/Fedora/rabbitmq-server.spec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 460ee286..4e9c61df 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -55,7 +55,7 @@ mkdir -p %{buildroot}/var/log/rabbitmq mkdir -p %{buildroot}%{_initrddir} #Copy all necessary lib files etc. -install -m 0755 %SOURCE1 %{buildroot}/%{_initrddir}/rabbitmq-server +install -m 0755 %SOURCE1 %{buildroot}%{_initrddir}/rabbitmq-server chmod 0755 %{buildroot}%{_initrddir}/rabbitmq-server %ifarch x86_64 sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_initrddir}/rabbitmq-server -- cgit v1.2.1 From f8777b9d1fd08ff4107875d1005eb458398cb764 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 8 Jan 2009 15:45:04 +0000 Subject: turn queue processes into custom gen_servers to avoid long message queues that impact the performance of selective receive --- src/rabbit_amqqueue_process.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 709e355e..ee012dee 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER, 1000). @@ -85,7 +85,7 @@ %%---------------------------------------------------------------------------- start_link(Q) -> - gen_server:start_link(?MODULE, Q, []). + gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- @@ -551,14 +551,14 @@ handle_call({deliver, Txn, Message}, _From, State) -> handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), handle_ch_down(ChPid, State); handle_call({basic_get, ChPid, NoAck}, _From, @@ -757,7 +757,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]); + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), -- cgit v1.2.1 From 1057239e68bfde5355b084c7848c8045fedcd9a3 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Fri, 9 Jan 2009 01:28:24 +0100 Subject: Use --gecos option in adduser to add user's comments field, instead of running usermod. --- packaging/debs/Debian/debian/postinst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 495b8331..93d1c096 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -25,8 +25,8 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then - adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq --no-create-home rabbitmq - usermod -c "RabbitMQ messaging server" rabbitmq + adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \ + --no-create-home rabbitmq --gecos "RabbitMQ messaging server" fi chown -R rabbitmq:rabbitmq /var/lib/rabbitmq -- cgit v1.2.1 From 064fcf8211db27cb2fff152c9a7eb9d91b95e3c4 Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Fri, 9 Jan 2009 03:01:14 +0100 Subject: Removed PATH variable from init scripts. --- packaging/RPMS/Fedora/init.d | 1 - packaging/debs/Debian/debian/init.d | 1 - 2 files changed, 2 deletions(-) diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d index ffcd11ac..a006a5a7 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/RPMS/Fedora/init.d @@ -16,7 +16,6 @@ # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin DAEMON_NAME=rabbitmq-multi DAEMON=/usr/lib/rabbitmq/bin/$DAEMON_NAME NAME=rabbitmq-server diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d index ace474c5..70dd0adf 100644 --- a/packaging/debs/Debian/debian/init.d +++ b/packaging/debs/Debian/debian/init.d @@ -9,7 +9,6 @@ # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO -PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin DAEMON=/usr/lib/rabbitmq/bin/rabbitmq-multi NAME=rabbitmq-server DESC=rabbitmq-server -- cgit v1.2.1 From 1ab002d33e0e05312d94651faaa259c576ec73c5 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 9 Jan 2009 15:12:18 +0000 Subject: the #amqqueue.pid is 'none', so return self() instead --- src/rabbit_amqqueue_process.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 709e355e..6282a8fb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -488,7 +488,8 @@ i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; -i(pid, #q{q = #amqqueue{pid = Pid}}) -> Pid; +i(pid, _) -> + self(); i(messages_ready, #q{message_buffer = MessageBuffer}) -> queue:len(MessageBuffer); i(messages_unacknowledged, _) -> -- cgit v1.2.1 From 081bd947d4731f3f9c8cebbf618a4d3efba482aa Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Fri, 9 Jan 2009 17:41:44 +0100 Subject: Moved user name to the end of the command. --- packaging/debs/Debian/debian/postinst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 93d1c096..05fb179c 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -26,7 +26,7 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \ - --no-create-home rabbitmq --gecos "RabbitMQ messaging server" + --no-create-home --gecos "RabbitMQ messaging server" rabbitmq fi chown -R rabbitmq:rabbitmq /var/lib/rabbitmq -- cgit v1.2.1 From e2580259eac0070cde345050b3bab74bba63822c Mon Sep 17 00:00:00 2001 From: Hubert Plociniczak Date: Fri, 9 Jan 2009 19:06:57 +0100 Subject: Output file is given as an argument, do not use stdout. --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index b441fcab..84bb3dfe 100644 --- a/Makefile +++ b/Makefile @@ -44,10 +44,10 @@ $(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCL # ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) > $@ + $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) > $@ + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS) erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().' -- cgit v1.2.1 From c9e9c8e947b36433460aa4969cf8f51e81309302 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Sun, 11 Jan 2009 22:02:52 +0000 Subject: Report node instead of pid in rabbitmqctl --- src/rabbit_control.erl | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index ecc285a5..419d71d9 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -138,7 +138,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is \"/\". must be a member of the list [name, durable, auto_delete, -arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted, +arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, consumers, transactions, memory]. The default is to display name and (number of) messages. @@ -148,7 +148,7 @@ auto_delete, arguments]. The default is to display name and type. The output format for \"list_bindings\" is a list of rows containing exchange name, routing key, queue name and arguments, in that order. - must be a member of the list [pid, address, port, + must be a member of the list [node, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display user, peer_address and peer_port. @@ -242,7 +242,8 @@ action(list_vhost_users, Node, Args = [_VHostPath], Inform) -> action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), {VHostArg, RemainingArgs} = parse_vhost_flag(Args), - ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), + ArgAtoms = list_replace(node, pid, + default_if_empty(RemainingArgs, [name, messages])), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); @@ -267,7 +268,8 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port]), + ArgAtoms = list_replace(node, pid, + default_if_empty(Args, [user, peer_address, peer_port])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms). @@ -311,6 +313,8 @@ format_info_item(Items, Key) -> {Key, IpAddress} when Key =:= address; Key =:= peer_address andalso is_tuple(IpAddress) -> inet_parse:ntoa(IpAddress); + {pid, _} -> + atom_to_list(node(Value)); _ when is_binary(Value) -> url_encode(Value); _ -> @@ -357,3 +361,6 @@ url_encode_char([], Acc) -> d2h(N) when N<10 -> N+$0; d2h(N) -> N+$a-10. +list_replace(Find, Replace, List) -> + [case X of Find -> Replace; _ -> X end || X <- List]. + -- cgit v1.2.1 From d809b57f7db18757ae0c8a3e3a780bfa40c3bf0c Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Sun, 11 Jan 2009 22:24:33 +0000 Subject: Better dispatching while formatting results in rabbitmqctl --- src/rabbit_control.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 419d71d9..1e07f6d2 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -310,10 +310,9 @@ format_info_item(Items, Key) -> case Info of {_, #resource{name = Name}} -> url_encode(Name); - {Key, IpAddress} when Key =:= address; Key =:= peer_address - andalso is_tuple(IpAddress) -> - inet_parse:ntoa(IpAddress); - {pid, _} -> + _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) -> + inet_parse:ntoa(Value); + _ when is_pid(Value) -> atom_to_list(node(Value)); _ when is_binary(Value) -> url_encode(Value); -- cgit v1.2.1 From e5a9c80432e749a36bd5d20c63fa195ec5747050 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 13 Jan 2009 17:13:24 +0000 Subject: use halt() instead of init:stop() it's much faster and perfectly safe in these self-contained programs --- src/rabbit_control.erl | 2 +- src/rabbit_multi.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 1e07f6d2..cbc11b40 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -57,7 +57,7 @@ start() -> true -> ok; false -> io:format("...done.~n") end, - init:stop(); + halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> error("invalid command '~s'", [lists:flatten( diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 7f6eaa8e..5e8edd53 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -50,7 +50,7 @@ start() -> case catch action(Command, Args, RpcTimeout) of ok -> io:format("done.~n"), - init:stop(); + halt(); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> error("invalid command '~s'", [lists:flatten( -- cgit v1.2.1 From b5f4ceb5b42314c3d71710006f57e34fb512abf2 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 13 Jan 2009 19:00:50 +0000 Subject: auto-generate rabbit.app module list --- Makefile | 10 +++++++--- ebin/rabbit.app | 57 ------------------------------------------------------ ebin/rabbit_app.in | 20 +++++++++++++++++++ generate_app | 10 ++++++++++ 4 files changed, 37 insertions(+), 60 deletions(-) delete mode 100644 ebin/rabbit.app create mode 100644 ebin/rabbit_app.in create mode 100644 generate_app diff --git a/Makefile b/Makefile index 84bb3dfe..e75f2d28 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,8 @@ SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES)) +TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) @@ -39,6 +40,9 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e #all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) +$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) + escript generate_app $(EBIN_DIR) < $< > $@ + $(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl erlc $(ERLC_OPTS) $< # ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) $< @@ -57,7 +61,7 @@ dialyze: $(TARGETS) clean: cleandb rm -f $(EBIN_DIR)/*.beam - rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script + rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz @@ -123,7 +127,7 @@ srcdist: distclean cp BUILD.in $(TARGET_SRC_DIR)/BUILD elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ >> $(TARGET_SRC_DIR)/BUILD - sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app + sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ cp codegen.py Makefile $(TARGET_SRC_DIR) diff --git a/ebin/rabbit.app b/ebin/rabbit.app deleted file mode 100644 index 5ecd247b..00000000 --- a/ebin/rabbit.app +++ /dev/null @@ -1,57 +0,0 @@ -{application, rabbit, %% -*- erlang -*- - [{description, "RabbitMQ"}, - {id, "RabbitMQ"}, - {vsn, "%%VERSION%%"}, - {modules, [buffering_proxy, - rabbit_access_control, - rabbit_alarm, - rabbit_amqqueue, - rabbit_amqqueue_process, - rabbit_amqqueue_sup, - rabbit_binary_generator, - rabbit_binary_parser, - rabbit_channel, - rabbit_control, - rabbit, - rabbit_error_logger, - rabbit_error_logger_file_h, - rabbit_exchange, - rabbit_framing_channel, - rabbit_framing, - rabbit_heartbeat, - rabbit_load, - rabbit_log, - rabbit_memsup_linux, - rabbit_misc, - rabbit_mnesia, - rabbit_multi, - rabbit_networking, - rabbit_node_monitor, - rabbit_persister, - rabbit_reader, - rabbit_router, - rabbit_sasl_report_file_h, - rabbit_sup, - rabbit_tests, - rabbit_tracer, - rabbit_writer, - tcp_acceptor, - tcp_acceptor_sup, - tcp_client_sup, - tcp_listener, - tcp_listener_sup]}, - {registered, [rabbit_amqqueue_sup, - rabbit_log, - rabbit_node_monitor, - rabbit_persister, - rabbit_router, - rabbit_sup, - rabbit_tcp_client_sup]}, - {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, - {mod, {rabbit, []}}, - {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, - {extra_startup_steps, []}, - {default_user, <<"guest">>}, - {default_pass, <<"guest">>}, - {default_vhost, <<"/">>}, - {memory_alarms, auto}]}]}. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in new file mode 100644 index 00000000..e2f36c0f --- /dev/null +++ b/ebin/rabbit_app.in @@ -0,0 +1,20 @@ +{application, rabbit, %% -*- erlang -*- + [{description, "RabbitMQ"}, + {id, "RabbitMQ"}, + {vsn, "%%VERSION%%"}, + {modules, []}, + {registered, [rabbit_amqqueue_sup, + rabbit_log, + rabbit_node_monitor, + rabbit_persister, + rabbit_router, + rabbit_sup, + rabbit_tcp_client_sup]}, + {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, + {mod, {rabbit, []}}, + {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, + {extra_startup_steps, []}, + {default_user, <<"guest">>}, + {default_pass, <<"guest">>}, + {default_vhost, <<"/">>}, + {memory_alarms, auto}]}]}. diff --git a/generate_app b/generate_app new file mode 100644 index 00000000..1d75e83c --- /dev/null +++ b/generate_app @@ -0,0 +1,10 @@ +#!/usr/bin/env escript +%% -*- erlang -*- + +main([BeamDir]) -> + Modules = [list_to_atom(filename:basename(F, ".beam")) || + F <- filelib:wildcard("*.beam", BeamDir)], + {ok, {application, Application, Properties}} = io:read(""), + NewProperties = lists:keyreplace(modules, 1, Properties, + {modules, Modules}), + io:format("~p", [{application, Application, NewProperties}]). -- cgit v1.2.1 From 0b61669747325219a672c43bd182f7e539341118 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 13 Jan 2009 19:46:18 +0000 Subject: oops --- generate_app | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/generate_app b/generate_app index 1d75e83c..bb6f7516 100644 --- a/generate_app +++ b/generate_app @@ -7,4 +7,4 @@ main([BeamDir]) -> {ok, {application, Application, Properties}} = io:read(""), NewProperties = lists:keyreplace(modules, 1, Properties, {modules, Modules}), - io:format("~p", [{application, Application, NewProperties}]). + io:format("~p.", [{application, Application, NewProperties}]). -- cgit v1.2.1 From 3111476762c0206c18d77ffc56745fcdeb5da26b Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 15 Jan 2009 14:13:25 +1300 Subject: Add "amq.match" builtin exchange; accept "headers" in exchange.declare; and stub out routing implementation for headers exchanges. --- src/rabbit_access_control.erl | 1 + src/rabbit_exchange.erl | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index b73090fc..e2d96c71 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -186,6 +186,7 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, {<<"amq.fanout">>, fanout}]], ok; [_] -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 925c335c..03478a4d 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -145,6 +145,8 @@ check_type(<<"direct">>) -> direct; check_type(<<"topic">>) -> topic; +check_type(<<"headers">>) -> + headers; check_type(T) -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]). @@ -252,6 +254,9 @@ route(#exchange{name = Name, type = topic}, RoutingKey) -> topic_matches(BindingKey, RoutingKey)] end); +route(X = #exchange{type = headers}, _RoutingKey) -> + exit(headers_unimplemented); + route(X = #exchange{type = fanout}, _) -> route_internal(X, '_'); -- cgit v1.2.1 From 28333003d191d6a487f31704004fe671a9cf1813 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 15 Jan 2009 14:23:29 +1300 Subject: Depend on generate_app when deciding whether to rebuild rabbit.app. --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index e75f2d28..cc070d9f 100644 --- a/Makefile +++ b/Makefile @@ -40,7 +40,7 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e #all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) -$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) +$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app escript generate_app $(EBIN_DIR) < $< > $@ $(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl -- cgit v1.2.1 From 6727fa1875d711c41abaa68b27663db4a2b59ce5 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 15 Jan 2009 14:23:59 +1300 Subject: Use io:read('') instead of io:read("") to avoid a "prompt" of "[]" ending up in the output. --- generate_app | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/generate_app b/generate_app index bb6f7516..62301292 100644 --- a/generate_app +++ b/generate_app @@ -4,7 +4,7 @@ main([BeamDir]) -> Modules = [list_to_atom(filename:basename(F, ".beam")) || F <- filelib:wildcard("*.beam", BeamDir)], - {ok, {application, Application, Properties}} = io:read(""), + {ok, {application, Application, Properties}} = io:read(''), NewProperties = lists:keyreplace(modules, 1, Properties, {modules, Modules}), io:format("~p.", [{application, Application, NewProperties}]). -- cgit v1.2.1 From 100a9502fac186848d47cf51c11cbb4a72d63cd5 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 15 Jan 2009 16:13:58 +1300 Subject: Implement routing for headers exchange. --- src/rabbit_channel.erl | 2 +- src/rabbit_exchange.erl | 111 +++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 101 insertions(+), 12 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ca2782c7..cbdc9e48 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -273,7 +273,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey), State)}; + rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 03478a4d..a4e6d219 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -37,11 +37,11 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, - route/2]). + route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_bindings_for_queue/1]). --export([check_type/1, assert_type/2, topic_matches/2]). +-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -77,7 +77,7 @@ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/2 :: (exchange(), routing_key()) -> [pid()]). +-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -88,6 +88,7 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -213,15 +214,19 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey}) -> + routing_key = RoutingKey, + content = Content}) -> case lookup(ExchangeName) of {ok, Exchange} -> - QPids = route(Exchange, RoutingKey), + QPids = route(Exchange, RoutingKey, Content), rabbit_router:deliver(QPids, Mandatory, Immediate, none, Message); {error, Error} -> {error, Error} end. +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% @@ -230,7 +235,7 @@ simple_publish(Mandatory, Immediate, %% current exchange types that is at most once. %% %% TODO: Maybe this should be handled by a cursor instead. -route(#exchange{name = Name, type = topic}, RoutingKey) -> +route(#exchange{name = Name, type = topic}, RoutingKey, _Content) -> Query = qlc:q([QName || #route{binding = #binding{ exchange_name = ExchangeName, @@ -254,13 +259,37 @@ route(#exchange{name = Name, type = topic}, RoutingKey) -> topic_matches(BindingKey, RoutingKey)] end); -route(X = #exchange{type = headers}, _RoutingKey) -> - exit(headers_unimplemented); +route(#exchange{name = Name, type = headers}, + _RoutingKey, + #content{properties = #'P_basic'{headers = Headers0}}) -> + Headers = case Headers0 of + undefined -> []; + _ -> sort_arguments(Headers0) + end, + Query = qlc:q([QName || + #route{binding = #binding{exchange_name = ExchangeName, + queue_name = QName, + args = Spec}} + <- mnesia:table(route), + ExchangeName == Name, + headers_match(Spec, Headers)]), + lookup_qpids( + try + mnesia:async_dirty(fun qlc:e/1, [Query]) + catch exit:{aborted, {badarg, _}} -> + %% work around OTP-7025, which was fixed in R12B-1, by + %% falling back on a less efficient method + [QName || #route{binding = #binding{queue_name = QName, args = Spec}} + <- mnesia:dirty_match_object( + #route{binding = #binding{exchange_name = Name, + _ = '_'}}), + headers_match(Spec, Headers)] + end); -route(X = #exchange{type = fanout}, _) -> +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> route_internal(X, '_'); -route(X = #exchange{type = direct}, RoutingKey) -> +route(X = #exchange{type = direct}, RoutingKey, _Content) -> route_internal(X, RoutingKey). route_internal(#exchange{name = Name}, RoutingKey) -> @@ -382,7 +411,7 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> Binding = #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = Arguments}, + args = sort_arguments(Arguments)}, ok = case Durable of true -> Fun(durable_routes, #route{binding = Binding}, write); false -> ok @@ -434,6 +463,66 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort (sort_arguments) that +%% route/3 and sync_binding/6 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p (value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], D, AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, D, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(P = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], AllMatch, AnyMatch, MatchKind) + when PK > DK -> + headers_match(P, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], D = [{DK, _DT, _DV} | _], _AllMatch, AnyMatch, MatchKind) + when PK < DK -> + headers_match(PRest, D, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], AllMatch, AnyMatch, MatchKind) + when PK == DK -> + if + PT == void -> + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + headers_match(PRest, DRest, AllMatch, true, MatchKind); + PT =/= DT -> + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + headers_match(PRest, DRest, false, AnyMatch, MatchKind); + PV == DV -> + headers_match(PRest, DRest, AllMatch, true, MatchKind); + true -> + headers_match(PRest, DRest, false, AnyMatch, MatchKind) + end. + split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), KeySplit. -- cgit v1.2.1 From 99a94debffc6fc924a6fec58c92b461959ad653d Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Thu, 15 Jan 2009 16:51:58 +1300 Subject: The PDF requires amq.match. The XML requires amq.headers. Create both. --- src/rabbit_access_control.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index e2d96c71..36270efd 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -186,7 +186,8 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> -- cgit v1.2.1 From ff5698ef609c369c4cc4a64a15ab08009b86233e Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 15 Jan 2009 13:43:06 +0000 Subject: cosmetic and minor refactoring --- src/rabbit_exchange.erl | 72 +++++++++++++++++++++++++------------------------ 1 file changed, 37 insertions(+), 35 deletions(-) diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a4e6d219..c83dd38b 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -264,13 +264,13 @@ route(#exchange{name = Name, type = headers}, #content{properties = #'P_basic'{headers = Headers0}}) -> Headers = case Headers0 of undefined -> []; - _ -> sort_arguments(Headers0) + _ -> sort_arguments(Headers0) end, Query = qlc:q([QName || - #route{binding = #binding{exchange_name = ExchangeName, - queue_name = QName, - args = Spec}} - <- mnesia:table(route), + #route{binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName, + args = Spec}} <- mnesia:table(route), ExchangeName == Name, headers_match(Spec, Headers)]), lookup_qpids( @@ -279,10 +279,11 @@ route(#exchange{name = Name, type = headers}, catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, args = Spec}} - <- mnesia:dirty_match_object( - #route{binding = #binding{exchange_name = Name, - _ = '_'}}), + [QName || #route{binding = #binding{queue_name = QName, + args = Spec}} <- + mnesia:dirty_match_object( + #route{binding = #binding{exchange_name = Name, + _ = '_'}}), headers_match(Spec, Headers)] end); @@ -468,7 +469,8 @@ default_headers_match_kind() -> all. parse_x_match(<<"all">>) -> all; parse_x_match(<<"any">>) -> any; parse_x_match(Other) -> - rabbit_log:warning("Invalid x-match field value ~p; expected all or any", [Other]), + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), default_headers_match_kind(). %% Horrendous matching algorithm. Depends for its merge-like @@ -483,7 +485,8 @@ headers_match(Pattern, Data) -> MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of {value, {_, longstr, MK}} -> parse_x_match(MK); {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p (value ~p); expected longstr", + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", [Type, MK]), default_headers_match_kind(); _ -> default_headers_match_kind() @@ -494,34 +497,33 @@ headers_match([], _Data, AllMatch, _AnyMatch, all) -> AllMatch; headers_match([], _Data, _AllMatch, AnyMatch, any) -> AnyMatch; -headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], D, AllMatch, AnyMatch, MatchKind) -> - headers_match(PRest, D, AllMatch, AnyMatch, MatchKind); +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> headers_match([], [], false, AnyMatch, MatchKind); -headers_match(P = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], AllMatch, AnyMatch, MatchKind) - when PK > DK -> - headers_match(P, DRest, AllMatch, AnyMatch, MatchKind); -headers_match([{PK, _PT, _PV} | PRest], D = [{DK, _DT, _DV} | _], _AllMatch, AnyMatch, MatchKind) - when PK < DK -> - headers_match(PRest, D, false, AnyMatch, MatchKind); -headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], AllMatch, AnyMatch, MatchKind) - when PK == DK -> - if - PT == void -> - %% It's not properly specified, but a "no value" in a - %% pattern field is supposed to mean simple presence of - %% the corresponding data field. I've interpreted that to - %% mean a type of "void" for the pattern field. - headers_match(PRest, DRest, AllMatch, true, MatchKind); - PT =/= DT -> +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; %% Similarly, it's not specified, but I assume that a %% mismatched type causes a mismatched value. - headers_match(PRest, DRest, false, AnyMatch, MatchKind); - PV == DV -> - headers_match(PRest, DRest, AllMatch, true, MatchKind); - true -> - headers_match(PRest, DRest, false, AnyMatch, MatchKind) - end. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), -- cgit v1.2.1 From d4fb4e425aa33ae59b920c7793e2a3065ad0ab52 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 15 Jan 2009 14:04:03 +0000 Subject: refactoring to eliminate code duplication ...between topic and headers exchange routing --- src/rabbit_exchange.erl | 79 +++++++++++++++++++------------------------------ 1 file changed, 31 insertions(+), 48 deletions(-) diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index c83dd38b..2b5d9763 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -233,65 +233,48 @@ sort_arguments(Arguments) -> %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -%% -%% TODO: Maybe this should be handled by a cursor instead. route(#exchange{name = Name, type = topic}, RoutingKey, _Content) -> - Query = qlc:q([QName || - #route{binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName, - key = BindingKey}} <- mnesia:table(route), - ExchangeName == Name, - %% TODO: This causes a full scan for each entry - %% with the same exchange (see bug 19336) - topic_matches(BindingKey, RoutingKey)]), - lookup_qpids( - try - mnesia:async_dirty(fun qlc:e/1, [Query]) - catch exit:{aborted, {badarg, _}} -> - %% work around OTP-7025, which was fixed in R12B-1, by - %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, - key = BindingKey}} <- - mnesia:dirty_match_object( - #route{binding = #binding{exchange_name = Name, - _ = '_'}}), - topic_matches(BindingKey, RoutingKey)] - end); + match_bindings(Name, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); -route(#exchange{name = Name, type = headers}, - _RoutingKey, - #content{properties = #'P_basic'{headers = Headers0}}) -> - Headers = case Headers0 of +route(#exchange{name = Name, type = headers}, _RoutingKey, Content) -> + Headers = case (Content#content.properties)#'P_basic'.headers of undefined -> []; - _ -> sort_arguments(Headers0) + H -> sort_arguments(H) end, - Query = qlc:q([QName || - #route{binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName, - args = Spec}} <- mnesia:table(route), - ExchangeName == Name, - headers_match(Spec, Headers)]), + match_bindings(Name, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); + +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> + route_internal(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey, _Content) -> + route_internal(X, RoutingKey). + +%% TODO: Maybe this should be handled by a cursor instead. +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(XName, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(route), + ExchangeName == XName, + Match(Binding)]), lookup_qpids( try mnesia:async_dirty(fun qlc:e/1, [Query]) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, - args = Spec}} <- - mnesia:dirty_match_object( - #route{binding = #binding{exchange_name = Name, + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- + mnesia:dirty_match_object( + #route{binding = #binding{exchange_name = XName, _ = '_'}}), - headers_match(Spec, Headers)] - end); - -route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> - route_internal(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey, _Content) -> - route_internal(X, RoutingKey). + Match(Binding)] + end). route_internal(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, -- cgit v1.2.1 From a59216a046c0495b79e067e8fa0ed638d3f29f3d Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 15 Jan 2009 14:14:12 +0000 Subject: minor refactoring: more consistent signatures/matching in routing functions --- src/rabbit_exchange.erl | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 2b5d9763..960e4945 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -233,34 +233,34 @@ sort_arguments(Arguments) -> %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -route(#exchange{name = Name, type = topic}, RoutingKey, _Content) -> - match_bindings(Name, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end); +route(X = #exchange{type = topic}, RoutingKey, _Content) -> + match_bindings(X, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); -route(#exchange{name = Name, type = headers}, _RoutingKey, Content) -> +route(X = #exchange{type = headers}, _RoutingKey, Content) -> Headers = case (Content#content.properties)#'P_basic'.headers of undefined -> []; H -> sort_arguments(H) end, - match_bindings(Name, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end); + match_bindings(X, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> - route_internal(X, '_'); + match_routing_key(X, '_'); route(X = #exchange{type = direct}, RoutingKey, _Content) -> - route_internal(X, RoutingKey). + match_routing_key(X, RoutingKey). %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange -match_bindings(XName, Match) -> +match_bindings(#exchange{name = Name}, Match) -> Query = qlc:q([QName || #route{binding = Binding = #binding{ exchange_name = ExchangeName, queue_name = QName}} <- mnesia:table(route), - ExchangeName == XName, + ExchangeName == Name, Match(Binding)]), lookup_qpids( try @@ -271,12 +271,12 @@ match_bindings(XName, Match) -> [QName || #route{binding = Binding = #binding{ queue_name = QName}} <- mnesia:dirty_match_object( - #route{binding = #binding{exchange_name = XName, + #route{binding = #binding{exchange_name = Name, _ = '_'}}), Match(Binding)] end). -route_internal(#exchange{name = Name}, RoutingKey) -> +match_routing_key(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, queue_name = '$1', key = RoutingKey, -- cgit v1.2.1 From c8a10816690982de04073eec6239fbeb33335e3b Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 15 Jan 2009 14:33:28 +0000 Subject: don't try to dialyze rabbit.app ...it won't work --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 962b6f4b..fede89e1 100644 --- a/Makefile +++ b/Makefile @@ -59,7 +59,7 @@ $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS) erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().' -dialyze: $(TARGETS) +dialyze: $(BEAM_TARGETS) dialyzer -c $? clean: cleandb -- cgit v1.2.1 From 5246d7b643d6be6fdb29992b1becb6d96f20ff9b Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 15 Jan 2009 14:34:29 +0000 Subject: ignore rabbit.app --- .hgignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.hgignore b/.hgignore index 28f9cfd8..35607765 100644 --- a/.hgignore +++ b/.hgignore @@ -9,6 +9,7 @@ syntax: regexp ^include/rabbit_framing.hrl$ ^src/rabbit_framing.erl$ ^rabbit.plt$ +^ebin/rabbit.app$ ^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$ ^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$ -- cgit v1.2.1 From 6e0e3fcdc9b2760ace7f16537cbc6ad1cd94497e Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 15 Jan 2009 15:57:43 +0000 Subject: add some comments --- src/rabbit_channel.erl | 3 ++- src/rabbit_limiter.erl | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 513d3050..8ef0f7e1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -855,7 +855,8 @@ consumer_queues(Consumers) -> %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for -%% messages sent in a response to a basic.get. +%% messages sent in a response to a basic.get (identified by their +%% 'none' consumer tag) notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3776edd0..49a9c195 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -56,8 +56,11 @@ -record(lim, {prefetch_count = 0, ch_pid, - queues = dict:new(), + queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). +%% 'Notify' is a boolean that indicates whether a queue should be +%% notified of a change in the limit or volume that may allow it to +%% deliver more messages via the limiter's channel. %%---------------------------------------------------------------------------- %% API -- cgit v1.2.1