summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-12-07 01:19:46 +0000
committerBen Hood <0x6e6562@gmail.com>2008-12-07 01:19:46 +0000
commit54967f60e2ccf692e84c286582ede2f00fe365d8 (patch)
treeab2f728299a0cdfb9816b8125d39e81a4e33c709
parent8bca4d5e133b088e5ab493211343c57116726177 (diff)
parentc4f8ddb5cb914b0f825a5c8fc30df594f92c8703 (diff)
downloadrabbitmq-server-54967f60e2ccf692e84c286582ede2f00fe365d8.tar.gz
Merged default into 18557
-rw-r--r--src/rabbit_amqqueue.erl17
-rw-r--r--src/rabbit_amqqueue_process.erl115
-rw-r--r--src/rabbit_channel.erl27
-rw-r--r--src/rabbit_limiter.erl120
4 files changed, 236 insertions, 43 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4b318eeb..d142507d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -30,8 +30,9 @@
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([claim_queue/2]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2]).
+-export([unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
@@ -82,12 +83,13 @@
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
--spec(basic_consume/7 ::
- (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) ->
+-spec(basic_consume/8 ::
+ (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -238,10 +240,10 @@ claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
gen_server:call(QPid, {basic_get, ChPid, NoAck}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- ConsumerTag, ExclusiveConsume, OkMsg}).
+ gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -249,6 +251,9 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server:cast(QPid, {notify_sent, ChPid}).
+unblock(QPid, ChPid) ->
+ gen_server:cast(QPid, {unblock, ChPid}).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f8964e34..b4d0d52d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -56,8 +56,10 @@
%% These are held in our process dictionary
-record(cr, {consumers,
ch_pid,
+ limiter_pid,
monitor_ref,
unacked_messages,
+ is_limit_active,
is_overload_protection_active,
unsent_message_count}).
@@ -124,18 +126,22 @@ all_ch_record() ->
[C || {{ch, _}, C} <- get()].
update_store_and_maybe_block_ch(
- C = #cr{is_overload_protection_active = Active,
+ C = #cr{is_overload_protection_active = Overloaded,
+ is_limit_active = Limited,
unsent_message_count = Count}) ->
- {Result, NewActive} =
+ {Result, NewOverloaded, NewLimited} =
if
- not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
- {block_ch, true};
- Active and (Count == 0) ->
- {unblock_ch, false};
+ not(Overloaded) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
+ {block_ch, true, Limited};
+ Overloaded and (Count == 0) ->
+ {unblock_ch, false, Limited};
+ Limited and (Count < ?UNSENT_MESSAGE_LIMIT) ->
+ {unblock_ch, Overloaded, false};
true ->
- {ok, Active}
+ {ok, Overloaded, Limited}
end,
- store_ch_record(C#cr{is_overload_protection_active = NewActive}),
+ store_ch_record(C#cr{is_overload_protection_active = NewOverloaded,
+ is_limit_active = NewLimited}),
Result.
deliver_immediately(Message, Delivered,
@@ -144,31 +150,59 @@ deliver_immediately(Message, Delivered,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(RoundRobin) of
- {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}},
+ {{value, QEntry = {ChPid,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired = true}}},
RoundRobinTail} ->
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- C = #cr{unsent_message_count = Count,
- unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewConsumers =
- case update_store_and_maybe_block_ch(
- C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM}) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block_ch -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId +1}};
+ % Use Qos Limits if an ack is required
+ % Query the limiter to find out if a limit has been breached
+ #cr{limiter_pid = LimiterPid} = ch_record(ChPid),
+ case rabbit_limiter:can_send(LimiterPid, self()) of
+ true ->
+ really_deliver(AckRequired, ChPid, ConsumerTag,
+ Delivered, Message, NextId, QName,
+ QEntry, RoundRobinTail, State);
+ false ->
+ % Have another go by cycling through the consumer
+ % queue
+ C = ch_record(ChPid),
+ store_ch_record(C#cr{is_limit_active = true}),
+ NewConsumers = block_consumers(ChPid, RoundRobinTail),
+ deliver_immediately(Message, Delivered,
+ State#q{round_robin = NewConsumers})
+ end;
+ {{value, QEntry = {ChPid,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired = false}}},
+ RoundRobinTail} ->
+ really_deliver(AckRequired, ChPid, ConsumerTag,
+ Delivered, Message, NextId, QName,
+ QEntry, RoundRobinTail, State);
{empty, _} ->
not_offered
end.
+% TODO The arity of this function seems a bit large :-(
+really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId,
+ QName, QEntry, RoundRobinTail, State) ->
+ rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}),
+ C = #cr{unsent_message_count = Count,
+ unacked_messages = UAM} = ch_record(ChPid),
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewConsumers =
+ case update_store_and_maybe_block_ch(
+ C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM}) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block_ch -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId +1}}.
+
attempt_delivery(none, Message, State) ->
case deliver_immediately(Message, false, State) of
{offered, false, State1} ->
@@ -280,7 +314,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
{stop, normal, NewState}
end
end.
-
+
cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
none;
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
@@ -519,8 +553,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply(empty, State)
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
- ExclusiveConsume, OkMsg},
+handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+ ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
exclusive_consumer = ExistingHolder,
round_robin = RoundRobin}) ->
@@ -534,7 +568,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- C1 = C#cr{consumers = [Consumer | Consumers]},
+ C1 = C#cr{consumers = [Consumer | Consumers],
+ limiter_pid = LimiterPid},
store_ch_record(C1),
State1 = State#q{has_had_consumers = true,
exclusive_consumer =
@@ -630,7 +665,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
- C = #cr{unacked_messages = UAM} ->
+ C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} ->
{Acked, Remaining} = collect_messages(MsgIds, UAM),
persist_acks(Txn, qname(State), Acked),
case Txn of
@@ -663,6 +698,20 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
+handle_cast({unblock, ChPid}, State) ->
+ % TODO Refactor the code duplication
+ % between this an the notify_sent cast handler
+ case lookup_ch(ChPid) of
+ not_found ->
+ noreply(State);
+ C = #cr{is_limit_active = true} ->
+ noreply(possibly_unblock(C, State));
+ C ->
+ rabbit_log:warning("Ignoring unblock for an active ch: ~p~n",
+ [C]),
+ noreply(State)
+ end;
+
handle_cast({notify_sent, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found -> noreply(State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1eb421ca..f9f92959 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
-record(ch, {state, proxy_pid, reader_pid, writer_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host,
+ username, virtual_host, limiter,
most_recently_declared_queue, consumer_mapping}).
%%----------------------------------------------------------------------------
@@ -102,6 +102,8 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ % TODO See point 3.1.1 of the design - start the limiter lazily
+ limiter = rabbit_limiter:start_link(ProxyPid),
consumer_mapping = dict:new()}.
handle_message({method, Method, Content}, State) ->
@@ -269,6 +271,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
+ limiter = Limiter,
next_tag = NextDeliveryTag,
unacked_message_q = UAMQ}) ->
if DeliveryTag >= NextDeliveryTag ->
@@ -277,6 +280,20 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
true -> ok
end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ % CC the limiter on the number of acks that have been received
+ % but don't include any acks from a basic.get bottom half
+ % (hence the differentiation between tags set to none and other tags)
+ % TODO - this is quite crude and is probably more expensive than it should
+ % be - according to the OTP documentation, len/1 runs in O(n), probably
+ % not so cool for a queuing system
+ NotBasicGet = queue:filter(
+ fun({_CurrentDeliveryTag, ConsumerTag, _Msg}) ->
+ case ConsumerTag of
+ none -> false;
+ _ -> true
+ end
+ end, Acked),
+ rabbit_limiter:decrement_capacity(Limiter, queue:len(NotBasicGet)),
Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
{noreply, case TxnKey of
none -> State#ch{unacked_message_q = Remaining};
@@ -323,6 +340,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{ proxy_pid = ProxyPid,
reader_pid = ReaderPid,
+ limiter = LimiterPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -340,7 +358,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, ProxyPid,
+ Q, NoAck, ReaderPid, ProxyPid, LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -405,8 +423,9 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
end
end;
-handle_method(#'basic.qos'{}, _, State) ->
- %% FIXME: Need to implement QOS
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{limiter = Limiter}) ->
+ rabbit_limiter:set_prefetch_count(Limiter, PrefetchCount),
{reply, #'basic.qos_ok'{}, State};
handle_method(#'basic.recover'{requeue = true},
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
new file mode 100644
index 00000000..4e130ea0
--- /dev/null
+++ b/src/rabbit_limiter.erl
@@ -0,0 +1,120 @@
+%% TODO Decide what to do with the license statement now that Cohesive have
+%% bailed.
+-module(rabbit_limiter).
+
+
+% I'm starting out with a gen_server because of the synchronous query
+% that the queue process makes
+-behaviour(gen_server).
+
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+-export([start_link/1]).
+-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]).
+
+-record(lim, {prefetch_count = 0,
+ ch_pid,
+ queues = sets:new(),
+ in_use = 0}).
+
+%---------------------------------------------------------------------------
+% API
+%---------------------------------------------------------------------------
+
+% Kicks this pig
+start_link(ChPid) ->
+ {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []),
+ Pid.
+
+set_prefetch_count(LimiterPid, PrefetchCount) ->
+ gen_server:cast(LimiterPid, {prefetch_count, PrefetchCount}).
+
+% Queries the limiter to ask whether the queue can deliver a message
+% without breaching a limit
+can_send(LimiterPid, QPid) ->
+ gen_server:call(LimiterPid, {can_send, QPid}).
+
+% Lets the limiter know that a queue has received an ack from a consumer
+% and hence can reduce the in-use-by-that queue capcity information
+decrement_capacity(LimiterPid, Magnitude) ->
+ gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}).
+
+%---------------------------------------------------------------------------
+% gen_server callbacks
+%---------------------------------------------------------------------------
+
+init([ChPid]) ->
+ {ok, #lim{ch_pid = ChPid} }.
+
+% This queuries the limiter to ask if it is possible to send a message without
+% breaching a limit for this queue process
+handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse,
+ queues = Queues}) ->
+ NewState = State#lim{queues = sets:add_element(QPid, Queues)},
+ case limit_reached(NewState) of
+ true -> {reply, false, NewState};
+ false ->
+ {reply, true, NewState#lim{in_use = InUse + 1}}
+ end.
+
+% When the new limit is larger than the existing limit,
+% notify all queues and forget about queues with an in-use
+% capcity of zero
+handle_cast({prefetch_count, PrefetchCount},
+ State = #lim{prefetch_count = CurrentLimit})
+ when PrefetchCount > CurrentLimit ->
+ notify_queues(State),
+ {noreply, State#lim{prefetch_count = PrefetchCount,
+ queues = sets:new(),
+ in_use = 0}};
+
+% Default setter of the prefetch count
+handle_cast({prefetch_count, PrefetchCount}, State) ->
+ {noreply, State#lim{prefetch_count = PrefetchCount}};
+
+% This is an asynchronous ack from a queue that it has received an ack from
+% a queue. This allows the limiter to update the the in-use-by-that queue
+% capacity infromation.
+handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) ->
+ NewState = decrement_in_use(Magnitude, State),
+ ShouldNotify = limit_reached(State) and not(limit_reached(NewState)),
+ if
+ ShouldNotify ->
+ notify_queues(State),
+ {noreply, State#lim{queues = sets:new(), in_use = InUse - 1}};
+ true ->
+ {noreply, NewState}
+ end.
+
+handle_info(_, State) ->
+ {noreply, State}.
+
+terminate(_, _) ->
+ ok.
+
+code_change(_, State, _) ->
+ State.
+
+%---------------------------------------------------------------------------
+% Internal plumbing
+%---------------------------------------------------------------------------
+
+% Reduces the in-use-count of the queue by a specific magnitude
+decrement_in_use(_, State = #lim{in_use = 0}) ->
+ State#lim{in_use = 0};
+
+decrement_in_use(Magnitude, State = #lim{in_use = InUse}) ->
+ State#lim{in_use = InUse - Magnitude}.
+
+% Unblocks every queue that this limiter knows about
+notify_queues(#lim{ch_pid = ChPid, queues = Queues}) ->
+ sets:fold(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, [], Queues).
+
+% A prefetch limit of zero means unlimited
+limit_reached(#lim{prefetch_count = 0}) ->
+ false;
+
+% Works out whether the limit is breached for the current limiter state
+limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) ->
+ InUse == Limit.
+