summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl858
1 files changed, 445 insertions, 413 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e3885644..3712a625 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -29,8 +29,8 @@
-export([init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
+ prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Queue's state
-record(q, {q,
@@ -49,12 +49,10 @@
ttl_timer_ref,
ttl_timer_expiry,
senders,
- publish_seqno,
- unconfirmed,
- delayed_stop,
- queue_monitors,
dlx,
- dlx_routing_key
+ dlx_routing_key,
+ max_length,
+ status
}).
-record(consumer, {tag, ack_required}).
@@ -64,9 +62,12 @@
monitor_ref,
acktags,
consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked for any reason
blocked_consumers,
+ %% The limiter itself
limiter,
- is_limit_active,
+ %% Internal flow control for queue -> writer
unsent_message_count}).
%%----------------------------------------------------------------------------
@@ -85,7 +86,7 @@
%%----------------------------------------------------------------------------
-define(STATISTICS_KEYS,
- [pid,
+ [name,
policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
@@ -93,24 +94,22 @@
messages_unacknowledged,
messages,
consumers,
- active_consumers,
memory,
slave_pids,
synchronised_slave_pids,
- backing_queue_status
+ backing_queue_status,
+ status
]).
-define(CREATION_EVENT_KEYS,
- [pid,
- name,
+ [name,
durable,
auto_delete,
arguments,
owner_pid
]).
--define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
%%----------------------------------------------------------------------------
@@ -122,26 +121,7 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
- State = #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = undefined,
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- senders = pmon:new(),
- dlx = undefined,
- dlx_routing_key = undefined,
- publish_seqno = 1,
- unconfirmed = dtree:empty(),
- delayed_stop = undefined,
- queue_monitors = pmon:new(),
- msg_id_to_channel = gb_trees:empty()},
- {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
+ {ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
@@ -150,27 +130,26 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
none -> ok;
_ -> erlang:monitor(process, Owner)
end,
+ State = init_state(Q),
+ State1 = State#q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ senders = Senders,
+ msg_id_to_channel = MTC},
+ State2 = process_args(State1),
+ lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State2, Deliveries).
+
+init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
- backing_queue_state = BQS,
active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = RateTRef,
- expiry_timer_ref = undefined,
- ttl = undefined,
- senders = Senders,
- publish_seqno = 1,
- unconfirmed = dtree:empty(),
- delayed_stop = undefined,
- queue_monitors = pmon:new(),
- msg_id_to_channel = MTC},
- State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
- lists:foldl(fun (Delivery, StateN) ->
- deliver_or_enqueue(Delivery, true, StateN)
- end, State1, Deliveries).
+ senders = pmon:new(),
+ msg_id_to_channel = gb_trees:empty(),
+ status = running},
+ rabbit_event:init_stats_timer(State, #q.stats_timer).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
@@ -182,7 +161,9 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName, self()),
+ rabbit_event:if_enabled(State, #q.stats_timer,
+ fun() -> emit_stats(State) end),
+ rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -260,11 +241,12 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-max-length">>, fun init_max_length/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
-init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}).
init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
@@ -272,13 +254,20 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
+init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
- stop_sync_timer(stop_rate_timer(State)),
+ lists:foldl(fun (F, S) -> F(S) end, State,
+ [fun stop_sync_timer/1,
+ fun stop_rate_timer/1,
+ fun stop_expiry_timer/1,
+ fun stop_ttl_timer/1]),
case BQS of
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
- [emit_consumer_deleted(Ch, CTag)
+ QName = qname(State),
+ [emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -308,36 +297,18 @@ backing_queue_module(Q) ->
true -> rabbit_mirror_queue_master
end.
-ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout),
- State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
- State.
+ rabbit_misc:ensure_timer(State, #q.sync_timer_ref,
+ ?SYNC_INTERVAL, sync_timeout).
-stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- State;
-stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
- erlang:cancel_timer(TRef),
- State#q{sync_timer_ref = undefined}.
-
-ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- TRef = erlang:send_after(
- ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
- State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State) ->
- State.
+stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #q.sync_timer_ref).
-stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- State;
-stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
- erlang:cancel_timer(TRef),
- State#q{rate_timer_ref = undefined}.
+ensure_rate_timer(State) ->
+ rabbit_misc:ensure_timer(State, #q.rate_timer_ref,
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ update_ram_duration).
-stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
- State;
-stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
- erlang:cancel_timer(TRef),
- State#q{expiry_timer_ref = undefined}.
+stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #q.rate_timer_ref).
%% We wish to expire only when there are no consumers *and* the expiry
%% hasn't been refreshed (by queue.declare or basic.get) for the
@@ -347,17 +318,41 @@ ensure_expiry_timer(State = #q{expires = undefined}) ->
ensure_expiry_timer(State = #q{expires = Expires}) ->
case is_unused(State) of
true -> NewState = stop_expiry_timer(State),
- TRef = erlang:send_after(Expires, self(), maybe_expire),
- NewState#q{expiry_timer_ref = TRef};
+ rabbit_misc:ensure_timer(NewState, #q.expiry_timer_ref,
+ Expires, maybe_expire);
false -> State
end.
+stop_expiry_timer(State) -> rabbit_misc:stop_timer(State, #q.expiry_timer_ref).
+
+ensure_ttl_timer(undefined, State) ->
+ State;
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+ After = (case Expiry - now_micros() of
+ V when V > 0 -> V + 999; %% always fire later
+ _ -> 0
+ end) div 1000,
+ TRef = erlang:send_after(After, self(), drop_expired),
+ State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
+ ttl_timer_expiry = TExpiry})
+ when Expiry + 1000 < TExpiry ->
+ case erlang:cancel_timer(TRef) of
+ false -> State;
+ _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
+ end;
+ensure_ttl_timer(_Expiry, State) ->
+ State.
+
+stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
+
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(#q{active_consumers = AC,
- backing_queue = BQ, backing_queue_state = BQS}) ->
- true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
+assert_invariant(State = #q{active_consumers = AC}) ->
+ true = (queue:is_empty(AC) orelse is_empty(State)).
+
+is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -365,17 +360,17 @@ lookup_ch(ChPid) ->
C -> C
end.
-ch_record(ChPid) ->
+ch_record(ChPid, LimiterPid) ->
Key = {ch, ChPid},
case get(Key) of
undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ Limiter = rabbit_limiter:client(LimiterPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
- acktags = sets:new(),
+ acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
- is_limit_active = false,
- limiter = rabbit_limiter:make_token(),
+ limiter = Limiter,
unsent_message_count = 0},
put(Key, C),
C;
@@ -385,9 +380,9 @@ ch_record(ChPid) ->
update_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
unsent_message_count = UnsentMessageCount}) ->
- case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {0, 0, 0} -> ok = erase_ch_record(C);
- _ -> ok = store_ch_record(C)
+ case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {true, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
end,
C.
@@ -395,37 +390,32 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
ok.
-erase_ch_record(#cr{ch_pid = ChPid,
- limiter = Limiter,
- monitor_ref = MonitorRef}) ->
- ok = rabbit_limiter:unregister(Limiter, self()),
+erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
ok.
-update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
- ok = rabbit_limiter:register(Limiter, self()),
- update_ch_record(C#cr{consumer_count = 1});
-update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
- ok = rabbit_limiter:unregister(Limiter, self()),
- update_ch_record(C#cr{consumer_count = 0,
- limiter = rabbit_limiter:make_token()});
-update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
- update_ch_record(C#cr{consumer_count = Count + Delta}).
-
all_ch_record() -> [C || {{ch, _}, C} <- get()].
block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
-is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
+is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
+ Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
+
+maybe_send_drained(WasEmpty, State) ->
+ case (not WasEmpty) andalso is_empty(State) of
+ true -> [send_drained(C) || C <- all_ch_record()];
+ false -> ok
+ end,
+ State.
-ch_record_state_transition(OldCR, NewCR) ->
- case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
- {true, false} -> unblock;
- {false, true} -> block;
- {_, _} -> ok
+send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+ case rabbit_limiter:drained(Limiter) of
+ {[], Limiter} -> ok;
+ {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
+ ChPid, CTagCredit),
+ update_ch_record(C#cr{limiter = Limiter2})
end.
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
@@ -443,18 +433,21 @@ deliver_msgs_to_consumers(DeliverFun, false,
end.
deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
- C = ch_record(ChPid),
+ C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
{false, State};
- false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
- Consumer#consumer.ack_required) of
- false -> block_consumer(C#cr{is_limit_active = true}, E),
- {false, State};
- true -> AC1 = queue:in(E, State#q.active_consumers),
- deliver_msg_to_consumer(
- DeliverFun, Consumer, C,
- State#q{active_consumers = AC1})
+ false -> case rabbit_limiter:can_send(C#cr.limiter,
+ Consumer#consumer.ack_required,
+ Consumer#consumer.tag) of
+ {suspend, Limiter} ->
+ block_consumer(C#cr{limiter = Limiter}, E),
+ {false, State};
+ {continue, Limiter} ->
+ AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Consumer, C#cr{limiter = Limiter},
+ State#q{active_consumers = AC1})
end
end.
@@ -470,7 +463,7 @@ deliver_msg_to_consumer(DeliverFun,
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> sets:add_element(AckTag, ChAckTags);
+ true -> queue:in(AckTag, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -478,10 +471,8 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {{Message, IsDelivered, AckTag, _Remaining},
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} =
- fetch(AckRequired, State),
- {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State1}.
+ {Result, State1} = fetch(AckRequired, State),
+ {Result, is_empty(State1), State1}.
confirm_messages([], State) ->
State;
@@ -517,25 +508,26 @@ send_or_record_confirm(#delivery{sender = SenderPid,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
-discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
- State) ->
- %% fake an 'eventual' confirm from BQ; noop if not needed
+discard(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message{id = MsgId}}, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- confirm_messages([MsgId], State),
+ case MsgSeqNo of
+ undefined -> State;
+ _ -> confirm_messages([MsgId], State)
+ end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
run_message_queue(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State1),
- State2.
+ is_empty(State), State),
+ State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
- Props = #message_properties{delivered = Delivered},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Props, Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
@@ -557,28 +549,65 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props, State1) of
+ Props = message_properties(Message, Confirm, State),
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
{true, State2} ->
State2;
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
+ %% optimisation: it would be perfectly safe to always
+ %% invoke drop_expired_msgs here, but that is expensive so
+ %% we only do that if a new message that might have an
+ %% expiry ends up at the head of the queue. If the head
+ %% remains unchanged, or if the newly published message
+ %% has no expiry and becomes the head of the queue then
+ %% the call is unnecessary.
+ case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
+ end
+ end.
+
+maybe_drop_head(State = #q{max_length = undefined}) ->
+ {0, State};
+maybe_drop_head(State = #q{max_length = MaxLen,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:len(BQS) - MaxLen of
+ Excess when Excess > 0 ->
+ {Excess,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
+ fun () ->
+ {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
+ BQ:drop(false, BQS0)
+ end, {ok, BQS},
+ lists:seq(1, Excess)),
+ State#q{backing_queue_state = BQS1}
+ end)};
+ _ -> {0, State}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ WasEmpty = BQ:is_empty(BQS),
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(State#q{backing_queue_state = BQS1}).
+ {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
+ run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, drop_expired_messages(State#q{backing_queue_state = BQS1})}.
+ State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
+ {Result, maybe_send_drained(Result =:= empty, State1)}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -597,9 +626,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
-remove_consumers(ChPid, Queue) ->
+remove_consumers(ChPid, Queue, QName) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag),
+ emit_consumer_deleted(ChPid, CTag, QName),
false;
(_) ->
true
@@ -607,20 +636,29 @@ remove_consumers(ChPid, Queue) ->
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
- not_found ->
+ not_found -> State;
+ C -> C1 = Update(C),
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ State;
+ true -> unblock(State, C1)
+ end
+ end.
+
+unblock(State, C = #cr{limiter = Limiter}) ->
+ case lists:partition(
+ fun({_ChPid, #consumer{tag = CTag}}) ->
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
+ end, queue:to_list(C#cr.blocked_consumers)) of
+ {_, []} ->
+ update_ch_record(C),
State;
- C ->
- C1 = Update(C),
- case ch_record_state_transition(C, C1) of
- ok -> update_ch_record(C1),
- State;
- unblock -> #cr{blocked_consumers = Consumers} = C1,
- update_ch_record(
- C1#cr{blocked_consumers = queue:new()}),
- AC1 = queue:join(State#q.active_consumers,
- Consumers),
- run_message_queue(State#q{active_consumers = AC1})
- end
+ {Blocked, Unblocked} ->
+ BlockedQ = queue:from_list(Blocked),
+ UnblockedQ = queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ}),
+ AC1 = queue:join(State#q.active_consumers, UnblockedQ),
+ run_message_queue(State#q{active_consumers = AC1})
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -640,7 +678,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
- _ = remove_consumers(ChPid, Blocked), %% for stats emission
+ QName = qname(State),
+ _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -648,11 +687,12 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
+ ChPid, State#q.active_consumers,
+ QName),
senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
- false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
+ false -> {ok, requeue_and_run(queue:to_list(ChAckTags),
ensure_expiry_timer(State1))}
end
end.
@@ -667,13 +707,8 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-consumer_count() -> consumer_count(fun (_) -> false end).
-
-active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
-
-consumer_count(Exclude) ->
- lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
- not Exclude(C)]).
+consumer_count() ->
+ lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
is_unused(_State) -> consumer_count() == 0.
@@ -691,15 +726,24 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
not_found ->
State;
C = #cr{acktags = ChAckTags} ->
- update_ch_record(C#cr{acktags = lists:foldl(fun sets:del_element/2,
- ChAckTags, AckTags)}),
+ update_ch_record(
+ C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
Fun(State)
end.
-message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
+subtract_acks([], [], AckQ) ->
+ AckQ;
+subtract_acks([], Prefix, AckQ) ->
+ queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
+subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+ case queue:out(AckQ) of
+ {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
+ {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ end.
+
+message_properties(Message, Confirm, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually,
- delivered = Delivered}.
+ needs_confirming = Confirm == eventually}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =
@@ -711,103 +755,89 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_messages(State = #q{dlx = DLX,
- backing_queue_state = BQS,
- backing_queue = BQ }) ->
- Now = now_micros(),
+%% Logically this function should invoke maybe_send_drained/2.
+%% However, that is expensive. Since some frequent callers of
+%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot
+%% possibly cause the queue to become empty, we push the
+%% responsibility to the callers. So be cautious when adding new ones.
+drop_expired_msgs(State) ->
+ case is_empty(State) of
+ true -> State;
+ false -> drop_expired_msgs(now_micros(), State)
+ end.
+
+drop_expired_msgs(Now, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ }) ->
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} = case DLX of
- undefined -> {Next, undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ -> {Next, Msgs, BQS2} =
- BQ:dropwhile(ExpirePred, true, BQS),
- DLXFun = dead_letter_fun(expired),
- DLXFun(Msgs),
- {Next, BQS2}
- end,
+ {Props, State1} =
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_expired_msgs(ExpirePred, X, State) end,
+ fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
+ {Next, State#q{backing_queue_state = BQS1}} end),
ensure_ttl_timer(case Props of
- undefined -> undefined;
- #message_properties{expiry = Exp} -> Exp
- end, State#q{backing_queue_state = BQS1}).
-
-ensure_ttl_timer(undefined, State) ->
- State;
-ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
- After = (case Expiry - now_micros() of
- V when V > 0 -> V + 999; %% always fire later
- _ -> 0
- end) div 1000,
- TRef = erlang:send_after(After, self(), drop_expired),
- State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
-ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
- ttl_timer_expiry = TExpiry})
- when Expiry + 1000 < TExpiry ->
- case erlang:cancel_timer(TRef) of
- false -> State;
- _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
- end;
-ensure_ttl_timer(_Expiry, State) ->
- State.
-
-dead_letter_fun(Reason) ->
- fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
-
-dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
- DLMsg = make_dead_letter_msg(Reason, Msg, State),
- Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
+ undefined -> undefined;
+ #message_properties{expiry = Exp} -> Exp
+ end, State1).
+
+with_dlx(undefined, _With, Without) -> Without();
+with_dlx(DLX, With, Without) -> case rabbit_exchange:lookup(DLX) of
+ {ok, X} -> With(X);
+ {error, not_found} -> Without()
+ end.
+
+dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) ->
+ dead_letter_msgs(fun (DLFun, Acc, BQS1) ->
+ BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1)
+ end, expired, X, State).
+
+dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ {Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags),
+ {ok, Acc1, BQS1}
+ end, rejected, X, State),
+ State1.
+
+dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
+ {{Msg, _, AckTag}, BQS1} =
+ BQ:fetch(true, BQS0),
+ {ok, DLFun(Msg, AckTag, Acc0), BQS1}
+ end, {ok, Acc, BQS}, lists:seq(1, Excess))
+ end, maxlen, X, State),
+ State1.
+
+dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ QName = qname(State),
+ {Res, Acks1, BQS1} =
+ Fun(fun (Msg, AckTag, Acks) ->
+ dead_letter_publish(Msg, Reason, X, RK, QName),
+ [AckTag | Acks]
+ end, [], BQS),
+ {_Guids, BQS2} = BQ:ack(Acks1, BQS1),
+ {Res, State#q{backing_queue_state = BQS2}}.
+
+dead_letter_publish(Msg, Reason, X, RK, QName) ->
+ DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
+ Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(
- rabbit_amqqueue:lookup(Queues), Delivery),
- DeliveredQPids.
-
-handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
- unconfirmed = UC}) ->
- case pmon:is_monitored(QPid, QMons) of
- false -> noreply(State);
- true -> case rabbit_misc:is_abnormal_exit(Reason) of
- true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
- QNameS = rabbit_misc:rs(qname(State)),
- rabbit_log:warning("DLQ ~p for ~s died with "
- "~p unconfirmed messages~n",
- [QPid, QNameS, length(Lost)]);
- false -> ok
- end,
- {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
- cleanup_after_confirm(
- [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State#q{queue_monitors = pmon:erase(QPid, QMons),
- unconfirmed = UC1})
- end.
+ rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery),
+ ok.
-stop(State) -> stop(undefined, noreply, State).
+stop(State) -> stop(noreply, State).
-stop(From, Reply, State = #q{unconfirmed = UC}) ->
- case {dtree:is_empty(UC), Reply} of
- {true, noreply} ->
- {stop, normal, State};
- {true, _} ->
- {stop, normal, Reply, State};
- {false, _} ->
- noreply(State#q{delayed_stop = {From, Reply}})
- end.
+stop(noreply, State) -> {stop, normal, State};
+stop(Reply, State) -> {stop, normal, Reply, State}.
-cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
- unconfirmed = UC,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State1 = State#q{backing_queue_state = BQS1},
- case dtree:is_empty(UC) andalso DS =/= undefined of
- true -> case DS of
- {_, noreply} -> ok;
- {From, Reply} -> gen_server2:reply(From, Reply)
- end,
- {stop, normal, State1};
- false -> noreply(State1)
- end.
detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
@@ -836,19 +866,16 @@ detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
end
end.
-make_dead_letter_msg(Reason,
- Msg = #basic_message{content = Content,
+make_dead_letter_msg(Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
+ Reason, DLX, RK, #resource{name = QName}) ->
{DeathRoutingKeys, HeadersFun1} =
- case DlxRoutingKey of
+ case RK of
undefined -> {RoutingKeys, fun (H) -> H end};
- _ -> {[DlxRoutingKey],
- fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
+ _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
- #resource{name = QName} = qname(State),
TimeSec = rabbit_misc:now_ms() div 1000,
HeadersFun2 =
fun (Headers) ->
@@ -899,14 +926,12 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]);
+ lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
-i(active_consumers, _) ->
- active_consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -924,6 +949,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(status, #q{status = Status}) ->
+ Status;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -945,23 +972,23 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) ->
rabbit_event:notify(consumer_created,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
-emit_consumer_deleted(ChPid, ConsumerTag) ->
+emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _State) ->
+prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
@@ -970,7 +997,7 @@ prioritise_call(Msg, _From, _State) ->
_ -> 0
end.
-prioritise_cast(Msg, _State) ->
+prioritise_cast(Msg, _Len, _State) ->
case Msg of
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
@@ -979,7 +1006,7 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
update_ram_duration -> 8;
@@ -990,9 +1017,6 @@ prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
-handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -1032,80 +1056,95 @@ handle_call({deliver, Delivery, Delivered}, From, State) ->
gen_server2:reply(From, ok),
noreply(deliver_or_enqueue(Delivery, Delivered, State));
-handle_call({notify_down, ChPid}, From, State) ->
+handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
%% client. The queue is ultimately deleted in terminate/2; if we
%% return stop with a reply, terminate/2 will be called by
- %% gen_server2 *before* the reply is sent. FIXME: in case of a
- %% delayed stop the reply is sent earlier.
+ %% gen_server2 *before* the reply is sent.
case handle_ch_down(ChPid, State) of
{ok, State1} -> reply(ok, State1);
- {stop, State1} -> stop(From, ok, State1)
+ {stop, State1} -> stop(ok, State1)
end;
-handle_call({basic_get, ChPid, NoAck}, _From,
+handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_messages(State1)) of
+ case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag, Remaining}, State2} ->
- State3 =
+ {{Message, IsDelivered, AckTag}, State2} ->
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- ChAckTags1 = sets:add_element(AckTag, ChAckTags),
+ true -> C = #cr{acktags = ChAckTags} =
+ ch_record(ChPid, LimiterPid),
+ ChAckTags1 = queue:in(AckTag, ChAckTags),
update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State3)
end;
-handle_call({basic_consume, NoAck, ChPid, Limiter,
- ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{exclusive_consumer = ExistingHolder}) ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
+handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
+ _From, State = #q{exclusive_consumer = Holder}) ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = ch_record(ChPid),
- C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
+ C = #cr{consumer_count = Count,
+ limiter = Limiter} = ch_record(ChPid, LimiterPid),
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ Limiter2 = case CreditArgs of
+ none -> Limiter1;
+ {Crd, Drain} -> rabbit_limiter:credit(
+ Limiter1, ConsumerTag, Crd, Drain)
+ end,
+ C1 = update_ch_record(C#cr{consumer_count = Count + 1,
+ limiter = Limiter2}),
+ case is_empty(State) of
+ true -> send_drained(C1);
+ false -> ok
+ end,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
+ true -> Holder
end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- E = {ChPid, Consumer},
- State2 =
- case is_ch_blocked(C1) of
- true -> block_consumer(C1, E),
- State1;
- false -> update_ch_record(C1),
- AC1 = queue:in(E, State1#q.active_consumers),
- run_message_queue(State1#q{active_consumers = AC1})
- end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck),
- reply(ok, State2)
+ not NoAck, qname(State1)),
+ AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
+ reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
-handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
+handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State = #q{exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
not_found ->
reply(ok, State);
- C = #cr{blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag),
+ C = #cr{consumer_count = Count,
+ limiter = Limiter,
+ blocked_consumers = Blocked} ->
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
- update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
+ Limiter1 = case Count of
+ 1 -> rabbit_limiter:deactivate(Limiter);
+ _ -> Limiter
+ end,
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
+ update_ch_record(C#cr{consumer_count = Count - 1,
+ limiter = Limiter2,
+ blocked_consumers = Blocked1}),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, ConsumerTag} -> none;
@@ -1113,65 +1152,83 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
end,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> stop(From, ok, State1)
+ true -> stop(ok, State1)
end
end;
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(ensure_expiry_timer(State)),
- reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
+ ensure_expiry_timer(State),
+ reply({ok, BQ:len(BQS), consumer_count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, From,
+handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- IsEmpty = BQ:is_empty(BQS),
+ IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
- IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
- IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> stop(From, {ok, BQ:len(BQS)}, State)
+ IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
+ IfUnused and not(IsUnused) -> reply({error, in_use}, State);
+ true -> stop({ok, BQ:len(BQS)}, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Count, BQS1} = BQ:purge(BQS),
- reply({ok, Count}, State#q{backing_queue_state = BQS1});
+ State1 = State#q{backing_queue_state = BQS1},
+ reply({ok, Count}, maybe_send_drained(Count =:= 0, State1));
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
+handle_call(sync_mirrors, _From,
+ State = #q{backing_queue = rabbit_mirror_queue_master,
+ backing_queue_state = BQS}) ->
+ S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
+ HandleInfo = fun (Status) ->
+ receive {'$gen_call', From, {info, Items}} ->
+ Infos = infos(Items, State#q{status = Status}),
+ gen_server2:reply(From, {ok, Infos})
+ after 0 ->
+ ok
+ end
+ end,
+ EmitStats = fun (Status) ->
+ rabbit_event:if_enabled(
+ State, #q.stats_timer,
+ fun() -> emit_stats(State#q{status = Status}) end)
+ end,
+ case rabbit_mirror_queue_master:sync_mirrors(HandleInfo, EmitStats, BQS) of
+ {ok, BQS1} -> reply(ok, S(BQS1));
+ {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
+ end;
+
+handle_call(sync_mirrors, _From, State) ->
+ reply({error, not_mirrored}, State);
+
+%% By definition if we get this message here we do not have to do anything.
+handle_call(cancel_sync_mirrors, _From, State) ->
+ reply({ok, not_syncing}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ QName = qname(State),
case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ none -> [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName) ||
{Ch, CTag, AckRequired} <- consumers(State)];
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
+ emit_consumer_created(Ch, CTag, true, AckRequired, QName)
end,
reply(ok, State).
-handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
- {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
- State1 = case dtree:is_defined(QPid, UC1) of
- false -> QMons = State#q.queue_monitors,
- State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
- true -> State
- end,
- cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State1#q{unconfirmed = UC1});
-
-handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- noreply(run_message_queue(
- State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
+ noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
@@ -1190,27 +1247,25 @@ handle_cast({ack, AckTags, ChPid}, State) ->
handle_cast({reject, AckTags, true, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
-handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
- noreply(ack(AckTags, ChPid, State));
-
handle_cast({reject, AckTags, false, ChPid}, State) ->
- DLXFun = dead_letter_fun(rejected),
- noreply(subtract_acks(
- ChPid, AckTags, State,
- fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
- BQS, AckTags),
- State1#q{backing_queue_state = BQS1}
- end));
+ noreply(with_dlx(
+ State#q.dlx,
+ fun (X) -> subtract_acks(ChPid, AckTags, State,
+ fun (State1) ->
+ dead_letter_rejected_msgs(
+ AckTags, X, State1)
+ end) end,
+ fun () -> ack(AckTags, ChPid, State) end));
handle_cast(delete_immediately, State) ->
stop(State);
-handle_cast({unblock, ChPid}, State) ->
+handle_cast({resume, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
- fun (C) -> C#cr{is_limit_active = false} end));
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:resume(Limiter)}
+ end));
handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
@@ -1219,21 +1274,12 @@ handle_cast({notify_sent, ChPid, Credit}, State) ->
C#cr{unsent_message_count = Count - Credit}
end));
-handle_cast({limit, ChPid, Limiter}, State) ->
+handle_cast({activate_limit, ChPid}, State) ->
noreply(
- possibly_unblock(
- State, ChPid,
- fun (C = #cr{consumer_count = ConsumerCount,
- limiter = OldLimiter,
- is_limit_active = OldLimited}) ->
- case (ConsumerCount =/= 0 andalso
- not rabbit_limiter:is_enabled(OldLimiter)) of
- true -> ok = rabbit_limiter:register(Limiter, self());
- false -> ok
- end,
- Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter),
- C#cr{limiter = Limiter, is_limit_active = Limited}
- end));
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:activate(Limiter)}
+ end));
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
@@ -1248,31 +1294,6 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
- case rabbit_exchange:lookup(XName) of
- {ok, X} ->
- {AckImmediately, State2} =
- lists:foldl(
- fun({Msg, AckTag},
- {Acks, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMons}}) ->
- case dead_letter_publish(Msg, Reason, X, State1) of
- [] -> {[AckTag | Acks], State1};
- QPids -> UC1 = dtree:insert(
- SeqNo, QPids, AckTag, UC),
- QMons1 = pmon:monitor_all(QPids, QMons),
- {Acks,
- State1#q{publish_seqno = SeqNo + 1,
- unconfirmed = UC1,
- queue_monitors = QMons1}}
- end
- end, {[], State}, Msgs),
- cleanup_after_confirm(AckImmediately, State2);
- {error, not_found} ->
- cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
- end;
-
handle_cast(start_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
%% lookup again to get policy for init_with_existing_bq
@@ -1290,18 +1311,27 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
noreply(State#q{backing_queue = BQ1,
backing_queue_state = BQS1});
+handle_cast({credit, ChPid, CTag, Credit, Drain},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ Len = BQ:len(BQS),
+ rabbit_channel:send_credit_reply(ChPid, Len),
+ C = #cr{limiter = Limiter} = lookup_ch(ChPid),
+ C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)},
+ noreply(case Drain andalso Len == 0 of
+ true -> update_ch_record(C1),
+ send_drained(C1),
+ State;
+ false -> case is_ch_blocked(C1) of
+ true -> update_ch_record(C1),
+ State;
+ false -> unblock(State, C1)
+ end
+ end);
+
handle_cast(wake_up, State) ->
noreply(State).
-%% We need to not ignore this as we need to remove outstanding
-%% confirms due to queue death.
-handle_info({'DOWN', _MonitorRef, process, DownPid, Reason},
- State = #q{delayed_stop = DS}) when DS =/= undefined ->
- handle_queue_down(DownPid, Reason, State);
-
-handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> stop(State);
@@ -1309,7 +1339,9 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
+ WasEmpty = is_empty(State),
+ State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
+ noreply(maybe_send_drained(WasEmpty, State1));
handle_info(emit_stats, State) ->
emit_stats(State),
@@ -1328,9 +1360,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% unexpectedly.
stop(State);
-handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
- {ok, State1} -> handle_queue_down(DownPid, Reason, State1);
+ {ok, State1} -> noreply(State1);
{stop, State1} -> stop(State1)
end;