summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-03-22 13:45:03 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-03-22 13:45:03 +0000
commitab88adcf3163b7b4ebd93306eb900490be433516 (patch)
tree8d7ba66e83900c01c68d1f232914acf28d65056c
parent00eb6aefa5f0bdacad56aeacd53420a86a479350 (diff)
parent4dbd221c2a2486676d7ee031c9b4b1ce8d5355aa (diff)
downloadrabbitmq-server-ab88adcf3163b7b4ebd93306eb900490be433516.tar.gz
Merge in default.
-rw-r--r--packaging/debs/apt-repository/distributions2
-rw-r--r--src/rabbit_amqqueue.erl35
-rw-r--r--src/rabbit_amqqueue_process.erl125
-rw-r--r--src/rabbit_channel.erl102
-rw-r--r--src/rabbit_channel_sup.erl4
-rw-r--r--src/rabbit_exchange.erl36
-rw-r--r--src/rabbit_exchange_decorator.erl12
-rw-r--r--src/rabbit_limiter.erl331
-rw-r--r--src/rabbit_reader.erl23
-rw-r--r--src/rabbit_registry.erl39
-rw-r--r--src/rabbit_tests.erl31
-rw-r--r--src/rabbit_vhost.erl2
12 files changed, 436 insertions, 306 deletions
diff --git a/packaging/debs/apt-repository/distributions b/packaging/debs/apt-repository/distributions
index 61fd778a..75b9fe46 100644
--- a/packaging/debs/apt-repository/distributions
+++ b/packaging/debs/apt-repository/distributions
@@ -2,6 +2,6 @@ Origin: RabbitMQ
Label: RabbitMQ Repository for Debian / Ubuntu etc
Suite: testing
Codename: kitten
-Architectures: AVR32 alpha amd64 arm armel armhf hppa hurd-i386 i386 ia64 kfreebsd-amd64 kfreebsd-i386 m32 m68k mips mipsel netbsd-alpha netbsd-i386 powerpc s390 s390x sh sparc
+Architectures: AVR32 alpha amd64 arm armel armhf hppa hurd-i386 i386 ia64 kfreebsd-amd64 kfreebsd-i386 m32 m68k mips mipsel netbsd-alpha netbsd-i386 powerpc s390 s390x sh sparc source
Components: main
Description: RabbitMQ Repository for Debian / Ubuntu etc
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 82ac74fa..3f0a7f9c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,9 +26,9 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
--export([notify_down_all/2, limit_all/3]).
+-export([basic_get/4, basic_consume/8, basic_cancel/4]).
+-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
+-export([notify_down_all/2, activate_limit_all/2]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
@@ -144,19 +144,18 @@
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
--spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
- ok_or_errors()).
--spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
+-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
+-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/7 ::
- (rabbit_types:amqqueue(), boolean(), pid(),
- rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any())
+-spec(basic_consume/8 ::
+ (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
+ rabbit_types:ctag(), boolean(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
--spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(resume/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
@@ -538,16 +537,16 @@ notify_down_all(QPids, ChPid) ->
Bads1 -> {error, Bads1}
end.
-limit_all(QPids, ChPid, Limiter) ->
- delegate:cast(QPids, {limit, ChPid, Limiter}).
+activate_limit_all(QPids, ChPid) ->
+ delegate:cast(QPids, {activate_limit, ChPid}).
-basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate:call(QPid, {basic_get, ChPid, NoAck}).
+basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
+ delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate:call(QPid, {basic_consume, NoAck, ChPid,
- Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
+ delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -569,7 +568,7 @@ notify_sent_queue_down(QPid) ->
erase({consumer_credit_to, QPid}),
ok.
-unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}).
+resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 18b641d4..efe8efc4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -68,7 +68,6 @@
consumer_count,
blocked_consumers,
limiter,
- is_limit_active,
unsent_message_count}).
%%----------------------------------------------------------------------------
@@ -165,6 +164,8 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
+ rabbit_event:if_enabled(State, #q.stats_timer,
+ fun() -> emit_stats(State) end),
rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -362,17 +363,17 @@ lookup_ch(ChPid) ->
C -> C
end.
-ch_record(ChPid) ->
+ch_record(ChPid, LimiterPid) ->
Key = {ch, ChPid},
case get(Key) of
undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ Limiter = rabbit_limiter:client(LimiterPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
- is_limit_active = false,
- limiter = rabbit_limiter:make_token(),
+ limiter = Limiter,
unsent_message_count = 0},
put(Key, C),
C;
@@ -392,38 +393,18 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
ok.
-erase_ch_record(#cr{ch_pid = ChPid,
- limiter = Limiter,
- monitor_ref = MonitorRef}) ->
- ok = rabbit_limiter:unregister(Limiter, self()),
+erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
ok.
-update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
- ok = rabbit_limiter:register(Limiter, self()),
- update_ch_record(C#cr{consumer_count = 1});
-update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
- ok = rabbit_limiter:unregister(Limiter, self()),
- update_ch_record(C#cr{consumer_count = 0,
- limiter = rabbit_limiter:make_token()});
-update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
- update_ch_record(C#cr{consumer_count = Count + Delta}).
-
all_ch_record() -> [C || {{ch, _}, C} <- get()].
block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
-is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
-
-ch_record_state_transition(OldCR, NewCR) ->
- case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
- {true, false} -> unblock;
- {false, true} -> block;
- {_, _} -> ok
- end.
+is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
+ Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
@@ -440,18 +421,20 @@ deliver_msgs_to_consumers(DeliverFun, false,
end.
deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
- C = ch_record(ChPid),
+ C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
{false, State};
- false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
+ false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required) of
- false -> block_consumer(C#cr{is_limit_active = true}, E),
- {false, State};
- true -> AC1 = queue:in(E, State#q.active_consumers),
- deliver_msg_to_consumer(
- DeliverFun, Consumer, C,
- State#q{active_consumers = AC1})
+ {suspend, Limiter} ->
+ block_consumer(C#cr{limiter = Limiter}, E),
+ {false, State};
+ {continue, Limiter} ->
+ AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Consumer, C#cr{limiter = Limiter},
+ State#q{active_consumers = AC1})
end
end.
@@ -642,15 +625,15 @@ possibly_unblock(State, ChPid, Update) ->
State;
C ->
C1 = Update(C),
- case ch_record_state_transition(C, C1) of
- ok -> update_ch_record(C1),
- State;
- unblock -> #cr{blocked_consumers = Consumers} = C1,
- update_ch_record(
- C1#cr{blocked_consumers = queue:new()}),
- AC1 = queue:join(State#q.active_consumers,
- Consumers),
- run_message_queue(State#q{active_consumers = AC1})
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ State;
+ true -> #cr{blocked_consumers = Consumers} = C1,
+ update_ch_record(
+ C1#cr{blocked_consumers = queue:new()}),
+ AC1 = queue:join(State#q.active_consumers,
+ Consumers),
+ run_message_queue(State#q{active_consumers = AC1})
end
end.
@@ -1108,7 +1091,7 @@ handle_call({notify_down, ChPid}, From, State) ->
{stop, State1} -> stop(From, ok, State1)
end;
-handle_call({basic_get, ChPid, NoAck}, _From,
+handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
@@ -1118,7 +1101,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{{Message, IsDelivered, AckTag}, State2} ->
State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ true -> C = #cr{acktags = ChAckTags} =
+ ch_record(ChPid, LimiterPid),
ChAckTags1 = queue:in(AckTag, ChAckTags),
update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
@@ -1128,15 +1112,21 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, BQ:len(BQS), Msg}, State3)
end;
-handle_call({basic_consume, NoAck, ChPid, Limiter,
+handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = ch_record(ChPid),
- update_consumer_count(C#cr{limiter = Limiter}, +1),
+ C = #cr{consumer_count = Count,
+ limiter = Limiter} = ch_record(ChPid, LimiterPid),
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ update_ch_record(C#cr{consumer_count = Count + 1,
+ limiter = Limiter1}),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1157,10 +1147,18 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
case lookup_ch(ChPid) of
not_found ->
reply(ok, State);
- C = #cr{blocked_consumers = Blocked} ->
+ C = #cr{consumer_count = Count,
+ limiter = Limiter,
+ blocked_consumers = Blocked} ->
emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
- update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
+ Limiter1 = case Count of
+ 1 -> rabbit_limiter:deactivate(Limiter);
+ _ -> Limiter
+ end,
+ update_ch_record(C#cr{consumer_count = Count - 1,
+ limiter = Limiter1,
+ blocked_consumers = Blocked1}),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, ConsumerTag} -> none;
@@ -1288,10 +1286,12 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
handle_cast(delete_immediately, State) ->
stop(State);
-handle_cast({unblock, ChPid}, State) ->
+handle_cast({resume, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
- fun (C) -> C#cr{is_limit_active = false} end));
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:resume(Limiter)}
+ end));
handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
@@ -1300,21 +1300,12 @@ handle_cast({notify_sent, ChPid, Credit}, State) ->
C#cr{unsent_message_count = Count - Credit}
end));
-handle_cast({limit, ChPid, Limiter}, State) ->
+handle_cast({activate_limit, ChPid}, State) ->
noreply(
- possibly_unblock(
- State, ChPid,
- fun (C = #cr{consumer_count = ConsumerCount,
- limiter = OldLimiter,
- is_limit_active = OldLimited}) ->
- case (ConsumerCount =/= 0 andalso
- not rabbit_limiter:is_enabled(OldLimiter)) of
- true -> ok = rabbit_limiter:register(Limiter, self());
- false -> ok
- end,
- Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter),
- C#cr{limiter = Limiter, is_limit_active = Limited}
- end));
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:activate(Limiter)}
+ end));
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 792a06c9..e9f69b62 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -81,8 +81,8 @@
-spec(start_link/11 ::
(channel_number(), pid(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
- rabbit_framing:amqp_table(),
- pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()).
+ rabbit_framing:amqp_table(), pid(), pid()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -180,7 +180,7 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
- Capabilities, CollectorPid, Limiter]) ->
+ Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
@@ -190,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
- limiter = Limiter,
+ limiter = rabbit_limiter:new(LimiterPid),
tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
@@ -372,6 +372,8 @@ terminate(Reason, State) ->
_ -> ok
end,
pg_local:leave(rabbit_channels, self()),
+ rabbit_event:if_enabled(State, #ch.stats_timer,
+ fun() -> emit_stats(State) end),
rabbit_event:notify(channel_closed, [{pid, self()}]).
code_change(_OldVsn, State, _Extra) ->
@@ -676,12 +678,15 @@ handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
conn_pid = ConnPid,
+ limiter = Limiter,
next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
+ fun (Q) -> rabbit_amqqueue:basic_get(
+ Q, self(), NoAck, rabbit_limiter:pid(Limiter))
+ end) of
{ok, MessageCount,
Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -728,7 +733,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) ->
{rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), Limiter,
+ Q, NoAck, self(),
+ rabbit_limiter:pid(Limiter),
+ rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
@@ -803,19 +810,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
-handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
+handle_method(#'basic.qos'{prefetch_count = 0}, _,
State = #ch{limiter = Limiter}) ->
- Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of
- {false, 0} -> Limiter;
- {false, _} -> enable_limiter(State);
- {_, _} -> Limiter
- end,
- Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of
- ok -> Limiter1;
- {disabled, Limiter2} -> ok = limit_queues(Limiter2, State),
- Limiter2
- end,
- {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}};
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
+
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
+ State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
+ Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
+ PrefetchCount, queue:len(UAMQ)),
+ {reply, #'basic.qos_ok'{},
+ maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
@@ -1078,25 +1083,34 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter = Limiter}) ->
- Limiter2 = case rabbit_limiter:unblock(Limiter) of
- ok -> Limiter;
- {disabled, Limiter1} -> ok = limit_queues(Limiter1, State),
- Limiter1
- end,
- {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}};
+ Limiter1 = rabbit_limiter:unblock(Limiter),
+ {reply, #'channel.flow_ok'{active = true},
+ maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
handle_method(#'channel.flow'{active = false}, _,
State = #ch{consumer_mapping = Consumers,
limiter = Limiter}) ->
- Limiter1 = case rabbit_limiter:is_enabled(Limiter) of
- true -> Limiter;
- false -> enable_limiter(State)
- end,
- State1 = State#ch{limiter = Limiter1},
- ok = rabbit_limiter:block(Limiter1),
- QPids = consumer_queues(Consumers),
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})};
+ case rabbit_limiter:is_blocked(Limiter) of
+ true -> {noreply, maybe_send_flow_ok(State)};
+ false -> Limiter1 = rabbit_limiter:block(Limiter),
+ State1 = maybe_limit_queues(Limiter, Limiter1,
+ State#ch{limiter = Limiter1}),
+ %% The semantics of channel.flow{active=false}
+ %% require that no messages are delivered after the
+ %% channel.flow_ok has been sent. We accomplish that
+ %% by "flushing" all messages in flight from the
+ %% consumer queues to us. To do this we tell all the
+ %% queues to invoke rabbit_channel:flushed/2, which
+ %% will send us a {flushed, ...} message that appears
+ %% *after* all the {deliver, ...} messages. We keep
+ %% track of all the QPids thus asked, and once all of
+ %% them have responded (or died) we send the
+ %% channel.flow_ok.
+ QPids = consumer_queues(Consumers),
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, maybe_send_flow_ok(
+ State1#ch{blocking = sets:from_list(QPids)})}
+ end;
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -1332,14 +1346,14 @@ foreach_per_queue(F, UAL) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
-enable_limiter(State = #ch{unacked_message_q = UAMQ,
- limiter = Limiter}) ->
- Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)),
- ok = limit_queues(Limiter1, State),
- Limiter1.
-
-limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
- rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter).
+maybe_limit_queues(OldLimiter, NewLimiter, State) ->
+ case ((not rabbit_limiter:is_active(OldLimiter)) andalso
+ rabbit_limiter:is_active(NewLimiter)) of
+ true -> Queues = consumer_queues(State#ch.consumer_mapping),
+ rabbit_amqqueue:activate_limit_all(Queues, self());
+ false -> ok
+ end,
+ State.
consumer_queues(Consumers) ->
lists:usort([QPid ||
@@ -1350,7 +1364,9 @@ consumer_queues(Consumers) ->
%% messages sent in a response to a basic.get (identified by their
%% 'none' consumer tag)
notify_limiter(Limiter, Acked) ->
- case rabbit_limiter:is_enabled(Limiter) of
+ %% optimisation: avoid the potentially expensive 'foldl' in the
+ %% common case.
+ case rabbit_limiter:is_prefetch_limited(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
@@ -1515,7 +1531,7 @@ i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
- rabbit_limiter:get_limit(Limiter);
+ rabbit_limiter:get_prefetch_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 8ea44a81..a0c7624b 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -58,7 +58,7 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{channel, {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, ReaderPid, ConnName,
Protocol, User, VHost, Capabilities, Collector,
- rabbit_limiter:make_token(LimiterPid)]},
+ LimiterPid]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
@@ -72,7 +72,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
{channel, {rabbit_channel, start_link,
[Channel, ClientChannelPid, ClientChannelPid, ConnPid,
ConnName, Protocol, User, VHost, Capabilities, Collector,
- rabbit_limiter:make_token(LimiterPid)]},
+ LimiterPid]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 5f4fb9ec..9e98448d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -326,34 +326,34 @@ route(#exchange{name = #resource{virtual_host = VHost,
%% Optimisation
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
{Decorators, _} ->
- QNames = route1(Delivery, {[X], XName, []}),
- lists:usort(decorate_route(Decorators, X, Delivery, QNames))
+ lists:usort(route1(Delivery, Decorators, {[X], XName, []}))
end.
-decorate_route([], _X, _Delivery, QNames) ->
+route1(_, _, {[], _, QNames}) ->
QNames;
-decorate_route(Decorators, X, Delivery, QNames) ->
- QNames ++
- lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]).
-
-route1(_, {[], _, QNames}) ->
- QNames;
-route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
- DstNames = process_alternate(
- X, ((type_to_module(Type)):route(X, Delivery))),
- route1(Delivery,
+route1(Delivery, Decorators,
+ {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
+ ExchangeDests = (type_to_module(Type)):route(X, Delivery),
+ DecorateDests = process_decorators(X, Decorators, Delivery),
+ AlternateDests = process_alternate(X, ExchangeDests),
+ route1(Delivery, Decorators,
lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
- DstNames)).
+ AlternateDests ++ DecorateDests ++ ExchangeDests)).
-process_alternate(#exchange{arguments = []}, Results) -> %% optimisation
- Results;
+process_alternate(#exchange{arguments = []}, _Results) -> %% optimisation
+ [];
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
undefined -> [];
AName -> [AName]
end;
-process_alternate(_X, Results) ->
- Results.
+process_alternate(_X, _Results) ->
+ [].
+
+process_decorators(_, [], _) -> %% optimisation
+ [];
+process_decorators(X, Decorators, Delivery) ->
+ lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]).
process_route(#resource{kind = exchange} = XName,
{_WorkList, XName, _QNames} = Acc) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 8f17adfc..040b55db 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -57,12 +57,10 @@
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
-%% called after exchange routing
-%% return value is a list of queues to be added to the list of
-%% destination queues. decorators must register separately for
-%% this callback using exchange_decorator_route.
--callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
- [rabbit_amqqueue:name()].
+%% Decorators can optionally implement route/2 which allows additional
+%% destinations to be added to the routing decision.
+%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+%% [rabbit_amqqueue:name() | rabbit_exchange:name()].
-else.
@@ -70,7 +68,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, {route, 2}];
+ {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 8a7d14fe..430c2716 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -14,42 +14,144 @@
%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
+%% The purpose of the limiter is to stem the flow of messages from
+%% queues to channels, in order to act upon various protocol-level
+%% flow control mechanisms, specifically AMQP's basic.qos
+%% prefetch_count and channel.flow.
+%%
+%% Each channel has an associated limiter process, created with
+%% start_link/1, which it passes to queues on consumer creation with
+%% rabbit_amqqueue:basic_consume/8, and rabbit_amqqueue:basic_get/4.
+%% The latter isn't strictly necessary, since basic.get is not
+%% subject to limiting, but it means that whenever a queue knows about
+%% a channel, it also knows about its limiter, which is less fiddly.
+%%
+%% Th limiter process holds state that is, in effect, shared between
+%% the channel and all queues from which the channel is
+%% consuming. Essentially all these queues are competing for access to
+%% a single, limited resource - the ability to deliver messages via
+%% the channel - and it is the job of the limiter process to mediate
+%% that access.
+%%
+%% The limiter process is separate from the channel process for two
+%% reasons: separation of concerns, and efficiency. Channels can get
+%% very busy, particularly if they are also dealing with publishes.
+%% With a separate limiter process all the aforementioned access
+%% mediation can take place without touching the channel.
+%%
+%% For efficiency, both the channel and the queues keep some local
+%% state, initialised from the limiter pid with new/1 and client/1,
+%% respectively. In particular this allows them to avoid any
+%% interaction with the limiter process when it is 'inactive', i.e. no
+%% protocol-level flow control is taking place.
+%%
+%% This optimisation does come at the cost of some complexity though:
+%% when a limiter becomes active, the channel needs to inform all its
+%% consumer queues of this change in status. It does this by invoking
+%% rabbit_amqqueue:activate_limit_all/2. Note that there is no inverse
+%% transition, i.e. once a queue has been told about an active
+%% limiter, it is not subsequently told when that limiter becomes
+%% inactive. In practice it is rare for that to happen, though we
+%% could optimise this case in the future.
+%%
+%% The interactions with the limiter are as follows:
+%%
+%% 1. Channels tell the limiter about basic.qos prefetch counts -
+%% that's what the limit_prefetch/3, unlimit_prefetch/1,
+%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are
+%% about - and channel.flow blocking - that's what block/1,
+%% unblock/1 and is_blocked/1 are for.
+%%
+%% 2. Queues register with the limiter - this happens as part of
+%% activate/1.
+%%
+%% 4. The limiter process maintains an internal counter of 'messages
+%% sent but not yet acknowledged', called the 'volume'.
+%%
+%% 5. Queues ask the limiter for permission (with can_send/2) whenever
+%% they want to deliver a message to a channel. The limiter checks
+%% whether a) the channel isn't blocked by channel.flow, and b) the
+%% volume has not yet reached the prefetch limit. If so it
+%% increments the volume and tells the queue to proceed. Otherwise
+%% it marks the queue as requiring notification (see below) and
+%% tells the queue not to proceed.
+%%
+%% 6. A queue that has told to proceed (by the return value of
+%% can_send/2) sends the message to the channel. Conversely, a
+%% queue that has been told not to proceed, will not attempt to
+%% deliver that message, or any future messages, to the
+%% channel. This is accomplished by can_send/2 capturing the
+%% outcome in the local state, where it can be accessed with
+%% is_suspended/1.
+%%
+%% 7. When a channel receives an ack it tells the limiter (via ack/2)
+%% how many messages were ack'ed. The limiter process decrements
+%% the volume and if it falls below the prefetch_count then it
+%% notifies (through rabbit_amqqueue:resume/2) all the queues
+%% requiring notification, i.e. all those that had a can_send/2
+%% request denied.
+%%
+%% 8. Upon receipt of such a notification, queues resume delivery to
+%% the channel, i.e. they will once again start asking limiter, as
+%% described in (5).
+%%
+%% 9. When a queues has no more consumers associated with a particular
+%% channel, it deactivates use of the limiter with deactivate/1,
+%% which alters the local state such that no further interactions
+%% with the limiter process take place until a subsequent
+%% activate/1.
+
-module(rabbit_limiter).
-behaviour(gen_server2).
+-export([start_link/0]).
+%% channel API
+-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
+ is_prefetch_limited/1, is_blocked/1, is_active/1,
+ get_prefetch_limit/1, ack/2, pid/1]).
+%% queue API
+-export([client/1, activate/1, can_send/2, resume/1, deactivate/1,
+ is_suspended/1]).
+%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
--export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
- disable/1]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1, block/1, unblock/1, is_blocked/1]).
%%----------------------------------------------------------------------------
--record(token, {pid, enabled}).
+-record(lstate, {pid, prefetch_limited, blocked}).
+-record(qstate, {pid, state}).
-ifdef(use_specs).
--export_type([token/0]).
-
--opaque(token() :: #token{}).
+-type(lstate() :: #lstate{pid :: pid(),
+ prefetch_limited :: boolean(),
+ blocked :: boolean()}).
+-type(qstate() :: #qstate{pid :: pid(),
+ state :: 'dormant' | 'active' | 'suspended'}).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(make_token/0 :: () -> token()).
--spec(make_token/1 :: ('undefined' | pid()) -> token()).
--spec(is_enabled/1 :: (token()) -> boolean()).
--spec(enable/2 :: (token(), non_neg_integer()) -> token()).
--spec(disable/1 :: (token()) -> token()).
--spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()).
--spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
--spec(register/2 :: (token(), pid()) -> 'ok').
--spec(unregister/2 :: (token(), pid()) -> 'ok').
--spec(get_limit/1 :: (token()) -> non_neg_integer()).
--spec(block/1 :: (token()) -> 'ok').
--spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
--spec(is_blocked/1 :: (token()) -> boolean()).
+-spec(new/1 :: (pid()) -> lstate()).
+
+-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
+ -> lstate()).
+-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()).
+-spec(block/1 :: (lstate()) -> lstate()).
+-spec(unblock/1 :: (lstate()) -> lstate()).
+-spec(is_prefetch_limited/1 :: (lstate()) -> boolean()).
+-spec(is_blocked/1 :: (lstate()) -> boolean()).
+-spec(is_active/1 :: (lstate()) -> boolean()).
+-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()).
+-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
+-spec(pid/1 :: (lstate()) -> pid()).
+
+-spec(client/1 :: (pid()) -> qstate()).
+-spec(activate/1 :: (qstate()) -> qstate()).
+-spec(can_send/2 :: (qstate(), boolean()) ->
+ {'continue' | 'suspend', qstate()}).
+-spec(resume/1 :: (qstate()) -> qstate()).
+-spec(deactivate/1 :: (qstate()) -> qstate()).
+-spec(is_suspended/1 :: (qstate()) -> boolean()).
-endif.
@@ -70,114 +172,120 @@
start_link() -> gen_server2:start_link(?MODULE, [], []).
-make_token() -> make_token(undefined).
-make_token(Pid) -> #token{pid = Pid, enabled = false}.
+new(Pid) ->
+ %% this a 'call' to ensure that it is invoked at most once.
+ ok = gen_server:call(Pid, {new, self()}),
+ #lstate{pid = Pid, prefetch_limited = false, blocked = false}.
-is_enabled(#token{enabled = Enabled}) -> Enabled.
+limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
+ ok = gen_server:call(L#lstate.pid,
+ {limit_prefetch, PrefetchCount, UnackedCount}),
+ L#lstate{prefetch_limited = true}.
-enable(#token{pid = Pid} = Token, Volume) ->
- gen_server2:call(Pid, {enable, Token, self(), Volume}, infinity).
+unlimit_prefetch(L) ->
+ ok = gen_server:call(L#lstate.pid, unlimit_prefetch),
+ L#lstate{prefetch_limited = false}.
-disable(#token{pid = Pid} = Token) ->
- gen_server2:call(Pid, {disable, Token}, infinity).
+block(L) ->
+ ok = gen_server:call(L#lstate.pid, block),
+ L#lstate{blocked = true}.
-limit(Limiter, PrefetchCount) ->
- maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok).
+unblock(L) ->
+ ok = gen_server:call(L#lstate.pid, unblock),
+ L#lstate{blocked = false}.
-%% Ask the limiter whether the queue can deliver a message without
-%% breaching a limit. Note that we don't use maybe_call here in order
-%% to avoid always going through with_exit_handler/2, even when the
-%% limiter is disabled.
-can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
- rabbit_misc:with_exit_handler(
- fun () -> true end,
- fun () ->
- gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
- end);
-can_send(_, _, _) ->
- true.
+is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
-%% Let the limiter know that the channel has received some acks from a
-%% consumer
-ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).
+is_blocked(#lstate{blocked = Blocked}) -> Blocked.
-register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}).
+is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
-unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}).
+get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0;
+get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit).
-get_limit(Limiter) ->
+ack(#lstate{prefetch_limited = false}, _AckCount) -> ok;
+ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}).
+
+pid(#lstate{pid = Pid}) -> Pid.
+
+client(Pid) -> #qstate{pid = Pid, state = dormant}.
+
+activate(L = #qstate{state = dormant}) ->
+ ok = gen_server:cast(L#qstate.pid, {register, self()}),
+ L#qstate{state = active};
+activate(L) -> L.
+
+can_send(L = #qstate{state = active}, AckRequired) ->
rabbit_misc:with_exit_handler(
- fun () -> 0 end,
- fun () -> maybe_call(Limiter, get_limit, 0) end).
+ fun () -> {continue, L} end,
+ fun () -> Msg = {can_send, self(), AckRequired},
+ case gen_server2:call(L#qstate.pid, Msg, infinity) of
+ true -> {continue, L};
+ false -> {suspend, L#qstate{state = suspended}}
+ end
+ end);
+can_send(L, _AckRequired) -> {continue, L}.
-block(Limiter) ->
- maybe_call(Limiter, block, ok).
+resume(L) -> L#qstate{state = active}.
-unblock(Limiter) ->
- maybe_call(Limiter, {unblock, Limiter}, ok).
+deactivate(L = #qstate{state = dormant}) -> L;
+deactivate(L) ->
+ ok = gen_server:cast(L#qstate.pid, {unregister, self()}),
+ L#qstate{state = dormant}.
-is_blocked(Limiter) ->
- maybe_call(Limiter, is_blocked, false).
+is_suspended(#qstate{state = suspended}) -> true;
+is_suspended(#qstate{}) -> false.
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, #lim{}}.
+init([]) -> {ok, #lim{}}.
-prioritise_call(get_limit, _From, _State) -> 9;
-prioritise_call(_Msg, _From, _State) -> 0.
+prioritise_call(get_prefetch_limit, _From, _State) -> 9;
+prioritise_call(_Msg, _From, _State) -> 0.
+
+handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) ->
+ {reply, ok, State#lim{ch_pid = ChPid}};
+
+handle_call({limit_prefetch, PrefetchCount, UnackedCount}, _From, State) ->
+ %% assertion
+ true = State#lim.prefetch_count == 0 orelse
+ State#lim.volume == UnackedCount,
+ {reply, ok, maybe_notify(State, State#lim{prefetch_count = PrefetchCount,
+ volume = UnackedCount})};
+
+handle_call(unlimit_prefetch, _From, State) ->
+ {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0,
+ volume = 0})};
+
+handle_call(block, _From, State) ->
+ {reply, ok, State#lim{blocked = true}};
+
+handle_call(unblock, _From, State) ->
+ {reply, ok, maybe_notify(State, State#lim{blocked = false})};
+
+handle_call(get_prefetch_limit, _From,
+ State = #lim{prefetch_count = PrefetchCount}) ->
+ {reply, PrefetchCount, State};
handle_call({can_send, QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
{reply, false, limit_queue(QPid, State)};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
- case limit_reached(State) of
+ case prefetch_limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
true -> Volume
end}}
- end;
-
-handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
- {reply, PrefetchCount, State};
-
-handle_call({limit, PrefetchCount, Token}, _From, State) ->
- case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
- {cont, State1} ->
- {reply, ok, State1};
- {stop, State1} ->
- {reply, {disabled, Token#token{enabled = false}}, State1}
- end;
-
-handle_call(block, _From, State) ->
- {reply, ok, State#lim{blocked = true}};
-
-handle_call({unblock, Token}, _From, State) ->
- case maybe_notify(State, State#lim{blocked = false}) of
- {cont, State1} ->
- {reply, ok, State1};
- {stop, State1} ->
- {reply, {disabled, Token#token{enabled = false}}, State1}
- end;
-
-handle_call(is_blocked, _From, State) ->
- {reply, blocked(State), State};
-
-handle_call({enable, Token, Channel, Volume}, _From, State) ->
- {reply, Token#token{enabled = true},
- State#lim{ch_pid = Channel, volume = Volume}};
-handle_call({disable, Token}, _From, State) ->
- {reply, Token#token{enabled = false}, State}.
+ end.
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
- {noreply, State1};
+ {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -199,27 +307,13 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (limit_reached(OldState) orelse blocked(OldState)) andalso
- not (limit_reached(NewState) orelse blocked(NewState)) of
- true -> NewState1 = notify_queues(NewState),
- {case NewState1#lim.prefetch_count of
- 0 -> stop;
- _ -> cont
- end, NewState1};
- false -> {cont, NewState}
+ case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso
+ not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of
+ true -> notify_queues(NewState);
+ false -> NewState
end.
-maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) ->
- gen_server2:call(Pid, Call, infinity);
-maybe_call(_, _Call, Default) ->
- Default.
-
-maybe_cast(#token{pid = Pid, enabled = true}, Cast) ->
- gen_server2:cast(Pid, Cast);
-maybe_cast(_, _Call) ->
- ok.
-
-limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
+prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
blocked(#lim{blocked = Blocked}) -> Blocked.
@@ -231,10 +325,9 @@ remember_queue(QPid, State = #lim{queues = Queues}) ->
true -> State
end.
-forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
+forget_queue(QPid, State = #lim{queues = Queues}) ->
case orddict:find(QPid, Queues) of
{ok, {MRef, _}} -> true = erlang:demonitor(MRef),
- ok = rabbit_amqqueue:unblock(QPid, ChPid),
State#lim{queues = orddict:erase(QPid, Queues)};
error -> State
end.
@@ -251,13 +344,13 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
- 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case
+ 1 -> ok = rabbit_amqqueue:resume(hd(QList), ChPid); %% common case
L ->
%% We randomly vary the position of queues in the list,
%% thus ensuring that each queue has an equal chance of
%% being notified first.
{L1, L2} = lists:split(random:uniform(L), QList),
- [[ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L3]
+ [[ok = rabbit_amqqueue:resume(Q, ChPid) || Q <- L3]
|| L3 <- [L2, L1]],
ok
end,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index aaaa179a..61fac0e2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -306,8 +306,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
+ maybe_emit_stats(State),
throw(connection_closed_abruptly);
{error, Reason} ->
+ maybe_emit_stats(State),
throw({inet_error, Reason});
{other, {system, From, Request}} ->
sys:handle_system_msg(Request, From, State#v1.parent,
@@ -338,23 +340,28 @@ handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
%% ordinary error case. However, since this termination is
%% initiated by our parent it is probably more important to exit
%% quickly.
+ maybe_emit_stats(State),
exit(Reason);
-handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) ->
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) ->
+ maybe_emit_stats(State),
throw(E);
handle_other({channel_exit, Channel, Reason}, State) ->
handle_exception(State, Channel, Reason);
handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
handle_dependent_exit(ChPid, Reason, State);
-handle_other(terminate_connection, _State) ->
+handle_other(terminate_connection, State) ->
+ maybe_emit_stats(State),
stop;
handle_other(handshake_timeout, State)
when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
State;
handle_other(handshake_timeout, State) ->
+ maybe_emit_stats(State),
throw({handshake_timeout, State#v1.callback});
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
State;
-handle_other(heartbeat_timeout, #v1{connection_state = S}) ->
+handle_other(heartbeat_timeout, State = #v1{connection_state = S}) ->
+ maybe_emit_stats(State),
throw({heartbeat_timeout, S});
handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
@@ -386,8 +393,9 @@ handle_other(emit_stats, State) ->
handle_other({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
control_throttle(State);
-handle_other(Other, _State) ->
+handle_other(Other, State) ->
%% internal error -> something worth dying for
+ maybe_emit_stats(State),
exit({unexpected_message, Other}).
switch_callback(State, Callback, Length) ->
@@ -850,8 +858,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
- rabbit_event:if_enabled(State1, #v1.stats_timer,
- fun() -> emit_stats(State1) end),
+ maybe_emit_stats(State1),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
@@ -1010,6 +1017,10 @@ cert_info(F, #v1{sock = Sock}) ->
{ok, Cert} -> list_to_binary(F(Cert))
end.
+maybe_emit_stats(State) ->
+ rabbit_event:if_enabled(State, #v1.stats_timer,
+ fun() -> emit_stats(State) end).
+
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 3514e780..acdc2cff 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -84,12 +84,34 @@ internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
internal_register(Class, TypeName, ModuleName)
when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) ->
ok = sanity_check_module(class_module(Class), ModuleName),
- true = ets:insert(?ETS_NAME,
- {{Class, internal_binary_to_type(TypeName)}, ModuleName}),
+ RegArg = {{Class, internal_binary_to_type(TypeName)}, ModuleName},
+ true = ets:insert(?ETS_NAME, RegArg),
+ conditional_register(RegArg),
ok.
internal_unregister(Class, TypeName) ->
- true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}),
+ UnregArg = {Class, internal_binary_to_type(TypeName)},
+ conditional_unregister(UnregArg),
+ true = ets:delete(?ETS_NAME, UnregArg),
+ ok.
+
+%% register exchange decorator route callback only when implemented,
+%% in order to avoid unnecessary decorator calls on the fast
+%% publishing path
+conditional_register({{exchange_decorator, Type}, ModuleName}) ->
+ case erlang:function_exported(ModuleName, route, 2) of
+ true -> true = ets:insert(?ETS_NAME,
+ {{exchange_decorator_route, Type},
+ ModuleName});
+ false -> ok
+ end;
+conditional_register(_) ->
+ ok.
+
+conditional_unregister({exchange_decorator, Type}) ->
+ true = ets:delete(?ETS_NAME, {exchange_decorator_route, Type}),
+ ok;
+conditional_unregister(_) ->
ok.
sanity_check_module(ClassModule, Module) ->
@@ -104,12 +126,11 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator;
-class_module(exchange_decorator_route) -> rabbit_exchange_decorator;
-class_module(policy_validator) -> rabbit_policy_validator.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(policy_validator) -> rabbit_policy_validator.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1188c554..b2c80364 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1100,20 +1100,14 @@ test_policy_validation() ->
test_server_status() ->
%% create a few things so there is some useful information to list
- Writer = spawn(fun test_writer/0),
- {ok, Ch} = rabbit_channel:start_link(
- 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1,
- user(<<"user">>), <<"/">>, [], self(),
- rabbit_limiter:make_token(self())),
+ {_Writer, Limiter, Ch} = test_channel(),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
-
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, rabbit_limiter:make_token(),
- <<"ctag">>, true, undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -1191,8 +1185,6 @@ find_listener() ->
N =:= node()],
{H, P}.
-test_writer() -> test_writer(none).
-
test_writer(Pid) ->
receive
{'$gen_call', From, flush} -> gen_server:reply(From, ok),
@@ -1202,13 +1194,17 @@ test_writer(Pid) ->
shutdown -> ok
end.
-test_spawn() ->
+test_channel() ->
Me = self(),
Writer = spawn(fun () -> test_writer(Me) end),
+ {ok, Limiter} = rabbit_limiter:start_link(),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1,
- user(<<"guest">>), <<"/">>, [], Me,
- rabbit_limiter:make_token(self())),
+ user(<<"guest">>), <<"/">>, [], Me, Limiter),
+ {Writer, Limiter, Ch}.
+
+test_spawn() ->
+ {Writer, _Limiter, Ch} = test_channel(),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)
@@ -1580,7 +1576,7 @@ control_action(Command, Node, Args, Opts) ->
info_action(Command, Args, CheckVHost) ->
ok = control_action(Command, []),
- if CheckVHost -> ok = control_action(Command, []);
+ if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]);
true -> ok
end,
ok = control_action(Command, lists:map(fun atom_to_list/1, Args)),
@@ -2722,12 +2718,13 @@ test_queue_recover() ->
end,
rabbit_amqqueue:stop(),
rabbit_amqqueue:start(rabbit_amqqueue:recover()),
+ {ok, Limiter} = rabbit_limiter:start_link(),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
- rabbit_amqqueue:basic_get(Q1, self(), false),
+ rabbit_amqqueue:basic_get(Q1, self(), false, Limiter),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1}, VQ2} =
@@ -2748,9 +2745,11 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
+ {ok, Limiter} = rabbit_limiter:start_link(),
+
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
- rabbit_amqqueue:basic_get(Q, self(), true),
+ rabbit_amqqueue:basic_get(Q, self(), true, Limiter),
{ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
%% give the queue a second to receive the close_fds callback msg
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index d0f39221..2858cf58 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -70,6 +70,7 @@ add(VHostPath) ->
{<<"amq.rabbitmq.trace">>, topic}]],
ok
end),
+ rabbit_event:notify(vhost_created, info(VHostPath)),
R.
delete(VHostPath) ->
@@ -87,6 +88,7 @@ delete(VHostPath) ->
with(VHostPath, fun () ->
ok = internal_delete(VHostPath)
end)),
+ ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
R.
internal_delete(VHostPath) ->