summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-03-20 17:28:29 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-03-20 17:28:29 +0000
commitb15b247899318309bc75840f2b37dde3f548d255 (patch)
treef43764194ab766fc3120c5d3d21372856d6bf416
parente5a86b2fd17a316327975ce99bb9a6e81aadef37 (diff)
parent23e11ed50ee7fbb5406d349e0bb8c85696ea9611 (diff)
downloadrabbitmq-server-b15b247899318309bc75840f2b37dde3f548d255.tar.gz
First attempt at merging these two bugs; this compiles at least.
-rw-r--r--packaging/debs/apt-repository/distributions2
-rw-r--r--src/rabbit_alarm.erl14
-rw-r--r--src/rabbit_amqqueue.erl18
-rw-r--r--src/rabbit_amqqueue_process.erl132
-rw-r--r--src/rabbit_channel.erl50
-rw-r--r--src/rabbit_limiter.erl84
-rw-r--r--src/rabbit_misc.erl51
-rw-r--r--src/rabbit_node_monitor.erl37
-rw-r--r--src/rabbit_tests.erl24
9 files changed, 362 insertions, 50 deletions
diff --git a/packaging/debs/apt-repository/distributions b/packaging/debs/apt-repository/distributions
index 61fd778a..75b9fe46 100644
--- a/packaging/debs/apt-repository/distributions
+++ b/packaging/debs/apt-repository/distributions
@@ -2,6 +2,6 @@ Origin: RabbitMQ
Label: RabbitMQ Repository for Debian / Ubuntu etc
Suite: testing
Codename: kitten
-Architectures: AVR32 alpha amd64 arm armel armhf hppa hurd-i386 i386 ia64 kfreebsd-amd64 kfreebsd-i386 m32 m68k mips mipsel netbsd-alpha netbsd-i386 powerpc s390 s390x sh sparc
+Architectures: AVR32 alpha amd64 arm armel armhf hppa hurd-i386 i386 ia64 kfreebsd-amd64 kfreebsd-i386 m32 m68k mips mipsel netbsd-alpha netbsd-i386 powerpc s390 s390x sh sparc source
Components: main
Description: RabbitMQ Repository for Debian / Ubuntu etc
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 362b11aa..6d24d130 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -208,11 +208,19 @@ internal_register(Pid, {M, F, A} = AlertMFA,
State#alarms{alertees = NewAlertees}.
handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
- rabbit_log:warning("~s resource limit alarm set on node ~p~n",
- [Source, Node]),
+ rabbit_log:warning(
+ "~s resource limit alarm set on node ~p.~n~n"
+ "**********************************************************~n"
+ "*** Publishers will be blocked until this alarm clears ***~n"
+ "**********************************************************~n",
+ [Source, Node]),
{ok, maybe_alert(fun dict:append/3, Node, Source, State)};
handle_set_alarm({file_descriptor_limit, []}, State) ->
- rabbit_log:warning("file descriptor limit alarm set~n"),
+ rabbit_log:warning(
+ "file descriptor limit alarm set.~n~n"
+ "********************************************************************~n"
+ "*** New connections will not be accepted until this alarm clears ***~n"
+ "********************************************************************~n"),
{ok, State};
handle_set_alarm(Alarm, State) ->
rabbit_log:warning("alarm '~p' set~n", [Alarm]),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3f0a7f9c..2dfed21d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,9 +26,9 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/8, basic_cancel/4]).
+-export([basic_get/4, basic_consume/9, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
--export([notify_down_all/2, activate_limit_all/2]).
+-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
@@ -147,9 +147,11 @@
-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/8 ::
+-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
+ non_neg_integer(), boolean()) -> 'ok').
+-spec(basic_consume/9 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), any())
+ rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -540,13 +542,17 @@ notify_down_all(QPids, ChPid) ->
activate_limit_all(QPids, ChPid) ->
delegate:cast(QPids, {activate_limit, ChPid}).
+credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
+ delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}).
+
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
+
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, OkMsg) ->
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) ->
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, OkMsg}).
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6fc79dca..8f325e5c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -66,8 +66,14 @@
monitor_ref,
acktags,
consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked for any reason
blocked_consumers,
+ %% The limiter itself
limiter,
+ %% Has the limiter imposed a channel-wide block, either
+ %% because of qos or channel flow?
+ %% Internal flow control for queue -> writer
unsent_message_count}).
%%----------------------------------------------------------------------------
@@ -404,6 +410,21 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
+maybe_send_drained(WasEmpty, State) ->
+ case (not WasEmpty) andalso is_empty(State) of
+ true -> [send_drained(C) || C <- all_ch_record()];
+ false -> ok
+ end,
+ State.
+
+send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+ case rabbit_limiter:drained(Limiter) of
+ {[], Limiter} -> ok;
+ {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
+ ChPid, CTagCredit),
+ update_ch_record(C#cr{limiter = Limiter2})
+ end.
+
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
@@ -424,14 +445,18 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
true -> block_consumer(C, E),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter,
- Consumer#consumer.ack_required) of
- {suspend, Limiter} ->
- block_consumer(C#cr{limiter = Limiter}, E),
+ Consumer#consumer.ack_required,
+ Consumer#consumer.tag) of
+ consumer_blocked ->
+ block_consumer(C, E),
{false, State};
- {continue, Limiter} ->
+ channel_blocked ->
+ block_consumer(C, E),
+ {false, State};
+ Limiter2 ->
AC1 = queue:in(E, State#q.active_consumers),
deliver_msg_to_consumer(
- DeliverFun, Consumer, C#cr{limiter = Limiter},
+ DeliverFun, Consumer, C#cr{limiter = Limiter2},
State#q{active_consumers = AC1})
end
end.
@@ -583,14 +608,16 @@ maybe_drop_head(State = #q{max_length = MaxLen,
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ WasEmpty = BQ:is_empty(BQS),
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
{_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
- run_message_queue(drop_expired_msgs(State1)).
+ run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}.
+ State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
+ {Result, maybe_send_drained(Result =:= empty, State1)}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -619,20 +646,29 @@ remove_consumers(ChPid, Queue, QName) ->
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
- not_found ->
- State;
- C ->
- C1 = Update(C),
- case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
- false -> update_ch_record(C1),
+ not_found -> State;
+ C -> possibly_unblock(State, Update(C))
+ end.
+
+possibly_unblock(State, C = #cr{limiter = Limiter}) ->
+ case is_ch_blocked(C) of
+ true -> update_ch_record(C),
+ State;
+ false -> case lists:partition(
+ fun({_ChPid, #consumer{tag = CTag}}) ->
+ rabbit_limiter:is_consumer_blocked(
+ Limiter, CTag)
+ end, queue:to_list(C#cr.blocked_consumers)) of
+ {_, []} ->
+ update_ch_record(C),
State;
- true -> #cr{blocked_consumers = Consumers} = C1,
- update_ch_record(
- C1#cr{blocked_consumers = queue:new()}),
- AC1 = queue:join(State#q.active_consumers,
- Consumers),
+ {Blocked, Unblocked} ->
+ BlockedQ = queue:from_list(Blocked),
+ UnblockedQ = queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ}),
+ AC1 = queue:join(State#q.active_consumers, UnblockedQ),
run_message_queue(State#q{active_consumers = AC1})
- end
+ end
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -729,6 +765,11 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
+%% Logically this function should invoke maybe_send_drained/2.
+%% However, that is expensive. Since some frequent callers of
+%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot
+%% possibly cause the queue to become empty, we push the
+%% responsibility to the callers. So be cautious when adding new ones.
drop_expired_msgs(State) ->
case is_empty(State) of
true -> State;
@@ -1111,7 +1152,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, OkMsg},
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
_From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
@@ -1119,12 +1160,23 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
- Limiter1 = case LimiterActive of
- true -> rabbit_limiter:activate(Limiter);
- false -> Limiter
+ Limiter1 = case CreditArgs of
+ none ->
+ Limiter;
+ {Credit, Drain} ->
+ rabbit_limiter:credit(
+ Limiter, ConsumerTag, Credit, Drain)
+ end,
+ Limiter2 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter1);
+ false -> Limiter1
end,
- update_ch_record(C#cr{consumer_count = Count + 1,
- limiter = Limiter1}),
+ C1 = update_ch_record(C#cr{consumer_count = Count + 1,
+ limiter = Limiter2}),
+ case is_empty(State) of
+ true -> send_drained(C1);
+ false -> ok
+ end,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1149,13 +1201,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
limiter = Limiter,
blocked_consumers = Blocked} ->
emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
+ Limiter1 = rabbit_limiter:forget_consumer(Limiter, ConsumerTag),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
- Limiter1 = case Count of
- 1 -> rabbit_limiter:deactivate(Limiter);
- _ -> Limiter
+ Limiter2 = case Count of
+ 1 -> rabbit_limiter:deactivate(Limiter1);
+ _ -> Limiter1
end,
update_ch_record(C#cr{consumer_count = Count - 1,
- limiter = Limiter1,
+ limiter = Limiter2,
blocked_consumers = Blocked1}),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -1189,7 +1242,8 @@ handle_call({delete, IfUnused, IfEmpty}, From,
handle_call(purge, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Count, BQS1} = BQ:purge(BQS),
- reply({ok, Count}, State#q{backing_queue_state = BQS1});
+ State1 = State#q{backing_queue_state = BQS1},
+ reply({ok, Count}, maybe_send_drained(Count =:= 0, State1));
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1335,6 +1389,20 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
noreply(State#q{backing_queue = BQ1,
backing_queue_state = BQS1});
+handle_cast({credit, ChPid, CTag, Credit, Drain},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ Len = BQ:len(BQS),
+ rabbit_channel:send_credit_reply(ChPid, Len),
+ C = #cr{limiter = Lim} = lookup_ch(ChPid),
+ C1 = C#cr{limiter = rabbit_limiter:credit(Lim, CTag, Credit, Drain)},
+ noreply(case Drain andalso Len == 0 of
+ true -> update_ch_record(C1),
+ send_drained(C1),
+ State;
+ false -> possibly_unblock(State, C1)
+ end);
+
handle_cast(wake_up, State) ->
noreply(State).
@@ -1354,7 +1422,9 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined}));
+ WasEmpty = is_empty(State),
+ State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
+ noreply(maybe_send_drained(WasEmpty, State1));
handle_info(emit_stats, State) ->
emit_stats(State),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1787d688..5af112e2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,7 +21,8 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2]).
+-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2,
+ flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -94,6 +95,9 @@
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
+-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
+ -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
@@ -138,6 +142,12 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
+send_credit_reply(Pid, Len) ->
+ gen_server2:cast(Pid, {send_credit_reply, Len}).
+
+send_drained(Pid, CTagCredit) ->
+ gen_server2:cast(Pid, {send_drained, CTagCredit}).
+
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
@@ -315,6 +325,19 @@ handle_cast({deliver, ConsumerTag, AckRequired,
Content),
noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
+handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_ok'{available = Len}),
+ noreply(State);
+
+handle_cast({send_drained, CTagCredit},
+ State = #ch{writer_pid = WriterPid}) ->
+ [ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag,
+ credit_drained = CreditDrained})
+ || {ConsumerTag, CreditDrained} <- CTagCredit],
+ noreply(State);
+
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
@@ -709,7 +732,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_local = _, % FIXME: implement
no_ack = NoAck,
exclusive = ExclusiveConsume,
- nowait = NoWait},
+ nowait = NoWait,
+ arguments = Arguments},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
@@ -735,6 +759,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
+ parse_credit_args(Arguments),
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -1110,6 +1135,17 @@ handle_method(#'channel.flow'{active = false}, _,
State1#ch{blocking = sets:from_list(QPids)})}
end;
+handle_method(#'basic.credit'{consumer_tag = CTag,
+ credit = Credit,
+ drain = Drain}, _,
+ State = #ch{consumer_mapping = Consumers}) ->
+ case dict:find(CTag, Consumers) of
+ {ok, Q} -> ok = rabbit_amqqueue:credit(
+ Q, self(), CTag, Credit, Drain),
+ {noreply, State};
+ error -> precondition_failed("unknown consumer tag '~s'", [CTag])
+ end;
+
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
@@ -1176,6 +1212,16 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+parse_credit_args(Arguments) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {boolean, Drain}} -> {Credit, Drain};
+ _ -> none
+ end;
+ undefined -> none
+ end.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 430c2716..71ed2e73 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -117,10 +117,14 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
+-export([is_consumer_blocked/2, credit/4, drained/1, forget_consumer/2]).
+
+-import(rabbit_misc, [serial_add/2, serial_diff/2]).
+
%%----------------------------------------------------------------------------
-record(lstate, {pid, prefetch_limited, blocked}).
--record(qstate, {pid, state}).
+-record(qstate, {pid, state, credits}).
-ifdef(use_specs).
@@ -137,6 +141,8 @@
-> lstate()).
-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()).
-spec(block/1 :: (lstate()) -> lstate()).
+-spec(can_send/4 :: (qstate(), pid(), boolean(), rabbit_types:ctag())
+ -> qstate() | 'consumer_blocked' | 'channel_blocked').
-spec(unblock/1 :: (lstate()) -> lstate()).
-spec(is_prefetch_limited/1 :: (lstate()) -> boolean()).
-spec(is_blocked/1 :: (lstate()) -> boolean()).
@@ -152,6 +158,12 @@
-spec(resume/1 :: (qstate()) -> qstate()).
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
+-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
+-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean())
+ -> qstate()).
+-spec(drained/1 :: (qstate())
+ -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
+-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
-endif.
@@ -166,6 +178,8 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
+-record(credit, {credit = 0, drain = false}).
+
%%----------------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------------
@@ -196,6 +210,24 @@ unblock(L) ->
is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
+can_send(Token = #qstate{pid = Pid, state = State, credits = Credits},
+ QPid, AckReq, CTag) ->
+ case is_consumer_blocked(Token, CTag) of
+ false -> case State =/= active orelse call_can_send(Pid, QPid, AckReq) of
+ true -> Token#qstate{
+ credits = record_send_q(CTag, Credits)};
+ false -> channel_blocked
+ end;
+ true -> consumer_blocked
+ end.
+
+call_can_send(Pid, QPid, AckRequired) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> true end,
+ fun () ->
+ gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
+ end).
+
is_blocked(#lstate{blocked = Blocked}) -> Blocked.
is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
@@ -208,7 +240,7 @@ ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}).
pid(#lstate{pid = Pid}) -> Pid.
-client(Pid) -> #qstate{pid = Pid, state = dormant}.
+client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}.
activate(L = #qstate{state = dormant}) ->
ok = gen_server:cast(L#qstate.pid, {register, self()}),
@@ -236,6 +268,54 @@ deactivate(L) ->
is_suspended(#qstate{state = suspended}) -> true;
is_suspended(#qstate{}) -> false.
+is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
+ case gb_trees:lookup(CTag, Credits) of
+ {value, #credit{credit = C}} when C > 0 -> false;
+ {value, #credit{}} -> true;
+ none -> false
+ end.
+
+
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) ->
+ Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
+
+drained(Limiter = #qstate{credits = Credits}) ->
+ {CTagCredits, Credits2} =
+ rabbit_misc:gb_trees_fold(
+ fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) ->
+ {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)};
+ (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) ->
+ {Acc, Creds0}
+ end, {[], Credits}, Credits),
+ {CTagCredits, Limiter#qstate{credits = Credits2}}.
+
+forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
+ Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}.
+
+%%----------------------------------------------------------------------------
+%% Queue-local code
+%%----------------------------------------------------------------------------
+
+%% We want to do all the AMQP 1.0-ish link level credit calculations
+%% in the queue (to do them elsewhere introduces a ton of
+%% races). However, it's a big chunk of code that is conceptually very
+%% linked to the limiter concept. So we get the queue to hold a bit of
+%% state for us (#qstate.credits), and maintain a fiction that the
+%% limiter is making the decisions...
+
+record_send_q(CTag, Credits) ->
+ case gb_trees:lookup(CTag, Credits) of
+ {value, #credit{credit = Credit, drain = Drain}} ->
+ update_credit(CTag, Credit - 1, Drain, Credits);
+ none ->
+ Credits
+ end.
+
+update_credit(CTag, Credit, Drain, Credits) ->
+ %% Using up all credit implies no need to send a 'drained' event
+ Drain1 = Drain andalso Credit > 0,
+ gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c36fb147..135f6443 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -69,6 +69,7 @@
-export([interval_operation/4]).
-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
+-export([serial_add/2, serial_compare/2, serial_diff/2]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -83,6 +84,7 @@
-ifdef(use_specs).
-export_type([resource_name/0, thunk/1]).
+-export_type([serial_number/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -95,6 +97,8 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(serial_number() :: non_neg_integer()).
+-type(serial_compare_result() :: 'equal' | 'less' | 'greater').
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -246,6 +250,12 @@
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
-spec(get_parent/0 :: () -> pid()).
+-spec(serial_add/2 :: (serial_number(), non_neg_integer()) ->
+ serial_number()).
+-spec(serial_compare/2 :: (serial_number(), serial_number()) ->
+ serial_compare_result()).
+-spec(serial_diff/2 :: (serial_number(), serial_number()) ->
+ integer()).
-endif.
%%----------------------------------------------------------------------------
@@ -1099,3 +1109,44 @@ whereis_name(Name) ->
%% End copypasta from gen_server2.erl
%% -------------------------------------------------------------------------
+
+%% Serial arithmetic for unsigned ints.
+%% http://www.faqs.org/rfcs/rfc1982.html
+%% SERIAL_BITS = 32
+
+%% 2 ^ SERIAL_BITS
+-define(SERIAL_MAX, 16#100000000).
+%% 2 ^ (SERIAL_BITS - 1) - 1
+-define(SERIAL_MAX_ADDEND, 16#7fffffff).
+
+serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND ->
+ (S + N) rem ?SERIAL_MAX;
+serial_add(S, N) ->
+ exit({out_of_bound_serial_addition, S, N}).
+
+serial_compare(A, B) ->
+ if A =:= B ->
+ equal;
+ (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse
+ (A > B andalso A - B > ?SERIAL_MAX_ADDEND) ->
+ less;
+ (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse
+ (A > B andalso B - A < ?SERIAL_MAX_ADDEND) ->
+ greater;
+ true -> exit({indeterminate_serial_comparison, A, B})
+ end.
+
+-define(SERIAL_DIFF_BOUND, 16#80000000).
+
+serial_diff(A, B) ->
+ Diff = A - B,
+ if Diff > (?SERIAL_DIFF_BOUND) ->
+ %% B is actually greater than A
+ - (?SERIAL_MAX - Diff);
+ Diff < - (?SERIAL_DIFF_BOUND) ->
+ ?SERIAL_MAX + Diff;
+ Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND ->
+ Diff;
+ true ->
+ exit({indeterminate_serial_diff, A, B})
+ end.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 98e26a6a..de53b7f0 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -249,7 +249,8 @@ handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
[P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
- {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}};
+ {noreply, handle_dead_rabbit_state(
+ State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{subscribers = Subscribers}) ->
@@ -257,10 +258,19 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
handle_info({mnesia_system_event,
{inconsistent_database, running_partitioned_network, Node}},
- State = #state{partitions = Partitions}) ->
+ State = #state{partitions = Partitions,
+ monitors = Monitors}) ->
+ %% We will not get a node_up from this node - yet we should treat it as
+ %% up (mostly).
+ State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
+ true -> State;
+ false -> State#state{
+ monitors = pmon:monitor({rabbit, Node}, Monitors)}
+ end,
+ ok = handle_live_rabbit(Node),
Partitions1 = ordsets:to_list(
ordsets:add_element(Node, ordsets:from_list(Partitions))),
- {noreply, State#state{partitions = Partitions1}};
+ {noreply, State1#state{partitions = Partitions1}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -299,9 +309,14 @@ handle_dead_rabbit(Node) ->
ok.
majority() ->
+ length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)) > 0.5.
+
+%% mnesia:system_info(db_nodes) (and hence
+%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results
+%% when partitioned.
+alive_nodes() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
- Alive = [N || N <- Nodes, pong =:= net_adm:ping(N)],
- length(Alive) / length(Nodes) > 0.5.
+ [N || N <- Nodes, pong =:= net_adm:ping(N)].
await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
@@ -325,6 +340,18 @@ wait_for_cluster_recovery(Nodes) ->
wait_for_cluster_recovery(Nodes)
end.
+handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
+ %% If we have been partitioned, and we are now in the only remaining
+ %% partition, we no longer care about partitions - forget them. Note
+ %% that we do not attempt to deal with individual (other) partitions
+ %% going away. It's only safe to forget anything about partitions when
+ %% there are no partitions.
+ Partitions1 = case Partitions -- (Partitions -- alive_nodes()) of
+ [] -> [];
+ _ -> Partitions
+ end,
+ State#state{partitions = Partitions1}.
+
handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),
ok = rabbit_mnesia:on_node_up(Node).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d1ae38be..f56fe8ee 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -50,6 +50,7 @@ all_tests() ->
passed = test_table_codec(),
passed = test_content_framing(),
passed = test_content_transcoding(),
+ passed = test_serial_arithmetic(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
@@ -559,6 +560,29 @@ sequence_with_content(Sequence) ->
rabbit_framing_amqp_0_9_1),
Sequence).
+test_serial_arithmetic() ->
+ 1 = rabbit_misc:serial_add(0, 1),
+ 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff),
+ 0 = rabbit_misc:serial_add(16#ffffffff, 1),
+ %% Cannot add more than 2 ^ 31 - 1
+ case catch rabbit_misc:serial_add(200, 16#80000000) of
+ {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok;
+ _ -> exit(fail_out_of_bound_serial_addition)
+ end,
+
+ 1 = rabbit_misc:serial_diff(1, 0),
+ 2 = rabbit_misc:serial_diff(1, 16#ffffffff),
+ -2 = rabbit_misc:serial_diff(16#ffffffff, 1),
+ case catch rabbit_misc:serial_diff(0, 16#80000000) of
+ {'EXIT', {indeterminate_serial_diff, _, _}} -> ok;
+ _ -> exit(fail_indeterminate_serial_difference)
+ end,
+ case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of
+ {'EXIT', {indeterminate_serial_diff, _, _}} -> ok;
+ _ -> exit(fail_indeterminate_serial_difference)
+ end,
+ passed.
+
test_topic_matching() ->
XName = #resource{virtual_host = <<"/">>,
kind = exchange,