diff options
author | Tim Watson <tim@rabbitmq.com> | 2013-04-16 17:07:09 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2013-04-16 17:07:09 +0100 |
commit | 463fbb4b0c80059d7ad7a5908e06689587f38d50 (patch) | |
tree | f3687be0e0e9a55944e0c206bef960aff604c8a0 | |
parent | 0c0b55cbb63bdb3b3f3a6604c8aa5690ef23222c (diff) | |
parent | 160573b71c48e05a32965e287fe9b184b5fa8da1 (diff) | |
download | rabbitmq-server-463fbb4b0c80059d7ad7a5908e06689587f38d50.tar.gz |
merge default
-rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
-rw-r--r-- | src/file_handle_cache.erl | 4 | ||||
-rw-r--r-- | src/gen_server2.erl | 38 | ||||
-rw-r--r-- | src/gm.erl | 8 | ||||
-rw-r--r-- | src/priority_queue.erl | 70 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 251 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 59 | ||||
-rw-r--r-- | src/rabbit_client_sup.erl | 13 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 144 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 10 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 21 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 6 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 52 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 11 | ||||
-rw-r--r-- | src/rabbit_ssl.erl | 29 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 6 |
18 files changed, 468 insertions, 275 deletions
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index d4526d87..3a15c4b6 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -9,7 +9,7 @@ Standards-Version: 3.9.2 Package: rabbitmq-server Architecture: all -Depends: erlang-nox (>= 1:12.b.3), adduser, logrotate, ${misc:Depends} +Depends: erlang-nox (>= 1:12.b.3) | esl-erlang, adduser, logrotate, ${misc:Depends} Description: AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index d2d4d295..406add8a 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -152,7 +152,7 @@ -export([ulimit/0]). -export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). + handle_info/2, terminate/2, code_change/3, prioritise_cast/3]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). @@ -848,7 +848,7 @@ init([AlarmSet, AlarmClear]) -> alarm_set = AlarmSet, alarm_clear = AlarmClear }}. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {release, _, _} -> 5; _ -> 0 diff --git a/src/gen_server2.erl b/src/gen_server2.erl index c82327a2..9109febd 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -16,12 +16,15 @@ %% The original code could reorder messages when communicating with a %% process on a remote node that was not currently connected. %% -%% 4) The callback module can optionally implement prioritise_call/3, -%% prioritise_cast/2 and prioritise_info/2. These functions take -%% Message, From and State or just Message and State and return a -%% single integer representing the priority attached to the message. -%% Messages with higher priorities are processed before requests with -%% lower priorities. The default priority is 0. +%% 4) The callback module can optionally implement prioritise_call/4, +%% prioritise_cast/3 and prioritise_info/3. These functions take +%% Message, From, Length and State or just Message, Length and State +%% (where Length is the current number of messages waiting to be +%% processed) and return a single integer representing the priority +%% attached to the message, or 'drop' to ignore it (for +%% prioritise_cast/3 and prioritise_info/3 only). Messages with +%% higher priorities are processed before requests with lower +%% priorities. The default priority is 0. %% %% 5) The callback module can optionally implement %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be @@ -649,6 +652,9 @@ in({system, _From, _Req} = Input, GS2State) -> in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) -> in(Input, F(Input, GS2State), GS2State). +in(_Input, drop, GS2State) -> + GS2State; + in(Input, Priority, GS2State = #gs2_state { queue = Queue }) -> GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }. @@ -1148,27 +1154,33 @@ whereis_name(Name) -> end. find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> - PCall = function_exported_or_default(Mod, 'prioritise_call', 3, + PCall = function_exported_or_default(Mod, 'prioritise_call', 4, fun (_Msg, _From, _State) -> 0 end), - PCast = function_exported_or_default(Mod, 'prioritise_cast', 2, + PCast = function_exported_or_default(Mod, 'prioritise_cast', 3, fun (_Msg, _State) -> 0 end), - PInfo = function_exported_or_default(Mod, 'prioritise_info', 2, + PInfo = function_exported_or_default(Mod, 'prioritise_info', 3, fun (_Msg, _State) -> 0 end), GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }. function_exported_or_default(Mod, Fun, Arity, Default) -> case erlang:function_exported(Mod, Fun, Arity) of true -> case Arity of - 2 -> fun (Msg, GS2State = #gs2_state { state = State }) -> - case catch Mod:Fun(Msg, State) of + 3 -> fun (Msg, GS2State = #gs2_state { queue = Queue, + state = State }) -> + Length = priority_queue:len(Queue), + case catch Mod:Fun(Msg, Length, State) of + drop -> + drop; Res when is_integer(Res) -> Res; Err -> handle_common_termination(Err, Msg, GS2State) end end; - 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) -> - case catch Mod:Fun(Msg, From, State) of + 4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue, + state = State }) -> + Length = priority_queue:len(Queue), + case catch Mod:Fun(Msg, From, Length, State) of Res when is_integer(Res) -> Res; Err -> @@ -380,7 +380,7 @@ confirmed_broadcast/2, info/1, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_info/2]). + code_change/3, prioritise_info/3]). -ifndef(use_specs). -export([behaviour_info/1]). @@ -721,12 +721,12 @@ terminate(Reason, State = #state { module = Module, code_change(_OldVsn, State, _Extra) -> {ok, State}. -prioritise_info(flush, _State) -> +prioritise_info(flush, _Len, _State) -> 1; -prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, +prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, #state { members_state = MS }) when MS /= undefined -> 1; -prioritise_info(_, _State) -> +prioritise_info(_, _Len, _State) -> 0. diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 02a0a1df..0dc19819 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -69,9 +69,9 @@ %%---------------------------------------------------------------------------- new() -> - {queue, [], []}. + {queue, [], [], 0}. -is_queue({queue, R, F}) when is_list(R), is_list(F) -> +is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) -> true; is_queue({pqueue, Queues}) when is_list(Queues) -> lists:all(fun ({infinity, Q}) -> is_queue(Q); @@ -80,17 +80,17 @@ is_queue({pqueue, Queues}) when is_list(Queues) -> is_queue(_) -> false. -is_empty({queue, [], []}) -> +is_empty({queue, [], [], 0}) -> true; is_empty(_) -> false. -len({queue, R, F}) when is_list(R), is_list(F) -> - length(R) + length(F); +len({queue, _R, _F, L}) -> + L; len({pqueue, Queues}) -> lists:sum([len(Q) || {_, Q} <- Queues]). -to_list({queue, In, Out}) when is_list(In), is_list(Out) -> +to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) -> [{0, V} || V <- Out ++ lists:reverse(In, [])]; to_list({pqueue, Queues}) -> [{maybe_negate_priority(P), V} || {P, Q} <- Queues, @@ -99,13 +99,13 @@ to_list({pqueue, Queues}) -> in(Item, Q) -> in(Item, 0, Q). -in(X, 0, {queue, [_] = In, []}) -> - {queue, [X], In}; -in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) -> - {queue, [X|In], Out}; -in(X, Priority, _Q = {queue, [], []}) -> +in(X, 0, {queue, [_] = In, [], 1}) -> + {queue, [X], In, 2}; +in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) -> + {queue, [X|In], Out, Len + 1}; +in(X, Priority, _Q = {queue, [], [], 0}) -> in(X, Priority, {pqueue, []}); -in(X, Priority, Q = {queue, _, _}) -> +in(X, Priority, Q = {queue, _, _, _}) -> in(X, Priority, {pqueue, [{0, Q}]}); in(X, Priority, {pqueue, Queues}) -> P = maybe_negate_priority(Priority), @@ -113,33 +113,33 @@ in(X, Priority, {pqueue, Queues}) -> {value, {_, Q}} -> lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); false when P == infinity -> - [{P, {queue, [X], []}} | Queues]; + [{P, {queue, [X], [], 1}} | Queues]; false -> case Queues of [{infinity, InfQueue} | Queues1] -> [{infinity, InfQueue} | - lists:keysort(1, [{P, {queue, [X], []}} | Queues1])]; + lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues1])]; _ -> - lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues]) end end}. -out({queue, [], []} = Q) -> +out({queue, [], [], 0} = Q) -> {empty, Q}; -out({queue, [V], []}) -> - {{value, V}, {queue, [], []}}; -out({queue, [Y|In], []}) -> +out({queue, [V], [], 1}) -> + {{value, V}, {queue, [], [], 0}}; +out({queue, [Y|In], [], Len}) -> [V|Out] = lists:reverse(In, []), - {{value, V}, {queue, [Y], Out}}; -out({queue, In, [V]}) when is_list(In) -> - {{value,V}, r2f(In)}; -out({queue, In,[V|Out]}) when is_list(In) -> - {{value, V}, {queue, In, Out}}; + {{value, V}, {queue, [Y], Out}, Len - 1}; +out({queue, In, [V], Len}) when is_list(In) -> + {{value,V}, r2f(In, Len - 1)}; +out({queue, In,[V|Out], Len}) when is_list(In) -> + {{value, V}, {queue, In, Out, Len - 1}}; out({pqueue, [{P, Q} | Queues]}) -> {R, Q1} = out(Q), NewQ = case is_empty(Q1) of true -> case Queues of - [] -> {queue, [], []}; + [] -> {queue, [], [], 0}; [{0, OnlyQ}] -> OnlyQ; [_|_] -> {pqueue, Queues} end; @@ -147,13 +147,13 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. -join(A, {queue, [], []}) -> +join(A, {queue, [], [], 0}) -> A; -join({queue, [], []}, B) -> +join({queue, [], [], 0}, B) -> B; -join({queue, AIn, AOut}, {queue, BIn, BOut}) -> - {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; -join(A = {queue, _, _}, {pqueue, BPQ}) -> +join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut), ALen + BLen}; +join(A = {queue, _, _, _}, {pqueue, BPQ}) -> {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ), Post1 = case Post of @@ -162,7 +162,7 @@ join(A = {queue, _, _}, {pqueue, BPQ}) -> _ -> [ {0, A} | Post ] end, {pqueue, Pre ++ Post1}; -join({pqueue, APQ}, B = {queue, _, _}) -> +join({pqueue, APQ}, B = {queue, _, _, _}) -> {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ), Post1 = case Post of @@ -185,10 +185,10 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity -> merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> merge(As, Bs, [ {PB, B} | Acc ]). -r2f([]) -> {queue, [], []}; -r2f([_] = R) -> {queue, [], R}; -r2f([X,Y]) -> {queue, [X], [Y]}; -r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}. +r2f([], 0) -> {queue, [], [], 0}; +r2f([_] = R, 1) -> {queue, [], R, 1}; +r2f([X,Y], 2) -> {queue, [X], [Y], 2}; +r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}. maybe_negate_priority(infinity) -> infinity; maybe_negate_priority(P) -> -P. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3f0a7f9c..8c00c85c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,9 +26,9 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/8, basic_cancel/4]). +-export([basic_get/4, basic_consume/9, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). --export([notify_down_all/2, activate_limit_all/2]). +-export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, @@ -147,9 +147,11 @@ -spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/8 :: +-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean()) -> 'ok'). +-spec(basic_consume/9 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), any()) + rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -540,13 +542,16 @@ notify_down_all(QPids, ChPid) -> activate_limit_all(QPids, ChPid) -> delegate:cast(QPids, {activate_limit, ChPid}). +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). + basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, OkMsg) -> + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, OkMsg}). + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index efe8efc4..3712a625 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -29,8 +29,8 @@ -export([init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Queue's state -record(q, {q, @@ -49,10 +49,6 @@ ttl_timer_ref, ttl_timer_expiry, senders, - publish_seqno, - unconfirmed, - delayed_stop, - queue_monitors, dlx, dlx_routing_key, max_length, @@ -66,8 +62,12 @@ monitor_ref, acktags, consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason blocked_consumers, + %% The limiter itself limiter, + %% Internal flow control for queue -> writer unsent_message_count}). %%---------------------------------------------------------------------------- @@ -147,9 +147,6 @@ init_state(Q) -> has_had_consumers = false, active_consumers = queue:new(), senders = pmon:new(), - publish_seqno = 1, - unconfirmed = dtree:empty(), - queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty(), status = running}, rabbit_event:init_stats_timer(State, #q.stats_timer). @@ -406,6 +403,21 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). +maybe_send_drained(WasEmpty, State) -> + case (not WasEmpty) andalso is_empty(State) of + true -> [send_drained(C) || C <- all_ch_record()]; + false -> ok + end, + State. + +send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> + case rabbit_limiter:drained(Limiter) of + {[], Limiter} -> ok; + {CTagCredit, Limiter2} -> rabbit_channel:send_drained( + ChPid, CTagCredit), + update_ch_record(C#cr{limiter = Limiter2}) + end. + deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, @@ -426,7 +438,8 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> true -> block_consumer(C, E), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, - Consumer#consumer.ack_required) of + Consumer#consumer.ack_required, + Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), {false, State}; @@ -585,14 +598,16 @@ maybe_drop_head(State = #q{max_length = MaxLen, requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + WasEmpty = BQ:is_empty(BQS), {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}), - run_message_queue(drop_expired_msgs(State1)). + run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}. + State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), + {Result, maybe_send_drained(Result =:= empty, State1)}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, @@ -621,20 +636,29 @@ remove_consumers(ChPid, Queue, QName) -> possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of - not_found -> + not_found -> State; + C -> C1 = Update(C), + case is_ch_blocked(C) andalso not is_ch_blocked(C1) of + false -> update_ch_record(C1), + State; + true -> unblock(State, C1) + end + end. + +unblock(State, C = #cr{limiter = Limiter}) -> + case lists:partition( + fun({_ChPid, #consumer{tag = CTag}}) -> + rabbit_limiter:is_consumer_blocked(Limiter, CTag) + end, queue:to_list(C#cr.blocked_consumers)) of + {_, []} -> + update_ch_record(C), State; - C -> - C1 = Update(C), - case is_ch_blocked(C) andalso not is_ch_blocked(C1) of - false -> update_ch_record(C1), - State; - true -> #cr{blocked_consumers = Consumers} = C1, - update_ch_record( - C1#cr{blocked_consumers = queue:new()}), - AC1 = queue:join(State#q.active_consumers, - Consumers), - run_message_queue(State#q{active_consumers = AC1}) - end + {Blocked, Unblocked} -> + BlockedQ = queue:from_list(Blocked), + UnblockedQ = queue:from_list(Unblocked), + update_ch_record(C#cr{blocked_consumers = BlockedQ}), + AC1 = queue:join(State#q.active_consumers, UnblockedQ), + run_message_queue(State#q{active_consumers = AC1}) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -731,6 +755,11 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> T -> now_micros() + T * 1000 end. +%% Logically this function should invoke maybe_send_drained/2. +%% However, that is expensive. Since some frequent callers of +%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot +%% possibly cause the queue to become empty, we push the +%% responsibility to the callers. So be cautious when adding new ones. drop_expired_msgs(State) -> case is_empty(State) of true -> State; @@ -784,80 +813,31 @@ dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> State1. dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, - publish_seqno = SeqNo0, - unconfirmed = UC0, - queue_monitors = QMons0, backing_queue_state = BQS, backing_queue = BQ}) -> QName = qname(State), - {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} = - Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) -> - case dead_letter_publish(Msg, Reason, - X, RK, SeqNo, QName) of - [] -> {[AckTag | AckImm], SeqNo, UC, QMons}; - QPids -> {AckImm, SeqNo + 1, - dtree:insert(SeqNo, QPids, AckTag, UC), - pmon:monitor_all(QPids, QMons)} - end - end, {[], SeqNo0, UC0, QMons0}, BQS), - {_Guids, BQS2} = BQ:ack(AckImm1, BQS1), - {Res, State#q{publish_seqno = SeqNo1, - unconfirmed = UC1, - queue_monitors = QMons1, - backing_queue_state = BQS2}}. - -dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) -> + {Res, Acks1, BQS1} = + Fun(fun (Msg, AckTag, Acks) -> + dead_letter_publish(Msg, Reason, X, RK, QName), + [AckTag | Acks] + end, [], BQS), + {_Guids, BQS2} = BQ:ack(Acks1, BQS1), + {Res, State#q{backing_queue_state = BQS2}}. + +dead_letter_publish(Msg, Reason, X, RK, QName) -> DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), - Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), + Delivery = rabbit_basic:delivery(false, DLMsg, undefined), {Queues, Cycles} = detect_dead_letter_cycles( DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), - {_, DeliveredQPids} = rabbit_amqqueue:deliver( - rabbit_amqqueue:lookup(Queues), Delivery), - DeliveredQPids. - -handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, - unconfirmed = UC}) -> - case pmon:is_monitored(QPid, QMons) of - false -> noreply(State); - true -> case rabbit_misc:is_abnormal_exit(Reason) of - true -> {Lost, _UC1} = dtree:take_all(QPid, UC), - QNameS = rabbit_misc:rs(qname(State)), - rabbit_log:warning("DLQ ~p for ~s died with " - "~p unconfirmed messages~n", - [QPid, QNameS, length(Lost)]); - false -> ok - end, - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - cleanup_after_confirm( - [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State#q{queue_monitors = pmon:erase(QPid, QMons), - unconfirmed = UC1}) - end. + rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery), + ok. -stop(State) -> stop(undefined, noreply, State). +stop(State) -> stop(noreply, State). -stop(From, Reply, State = #q{unconfirmed = UC}) -> - case {dtree:is_empty(UC), Reply} of - {true, noreply} -> {stop, normal, State}; - {true, _} -> {stop, normal, Reply, State}; - {false, _} -> noreply(State#q{delayed_stop = {From, Reply}}) - end. +stop(noreply, State) -> {stop, normal, State}; +stop(Reply, State) -> {stop, normal, Reply, State}. -cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, - unconfirmed = UC, - backing_queue = BQ, - backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State1 = State#q{backing_queue_state = BQS1}, - case dtree:is_empty(UC) andalso DS =/= undefined of - true -> case DS of - {_, noreply} -> ok; - {From, Reply} -> gen_server2:reply(From, Reply) - end, - {stop, normal, State1}; - false -> noreply(State1) - end. detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = @@ -1008,7 +988,7 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {info, _Items} -> 9; @@ -1017,7 +997,7 @@ prioritise_call(Msg, _From, _State) -> _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; @@ -1026,7 +1006,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> +prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; update_ram_duration -> 8; @@ -1037,9 +1017,6 @@ prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. -handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -1079,16 +1056,15 @@ handle_call({deliver, Delivery, Delivered}, From, State) -> gen_server2:reply(From, ok), noreply(deliver_or_enqueue(Delivery, Delivered, State)); -handle_call({notify_down, ChPid}, From, State) -> +handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the %% client. The queue is ultimately deleted in terminate/2; if we %% return stop with a reply, terminate/2 will be called by - %% gen_server2 *before* the reply is sent. FIXME: in case of a - %% delayed stop the reply is sent earlier. + %% gen_server2 *before* the reply is sent. case handle_ch_down(ChPid, State) of {ok, State1} -> reply(ok, State1); - {stop, State1} -> stop(From, ok, State1) + {stop, State1} -> stop(ok, State1) end; handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, @@ -1113,7 +1089,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, OkMsg}, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> @@ -1125,8 +1101,17 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, true -> rabbit_limiter:activate(Limiter); false -> Limiter end, - update_ch_record(C#cr{consumer_count = Count + 1, - limiter = Limiter1}), + Limiter2 = case CreditArgs of + none -> Limiter1; + {Crd, Drain} -> rabbit_limiter:credit( + Limiter1, ConsumerTag, Crd, Drain) + end, + C1 = update_ch_record(C#cr{consumer_count = Count + 1, + limiter = Limiter2}), + case is_empty(State) of + true -> send_drained(C1); + false -> ok + end, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1141,7 +1126,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, reply(ok, run_message_queue(State1#q{active_consumers = AC1})) end; -handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, +handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of @@ -1156,8 +1141,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, 1 -> rabbit_limiter:deactivate(Limiter); _ -> Limiter end, + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), update_ch_record(C#cr{consumer_count = Count - 1, - limiter = Limiter1, + limiter = Limiter2, blocked_consumers = Blocked1}), State1 = State#q{ exclusive_consumer = case Holder of @@ -1169,7 +1155,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> stop(From, ok, State1) + true -> stop(ok, State1) end end; @@ -1178,20 +1164,21 @@ handle_call(stat, _From, State) -> ensure_expiry_timer(State), reply({ok, BQ:len(BQS), consumer_count()}, State1); -handle_call({delete, IfUnused, IfEmpty}, From, +handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State); - true -> stop(From, {ok, BQ:len(BQS)}, State) + true -> stop({ok, BQ:len(BQS)}, State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), - reply({ok, Count}, State#q{backing_queue_state = BQS1}); + State1 = State#q{backing_queue_state = BQS1}, + reply({ok, Count}, maybe_send_drained(Count =:= 0, State1)); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1239,19 +1226,6 @@ handle_call(force_event_refresh, _From, end, reply(ok, State). -handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> - {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), - State1 = case dtree:is_defined(QPid, UC1) of - false -> QMons = State#q.queue_monitors, - State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; - true -> State - end, - cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State1#q{unconfirmed = UC1}); - -handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); @@ -1337,18 +1311,27 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, noreply(State#q{backing_queue = BQ1, backing_queue_state = BQS1}); +handle_cast({credit, ChPid, CTag, Credit, Drain}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + Len = BQ:len(BQS), + rabbit_channel:send_credit_reply(ChPid, Len), + C = #cr{limiter = Limiter} = lookup_ch(ChPid), + C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)}, + noreply(case Drain andalso Len == 0 of + true -> update_ch_record(C1), + send_drained(C1), + State; + false -> case is_ch_blocked(C1) of + true -> update_ch_record(C1), + State; + false -> unblock(State, C1) + end + end); + handle_cast(wake_up, State) -> noreply(State). -%% We need to not ignore this as we need to remove outstanding -%% confirms due to queue death. -handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, - State = #q{delayed_stop = DS}) when DS =/= undefined -> - handle_queue_down(DownPid, Reason, State); - -handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_info(maybe_expire, State) -> case is_unused(State) of true -> stop(State); @@ -1356,7 +1339,9 @@ handle_info(maybe_expire, State) -> end; handle_info(drop_expired, State) -> - noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined})); + WasEmpty = is_empty(State), + State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), + noreply(maybe_send_drained(WasEmpty, State1)); handle_info(emit_stats, State) -> emit_stats(State), @@ -1375,9 +1360,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% unexpectedly. stop(State); -handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of - {ok, State1} -> handle_queue_down(DownPid, Reason, State1); + {ok, State1} -> noreply(State1); {stop, State1} -> stop(State1) end; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e9f69b62..52c6140e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,14 +21,15 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2, + flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Internal -export([list_local/0]). @@ -94,6 +95,9 @@ -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). +-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}]) + -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). @@ -138,6 +142,12 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). +send_credit_reply(Pid, Len) -> + gen_server2:cast(Pid, {send_credit_reply, Len}). + +send_drained(Pid, CTagCredit) -> + gen_server2:cast(Pid, {send_drained, CTagCredit}). + flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). @@ -217,20 +227,20 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, {ok, State1, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {info, _Items} -> 9; _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {confirm, _MsgSeqNos, _QPid} -> 5; _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of emit_stats -> 7; _ -> 0 @@ -315,6 +325,18 @@ handle_cast({deliver, ConsumerTag, AckRequired, Content), noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); +handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_ok'{available = Len}), + noreply(State); + +handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> + [ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, + credit_drained = CreditDrained}) + || {ConsumerTag, CreditDrained} <- CTagCredit], + noreply(State); + handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); @@ -711,7 +733,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_local = _, % FIXME: implement no_ack = NoAck, exclusive = ExclusiveConsume, - nowait = NoWait}, + nowait = NoWait, + arguments = Arguments}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, consumer_mapping = ConsumerMapping}) -> @@ -737,6 +760,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), ActualConsumerTag, ExclusiveConsume, + parse_credit_args(Arguments), ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -1112,6 +1136,17 @@ handle_method(#'channel.flow'{active = false}, _, State1#ch{blocking = sets:from_list(QPids)})} end; +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + drain = Drain}, _, + State = #ch{consumer_mapping = Consumers}) -> + case dict:find(CTag, Consumers) of + {ok, Q} -> ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain), + {noreply, State}; + error -> precondition_failed("unknown consumer tag '~s'", [CTag]) + end; + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). @@ -1178,6 +1213,16 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. +parse_credit_args(Arguments) -> + case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {boolean, Drain}} -> {Credit, Drain}; + _ -> none + end; + undefined -> none + end. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index 91798a0f..16a640ec 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start_link/1, start_link/2]). +-export([start_link/1, start_link/2, start_link_worker/2]). -export([init/1]). @@ -32,6 +32,8 @@ rabbit_types:ok_pid_or_error()). -spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) -> rabbit_types:ok_pid_or_error()). +-spec(start_link_worker/2 :: ({'local', atom()}, rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). -endif. @@ -43,6 +45,13 @@ start_link(Callback) -> start_link(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, Callback). +start_link_worker(SupName, Callback) -> + supervisor2:start_link(SupName, ?MODULE, {Callback, worker}). + init({M,F,A}) -> {ok, {{simple_one_for_one, 0, 1}, - [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}. + [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}; +init({{M,F,A}, worker}) -> + {ok, {{simple_one_for_one, 0, 1}, + [{client, {M,F,A}, temporary, ?MAX_WAIT, worker, [M]}]}}. + diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 430c2716..d9f1170e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -16,17 +16,18 @@ %% The purpose of the limiter is to stem the flow of messages from %% queues to channels, in order to act upon various protocol-level -%% flow control mechanisms, specifically AMQP's basic.qos -%% prefetch_count and channel.flow. +%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos +%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer) +%% credit mechanism. %% %% Each channel has an associated limiter process, created with %% start_link/1, which it passes to queues on consumer creation with -%% rabbit_amqqueue:basic_consume/8, and rabbit_amqqueue:basic_get/4. +%% rabbit_amqqueue:basic_consume/9, and rabbit_amqqueue:basic_get/4. %% The latter isn't strictly necessary, since basic.get is not %% subject to limiting, but it means that whenever a queue knows about %% a channel, it also knows about its limiter, which is less fiddly. %% -%% Th limiter process holds state that is, in effect, shared between +%% The limiter process holds state that is, in effect, shared between %% the channel and all queues from which the channel is %% consuming. Essentially all these queues are competing for access to %% a single, limited resource - the ability to deliver messages via @@ -54,33 +55,46 @@ %% inactive. In practice it is rare for that to happen, though we %% could optimise this case in the future. %% +%% In addition, the consumer credit bookkeeping is local to queues, so +%% it is not necessary to store information about it in the limiter +%% process. But for abstraction we hide it from the queue behind the +%% limiter API, and it therefore becomes part of the queue local +%% state. +%% %% The interactions with the limiter are as follows: %% %% 1. Channels tell the limiter about basic.qos prefetch counts - %% that's what the limit_prefetch/3, unlimit_prefetch/1, %% is_prefetch_limited/1, get_prefetch_limit/1 API functions are %% about - and channel.flow blocking - that's what block/1, -%% unblock/1 and is_blocked/1 are for. +%% unblock/1 and is_blocked/1 are for. They also tell the limiter +%% queue state (via the queue) about consumer credit changes - +%% that's what credit/4 is for. +%% +%% 2. Queues also tell the limiter queue state about the queue +%% becoming empty (via drained/1) and consumers leaving (via +%% forget_consumer/2). %% -%% 2. Queues register with the limiter - this happens as part of +%% 3. Queues register with the limiter - this happens as part of %% activate/1. %% %% 4. The limiter process maintains an internal counter of 'messages %% sent but not yet acknowledged', called the 'volume'. %% -%% 5. Queues ask the limiter for permission (with can_send/2) whenever +%% 5. Queues ask the limiter for permission (with can_send/3) whenever %% they want to deliver a message to a channel. The limiter checks -%% whether a) the channel isn't blocked by channel.flow, and b) the -%% volume has not yet reached the prefetch limit. If so it -%% increments the volume and tells the queue to proceed. Otherwise -%% it marks the queue as requiring notification (see below) and -%% tells the queue not to proceed. +%% whether a) the channel isn't blocked by channel.flow, b) the +%% volume has not yet reached the prefetch limit, and c) whether +%% the consumer has enough credit. If so it increments the volume +%% and tells the queue to proceed. Otherwise it marks the queue as +%% requiring notification (see below) and tells the queue not to +%% proceed. %% -%% 6. A queue that has told to proceed (by the return value of -%% can_send/2) sends the message to the channel. Conversely, a +%% 6. A queue that has been told to proceed (by the return value of +%% can_send/3) sends the message to the channel. Conversely, a %% queue that has been told not to proceed, will not attempt to %% deliver that message, or any future messages, to the -%% channel. This is accomplished by can_send/2 capturing the +%% channel. This is accomplished by can_send/3 capturing the %% outcome in the local state, where it can be accessed with %% is_suspended/1. %% @@ -88,14 +102,14 @@ %% how many messages were ack'ed. The limiter process decrements %% the volume and if it falls below the prefetch_count then it %% notifies (through rabbit_amqqueue:resume/2) all the queues -%% requiring notification, i.e. all those that had a can_send/2 +%% requiring notification, i.e. all those that had a can_send/3 %% request denied. %% %% 8. Upon receipt of such a notification, queues resume delivery to %% the channel, i.e. they will once again start asking limiter, as %% described in (5). %% -%% 9. When a queues has no more consumers associated with a particular +%% 9. When a queue has no more consumers associated with a particular %% channel, it deactivates use of the limiter with deactivate/1, %% which alters the local state such that no further interactions %% with the limiter process take place until a subsequent @@ -111,16 +125,17 @@ is_prefetch_limited/1, is_blocked/1, is_active/1, get_prefetch_limit/1, ack/2, pid/1]). %% queue API --export([client/1, activate/1, can_send/2, resume/1, deactivate/1, - is_suspended/1]). +-export([client/1, activate/1, can_send/3, resume/1, deactivate/1, + is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, + forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, prioritise_call/3]). + handle_info/2, prioritise_call/4]). %%---------------------------------------------------------------------------- -record(lstate, {pid, prefetch_limited, blocked}). --record(qstate, {pid, state}). +-record(qstate, {pid, state, credits}). -ifdef(use_specs). @@ -147,11 +162,17 @@ -spec(client/1 :: (pid()) -> qstate()). -spec(activate/1 :: (qstate()) -> qstate()). --spec(can_send/2 :: (qstate(), boolean()) -> +-spec(can_send/3 :: (qstate(), boolean(), rabbit_types:ctag()) -> {'continue' | 'suspend', qstate()}). -spec(resume/1 :: (qstate()) -> qstate()). -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). +-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). +-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean()) + -> qstate()). +-spec(drained/1 :: (qstate()) + -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). +-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). -endif. @@ -166,6 +187,8 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. +-record(credit, {credit = 0, drain = false}). + %%---------------------------------------------------------------------------- %% API %%---------------------------------------------------------------------------- @@ -208,23 +231,29 @@ ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}). pid(#lstate{pid = Pid}) -> Pid. -client(Pid) -> #qstate{pid = Pid, state = dormant}. +client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}. activate(L = #qstate{state = dormant}) -> ok = gen_server:cast(L#qstate.pid, {register, self()}), L#qstate{state = active}; activate(L) -> L. -can_send(L = #qstate{state = active}, AckRequired) -> +can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, + AckRequired, CTag) -> + case is_consumer_blocked(L, CTag) of + false -> case (State =/= active orelse + safe_call(Pid, {can_send, self(), AckRequired}, true)) of + true -> {continue, L#qstate{ + credits = record_send_q(CTag, Credits)}}; + false -> {suspend, L#qstate{state = suspended}} + end; + true -> {suspend, L} + end. + +safe_call(Pid, Msg, ExitValue) -> rabbit_misc:with_exit_handler( - fun () -> {continue, L} end, - fun () -> Msg = {can_send, self(), AckRequired}, - case gen_server2:call(L#qstate.pid, Msg, infinity) of - true -> {continue, L}; - false -> {suspend, L#qstate{state = suspended}} - end - end); -can_send(L, _AckRequired) -> {continue, L}. + fun () -> ExitValue end, + fun () -> gen_server2:call(Pid, Msg, infinity) end). resume(L) -> L#qstate{state = active}. @@ -236,14 +265,61 @@ deactivate(L) -> is_suspended(#qstate{state = suspended}) -> true; is_suspended(#qstate{}) -> false. +is_consumer_blocked(#qstate{credits = Credits}, CTag) -> + case gb_trees:lookup(CTag, Credits) of + {value, #credit{credit = C}} when C > 0 -> false; + {value, #credit{}} -> true; + none -> false + end. + +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> + Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. + +drained(Limiter = #qstate{credits = Credits}) -> + {CTagCredits, Credits2} = + rabbit_misc:gb_trees_fold( + fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) -> + {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)}; + (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) -> + {Acc, Creds0} + end, {[], Credits}, Credits), + {CTagCredits, Limiter#qstate{credits = Credits2}}. + +forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> + Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}. + +%%---------------------------------------------------------------------------- +%% Queue-local code +%%---------------------------------------------------------------------------- + +%% We want to do all the AMQP 1.0-ish link level credit calculations +%% in the queue (to do them elsewhere introduces a ton of +%% races). However, it's a big chunk of code that is conceptually very +%% linked to the limiter concept. So we get the queue to hold a bit of +%% state for us (#qstate.credits), and maintain a fiction that the +%% limiter is making the decisions... + +record_send_q(CTag, Credits) -> + case gb_trees:lookup(CTag, Credits) of + {value, #credit{credit = Credit, drain = Drain}} -> + update_credit(CTag, Credit - 1, Drain, Credits); + none -> + Credits + end. + +update_credit(CTag, Credit, Drain, Credits) -> + %% Using up all credit implies no need to send a 'drained' event + Drain1 = Drain andalso Credit > 0, + gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- init([]) -> {ok, #lim{}}. -prioritise_call(get_prefetch_limit, _From, _State) -> 9; -prioritise_call(_Msg, _From, _State) -> 0. +prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9; +prioritise_call(_Msg, _From, _Len, _State) -> 0. handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) -> {reply, ok, State#lim{ch_pid = ChPid}}; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b435e0f3..22edfcb6 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -27,8 +27,8 @@ -export([start_link/1, set_maximum_since_use/2, info/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + code_change/3, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -323,14 +323,14 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, BQS3 = BQ:handle_pre_hibernate(BQS2), {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {gm_deaths, _Deaths} -> 5; _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; @@ -339,7 +339,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of update_ram_duration -> 8; sync_timeout -> 6; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 13b40a48..c63321b5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -29,8 +29,8 @@ -export([transform_dir/3, force_recovery/2]). %% upgrade -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_call/3, prioritise_cast/2, - prioritise_info/2, format_message_queue/2]). + code_change/3, prioritise_call/4, prioritise_cast/3, + prioritise_info/3, format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -51,6 +51,9 @@ -define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB + %% i.e. two pairs, so GC does not go idle when busy +-define(MAXIMUM_SIMULTANEOUS_GC_FILES, 4). + %%---------------------------------------------------------------------------- -record(msstate, @@ -738,7 +741,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of successfully_recovered_state -> 7; {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7; @@ -746,7 +749,7 @@ prioritise_call(Msg, _From, _State) -> _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; @@ -755,7 +758,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of sync -> 8; _ -> 0 @@ -1728,10 +1731,12 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> %% TODO: the algorithm here is sub-optimal - it may result in a %% complete traversal of FileSummaryEts. - case ets:first(FileSummaryEts) of - '$end_of_table' -> + First = ets:first(FileSummaryEts), + case First =:= '$end_of_table' orelse + orddict:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of + true -> State; - First -> + false -> case find_files_to_combine(FileSummaryEts, FileSizeLimit, ets:lookup(FileSummaryEts, First)) of not_found -> diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 3881de23..0dd7a7cc 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -23,7 +23,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/2]). + terminate/2, code_change/3, prioritise_cast/3]). -record(state, { pending_no_readers, @@ -79,8 +79,8 @@ init([MsgStoreState]) -> msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; -prioritise_cast(_Msg, _State) -> 0. +prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast(_Msg, _Len, _State) -> 0. handle_call(stop, _From, State) -> {stop, normal, ok, State}. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 3872f3df..fb74d4a3 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -32,8 +32,9 @@ -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). +-define(RABBIT_DOWN_PING_INTERVAL, 1000). --record(state, {monitors, partitions, subscribers}). +-record(state, {monitors, partitions, subscribers, down_ping_timer}). %%---------------------------------------------------------------------------- @@ -272,10 +273,34 @@ handle_info({mnesia_system_event, ordsets:add_element(Node, ordsets:from_list(Partitions))), {noreply, State1#state{partitions = Partitions1}}; +handle_info(ping_nodes, State) -> + %% We ping nodes when some are down to ensure that we find out + %% about healed partitions quickly. We ping all nodes rather than + %% just the ones we know are down for simplicity; it's not expensive + %% to ping the nodes that are up, after all. + State1 = State#state{down_ping_timer = undefined}, + Self = self(), + %% all_nodes_up() both pings all the nodes and tells us if we need to again. + %% + %% We ping in a separate process since in a partition it might + %% take some noticeable length of time and we don't want to block + %% the node monitor for that long. + spawn_link(fun () -> + case all_nodes_up() of + true -> ok; + false -> Self ! ping_again + end + end), + {noreply, State1}; + +handle_info(ping_again, State) -> + {noreply, ensure_ping_timer(State)}; + handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, State) -> + rabbit_misc:stop_timer(State, #state.down_ping_timer), ok. code_change(_OldVsn, State, _Extra) -> @@ -309,17 +334,22 @@ handle_dead_rabbit(Node) -> ok. majority() -> - length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)) > 0.5. + Nodes = rabbit_mnesia:cluster_nodes(all), + length(alive_nodes(Nodes)) / length(Nodes) > 0.5. + +all_nodes_up() -> + Nodes = rabbit_mnesia:cluster_nodes(all), + length(alive_nodes(Nodes)) =:= length(Nodes). %% mnesia:system_info(db_nodes) (and hence %% rabbit_mnesia:cluster_nodes(running)) does not give reliable results %% when partitioned. -alive_nodes() -> - Nodes = rabbit_mnesia:cluster_nodes(all), - [N || N <- Nodes, pong =:= net_adm:ping(N)]. +alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)). + +alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)]. alive_rabbit_nodes() -> - [N || N <- alive_nodes(), rabbit_nodes:is_running(N, rabbit)]. + [N || N <- alive_nodes(), rabbit_nodes:is_process_running(N, rabbit)]. await_cluster_recovery() -> rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", @@ -339,7 +369,7 @@ await_cluster_recovery() -> wait_for_cluster_recovery(Nodes) -> case majority() of true -> rabbit:start(); - false -> timer:sleep(1000), + false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL), wait_for_cluster_recovery(Nodes) end. @@ -353,7 +383,11 @@ handle_dead_rabbit_state(State = #state{partitions = Partitions}) -> [] -> []; _ -> Partitions end, - State#state{partitions = Partitions1}. + ensure_ping_timer(State#state{partitions = Partitions1}). + +ensure_ping_timer(State) -> + rabbit_misc:ensure_timer( + State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes). handle_live_rabbit(Node) -> ok = rabbit_alarm:on_node_up(Node), diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index c92e5963..5640f12a 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -16,7 +16,8 @@ -module(rabbit_nodes). --export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, is_running/2]). +-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, + is_running/2, is_process_running/2]). -define(EPMD_TIMEOUT, 30000). @@ -33,6 +34,7 @@ -spec(parts/1 :: (node() | string()) -> {string(), string()}). -spec(cookie_hash/0 :: () -> string()). -spec(is_running/2 :: (node(), atom()) -> boolean()). +-spec(is_process_running/2 :: (node(), atom()) -> boolean()). -endif. @@ -98,3 +100,10 @@ is_running(Node, Application) -> {badrpc, _} -> false; Apps -> proplists:is_defined(Application, Apps) end. + +is_process_running(Node, Process) -> + case rpc:call(Node, erlang, whereis, [Process]) of + {badrpc, _} -> false; + undefined -> false; + P when is_pid(P) -> true + end. diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index b1238623..96277b68 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -162,15 +162,16 @@ format_rdn(#'AttributeTypeAndValue'{type = T, value = V}) -> {?'id-at-pseudonym' , "PSEUDONYM"}, {?'id-domainComponent' , "DC"}, {?'id-emailAddress' , "EMAILADDRESS"}, - {?'street-address' , "STREET"}], + {?'street-address' , "STREET"}, + {{0,9,2342,19200300,100,1,1} , "UID"}], %% Not in public_key.hrl case proplists:lookup(T, Fmts) of {_, Fmt} -> - io_lib:format(Fmt ++ "=~s", [FV]); + rabbit_misc:format(Fmt ++ "=~s", [FV]); none when is_tuple(T) -> - TypeL = [io_lib:format("~w", [X]) || X <- tuple_to_list(T)], - io_lib:format("~s:~s", [string:join(TypeL, "."), FV]); + TypeL = [rabbit_misc:format("~w", [X]) || X <- tuple_to_list(T)], + rabbit_misc:format("~s=~s", [string:join(TypeL, "."), FV]); none -> - io_lib:format("~p:~s", [T, FV]) + rabbit_misc:format("~p=~s", [T, FV]) end. %% Escape a string as per RFC4514. @@ -204,14 +205,26 @@ format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString; format_directory_string(ST, S); format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2, $Z]}) -> - io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", - [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); + rabbit_misc:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", + [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); %% We appear to get an untagged value back for an ia5string %% (e.g. domainComponent). format_asn1_value(V) when is_list(V) -> V; +format_asn1_value(V) when is_binary(V) -> + %% OTP does not decode some values when combined with an unknown + %% type. That's probably wrong, so as a last ditch effort let's + %% try manually decoding. 'DirectoryString' is semi-arbitrary - + %% but it is the type which covers the various string types we + %% handle below. + try + {ST, S} = public_key:der_decode('DirectoryString', V), + format_directory_string(ST, S) + catch _:_ -> + rabbit_misc:format("~p", [V]) + end; format_asn1_value(V) -> - io_lib:format("~p", [V]). + rabbit_misc:format("~p", [V]). %% DirectoryString { INTEGER : maxSize } ::= CHOICE { %% teletexString TeletexString (SIZE (1..maxSize)), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b2c80364..e7b69879 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1107,7 +1107,7 @@ test_server_status() -> rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, Limiter, false, <<"ctag">>, true, undefined), + Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 56e4b7b3..22b223d2 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -23,7 +23,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/2]). + terminate/2, code_change/3, prioritise_cast/3]). %%---------------------------------------------------------------------------- @@ -73,8 +73,8 @@ init([WId]) -> {ok, WId, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; -prioritise_cast(_Msg, _State) -> 0. +prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast(_Msg, _Len, _State) -> 0. handle_call({submit, Fun}, From, WId) -> gen_server2:reply(From, run(Fun)), |