summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl521
1 files changed, 315 insertions, 206 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a999fe58..2b0fe17e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1,32 +1,17 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
%%
-%% The Original Code is RabbitMQ.
+%% The Original Code is RabbitMQ.
%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
@@ -36,10 +21,11 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
--define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}).
+-define(BASE_MESSAGE_PROPERTIES,
+ #message_properties{expiry = undefined, needs_confirming = false}).
-export([start_link/1, info_keys/0]).
@@ -47,11 +33,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
--import(queue).
--import(erlang).
--import(lists).
-
-% Queue's state
+%% Queue's state
-record(q, {q,
exclusive_consumer,
has_had_consumers,
@@ -64,6 +46,7 @@
rate_timer_ref,
expiry_timer_ref,
stats_timer,
+ msg_id_to_channel,
ttl,
ttl_timer_ref
}).
@@ -128,7 +111,8 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer()}, hibernate,
+ stats_timer = rabbit_event:init_stats_timer(),
+ msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -138,6 +122,8 @@ terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
terminate(_Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
+ rabbit_event:notify(
+ queue_deleted, [{pid, self()}]),
BQS1 = BQ:delete_and_terminate(BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
@@ -163,7 +149,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = bq_init(BQ, QName, IsDurable, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -173,6 +159,20 @@ declare(Recover, From,
Q1 -> {stop, normal, {existing, Q1}, State}
end.
+bq_init(BQ, QName, IsDurable, Recover) ->
+ Self = self(),
+ BQ:init(QName, IsDurable, Recover,
+ fun (Fun) ->
+ rabbit_amqqueue:run_backing_queue_async(Self, Fun)
+ end,
+ fun (Fun) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> error end,
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(Self, Fun)
+ end)
+ end).
+
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
lists:foldl(fun({Arg, Fun}, State1) ->
case rabbit_misc:table_lookup(Arguments, Arg) of
@@ -200,7 +200,8 @@ terminate_shutdown(Fun, State) ->
BQ:tx_rollback(Txn, BQSN),
BQSN1
end, BQS, all_ch_record()),
- rabbit_event:notify(queue_deleted, [{pid, self()}]),
+ [emit_consumer_deleted(Ch, CTag)
+ || {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -214,20 +215,20 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, NewState1, Timeout}.
-next_state(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- ensure_rate_timer(State),
- State2 = ensure_stats_timer(State1),
- case BQ:needs_idle_timeout(BQS) of
- true -> {ensure_sync_timer(State2), 0};
- false -> {stop_sync_timer(State2), hibernate}
+next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
+ State1 = ensure_stats_timer(
+ ensure_rate_timer(
+ confirm_messages(MsgIds, State#q{
+ backing_queue_state = BQS1}))),
+ case BQ:needs_idle_timeout(BQS1) of
+ true -> {ensure_sync_timer(State1), 0};
+ false -> {stop_sync_timer(State1), hibernate}
end.
-ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
+ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL,
- rabbit_amqqueue, maybe_run_queue_via_backing_queue,
- [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]),
+ ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
State.
@@ -298,17 +299,16 @@ lookup_ch(ChPid) ->
ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
- undefined ->
- MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumer_count = 0,
- ch_pid = ChPid,
- monitor_ref = MonitorRef,
- acktags = sets:new(),
- is_limit_active = false,
- txn = none,
- unsent_message_count = 0},
- put(Key, C),
- C;
+ undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ C = #cr{consumer_count = 0,
+ ch_pid = ChPid,
+ monitor_ref = MonitorRef,
+ acktags = sets:new(),
+ is_limit_active = false,
+ txn = none,
+ unsent_message_count = 0},
+ put(Key, C),
+ C;
C = #cr{} -> C
end.
@@ -334,25 +334,18 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-all_ch_record() ->
- [C || {{ch, _}, C} <- get()].
+all_ch_record() -> [C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
- BlockedOld = is_ch_blocked(OldCR),
- BlockedNew = is_ch_blocked(NewCR),
- if BlockedOld andalso not(BlockedNew) -> unblock;
- BlockedNew andalso not(BlockedOld) -> block;
- true -> ok
+ case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
+ {true, false} -> unblock;
+ {false, true} -> block;
+ {_, _} -> ok
end.
-record_current_channel_tx(ChPid, Txn) ->
- %% as a side effect this also starts monitoring the channel (if
- %% that wasn't happening already)
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
@@ -373,11 +366,11 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- ChAckTags1 = case AckRequired of
- true -> sets:add_element(
- AckTag, ChAckTags);
- false -> ChAckTags
- end,
+ ChAckTags1 =
+ case AckRequired of
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
+ end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
true = maybe_store_ch_record(NewC),
@@ -385,13 +378,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
BlockedConsumers};
- block ->
- {ActiveConsumers1, BlockedConsumers1} =
- move_consumers(ChPid,
- ActiveConsumersTail,
- BlockedConsumers),
- {ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
+ block -> {ActiveConsumers1, BlockedConsumers1} =
+ move_consumers(ChPid,
+ ActiveConsumersTail,
+ BlockedConsumers),
+ {ActiveConsumers1,
+ queue:in(QEntry, BlockedConsumers1)}
end,
State2 = State1#q{
active_consumers = NewActiveConsumers,
@@ -416,14 +408,66 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{FunAcc, State}
end.
-deliver_from_queue_pred(IsEmpty, _State) ->
- not IsEmpty.
+deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
+confirm_messages([], State) ->
+ State;
+confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
+ {CMs, MTC1} = lists:foldl(
+ fun(MsgId, {CMs, MTC0}) ->
+ case dict:find(MsgId, MTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ dict:erase(MsgId, MTC0)};
+ _ ->
+ {CMs, MTC0}
+ end
+ end, {gb_trees:empty(), MTC}, MsgIds),
+ gb_trees_foreach(fun(ChPid, MsgSeqNos) ->
+ rabbit_channel:confirm(ChPid, MsgSeqNos)
+ end, CMs),
+ State#q{msg_id_to_channel = MTC1}.
+
+gb_trees_foreach(_, none) ->
+ ok;
+gb_trees_foreach(Fun, {Key, Val, It}) ->
+ Fun(Key, Val),
+ gb_trees_foreach(Fun, gb_trees:next(It));
+gb_trees_foreach(Fun, Tree) ->
+ gb_trees_foreach(Fun, gb_trees:next(gb_trees:iterator(Tree))).
+
+gb_trees_cons(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+ none -> gb_trees:insert(Key, [Value], Tree)
+ end.
+
+should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
+ never;
+should_confirm_message(#delivery{sender = ChPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message {
+ is_persistent = true,
+ id = MsgId}},
+ #q{q = #amqqueue{durable = true}}) ->
+ {eventually, ChPid, MsgSeqNo, MsgId};
+should_confirm_message(_Delivery, _State) ->
+ immediately.
+
+needs_confirming({eventually, _, _, _}) -> true;
+needs_confirming(_) -> false.
+
+maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId},
+ State = #q{msg_id_to_channel = MTC}) ->
+ State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)};
+maybe_record_confirm_message(_Confirm, State) ->
+ State.
+
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
@@ -433,7 +477,16 @@ run_message_queue(State) ->
{_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
State2.
-attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
+attempt_delivery(#delivery{txn = none,
+ sender = ChPid,
+ message = Message,
+ msg_seq_no = MsgSeqNo} = Delivery,
+ State = #q{backing_queue = BQ}) ->
+ Confirm = should_confirm_message(Delivery, State),
+ case Confirm of
+ immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ _ -> ok
+ end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
@@ -441,36 +494,45 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
%% not being enqueued, so we use an empty
%% message_properties.
{AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Message,
- ?BASE_MESSAGE_PROPERTIES, BQS),
+ BQ:publish_delivered(
+ AckRequired, Message,
+ (?BASE_MESSAGE_PROPERTIES)#message_properties{
+ needs_confirming = needs_confirming(Confirm)},
+ BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- record_current_channel_tx(ChPid, Txn),
- {true,
- State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
-
-deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
- case attempt_delivery(Txn, ChPid, Message, State) of
- {true, NewState} ->
- {true, NewState};
- {false, NewState} ->
- %% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message,
- message_properties(State),
- State #q.backing_queue_state),
- {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})}
+ {Delivered, State1} =
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
+ {Delivered, Confirm, State1};
+attempt_delivery(#delivery{txn = Txn,
+ sender = ChPid,
+ message = Message} = Delivery,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
+ BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
+ {true, should_confirm_message(Delivery, State),
+ State#q{backing_queue_state = BQS1}}.
+
+deliver_or_enqueue(Delivery = #delivery{message = Message}, State) ->
+ {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ case Delivered of
+ true -> State2;
+ false -> BQS1 =
+ BQ:publish(Message,
+ (message_properties(State)) #message_properties{
+ needs_confirming = needs_confirming(Confirm)},
+ BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
- maybe_run_queue_via_backing_queue(
- fun (BQS) ->
- BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)
- end, State).
+ run_backing_queue(
+ fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end,
+ State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -485,12 +547,19 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
end, Queue).
remove_consumers(ChPid, Queue) ->
- queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue).
+ {Kept, Removed} = split_by_channel(ChPid, Queue),
+ [emit_consumer_deleted(Ch, CTag) ||
+ {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)],
+ Kept.
move_consumers(ChPid, From, To) ->
+ {Kept, Removed} = split_by_channel(ChPid, From),
+ {Kept, queue:join(To, Removed)}.
+
+split_by_channel(ChPid, Queue) ->
{Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(From)),
- {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
+ queue:to_list(Queue)),
+ {queue:from_list(Kept), queue:from_list(Removed)}.
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -534,7 +603,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
true -> {stop, State1};
false -> State2 = case Txn of
none -> State1;
- _ -> rollback_transaction(Txn, ChPid,
+ _ -> rollback_transaction(Txn, C,
State1)
end,
{ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -565,29 +634,29 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
+backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
+ run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State).
+
+run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
-commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL}) ->
- {AckTags, BQS1} = BQ:tx_commit(Txn,
- fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(TTL),
- BQS),
- %% ChPid must be known here because of the participant management
- %% by the channel.
- C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
+commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL}) ->
+ {AckTags, BQS1} = BQ:tx_commit(
+ Txn, fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(TTL), BQS),
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
State#q{backing_queue_state = BQS1}.
-rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+rollback_transaction(Txn, C, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
%% Iff we removed acktags from the channel record on ack+txn then
- %% we would add them back in here (would also require ChPid)
- record_current_channel_tx(ChPid, none),
+ %% we would add them back in here.
+ maybe_store_ch_record(C#cr{txn = none}),
State#q{backing_queue_state = BQS1}.
subtract_acks(A, B) when is_list(B) ->
@@ -602,17 +671,16 @@ message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000).
+calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
- Now = now_millis(),
+ Now = now_micros(),
BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) ->
- Now > Expiry
- end, BQS),
+ fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -629,7 +697,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
-now_millis() -> timer:now_diff(now(), {0,0,0}).
+now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -668,21 +736,43 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+consumers(#q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-%---------------------------------------------------------------------------
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+ rabbit_event:notify(consumer_created,
+ [{consumer_tag, ConsumerTag},
+ {exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {channel, ChPid},
+ {queue, self()}]).
+
+emit_consumer_deleted(ChPid, ConsumerTag) ->
+ rabbit_event:notify(consumer_deleted,
+ [{consumer_tag, ConsumerTag},
+ {channel, ChPid},
+ {queue, self()}]).
+
+%%----------------------------------------------------------------------------
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {run_backing_queue, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -694,10 +784,12 @@ prioritise_cast(Msg, _State) ->
maybe_expire -> 8;
drop_expired -> 8;
emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {reject, _AckTags, _Requeue, _ChPid} -> 7;
{notify_sent, _ChPid} -> 7;
{unblock, _ChPid} -> 7;
+ {run_backing_queue, _Fun} -> 6;
+ sync_timeout -> 6;
_ -> 0
end.
@@ -711,20 +803,20 @@ handle_call({init, Recover}, From,
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
- true -> erlang:monitor(process, Owner),
- declare(Recover, From, State);
- _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined} = State,
- gen_server2:reply(From, not_found),
- case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
- end,
- BQS = BQ:init(QName, IsDurable, Recover),
- %% Rely on terminate to delete the queue.
- {stop, normal, State#q{backing_queue_state = BQS}}
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ false -> #q{backing_queue = BQ, backing_queue_state = undefined,
+ q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ gen_server2:reply(From, not_found),
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = bq_init(BQ, QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
@@ -736,16 +828,10 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From,
- State = #q{active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
- reply(rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+handle_call(consumers, _From, State) ->
+ reply(consumers(State), State);
-handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
+handle_call({deliver_immediately, Delivery}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -759,17 +845,23 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
- reply(Delivered, NewState);
+ {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ reply(Delivered, case Delivered of
+ true -> maybe_record_confirm_message(Confirm, State1);
+ false -> State1
+ end);
-handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
- %% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
- reply(Delivered, NewState);
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" delivery mode. Reply asap.
+ gen_server2:reply(From, true),
+ noreply(deliver_or_enqueue(Delivery, State));
handle_call({commit, Txn, ChPid}, From, State) ->
- NewState = commit_transaction(Txn, From, ChPid, State),
- noreply(run_message_queue(NewState));
+ case lookup_ch(ChPid) of
+ not_found -> reply(ok, State);
+ C -> noreply(run_message_queue(
+ commit_transaction(Txn, From, C, State)))
+ end;
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -790,15 +882,18 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag, Remaining}, State2} ->
- case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- true = maybe_store_ch_record(
- C#cr{acktags = sets:add_element(AckTag,
- ChAckTags)});
- false -> ok
- end,
+ State3 =
+ case AckRequired of
+ true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ true = maybe_store_ch_record(
+ C#cr{acktags =
+ sets:add_element(AckTag,
+ ChAckTags)}),
+ State2;
+ false -> State2
+ end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State2)
+ reply({ok, Remaining, Msg}, State3)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
@@ -828,16 +923,16 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
case is_ch_blocked(C) of
true -> State1#q{
blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
+ add_consumer(ChPid, Consumer,
+ State1#q.blocked_consumers)};
false -> run_message_queue(
State1#q{
active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
+ add_consumer(ChPid, Consumer,
+ State1#q.active_consumers)})
end,
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck),
reply(ok, State2)
end;
@@ -856,6 +951,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
C1#cr{limiter_pid = undefined};
_ -> C1
end),
+ emit_consumer_deleted(ChPid, ConsumerTag),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -908,13 +1004,19 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
- reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+handle_call({run_backing_queue, Fun}, _From, State) ->
+ reply(ok, run_backing_queue(Fun, State)).
+
+
+handle_cast({run_backing_queue, Fun}, State) ->
+ noreply(run_backing_queue(Fun, State));
-handle_cast({deliver, Txn, Message, ChPid}, State) ->
+handle_cast(sync_timeout, State) ->
+ noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
+
+handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
- noreply(NewState);
+ noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, Txn, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -922,18 +1024,23 @@ handle_cast({ack, Txn, AckTags, ChPid},
not_found ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
- {C1, BQS1} =
+ {C1, State1} =
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
- _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
+ NewC = C#cr{acktags = ChAckTags1},
+ BQS1 = BQ:ack(AckTags, BQS),
+ {NewC, State#q{backing_queue_state = BQS1}};
+ _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
+ {C#cr{txn = Txn},
+ State#q{backing_queue_state = BQS1}}
end,
maybe_store_ch_record(C1),
- noreply(State#q{backing_queue_state = BQS1})
+ noreply(State1)
end;
handle_cast({reject, AckTags, Requeue, ChPid},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -943,12 +1050,15 @@ handle_cast({reject, AckTags, Requeue, ChPid},
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
false -> BQS1 = BQ:ack(AckTags, BQS),
- State #q { backing_queue_state = BQS1 }
+ State#q{backing_queue_state = BQS1}
end)
end;
handle_cast({rollback, Txn, ChPid}, State) ->
- noreply(rollback_transaction(Txn, ChPid, State));
+ noreply(case lookup_ch(ChPid) of
+ not_found -> State;
+ C -> rollback_transaction(Txn, C, State)
+ end);
handle_cast(delete_immediately, State) ->
{stop, normal, State};
@@ -1035,9 +1145,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
-handle_info(timeout, State = #q{backing_queue = BQ}) ->
- noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:idle_timeout(BQS) end, State));
+handle_info(timeout, State) ->
+ noreply(backing_queue_idle_timeout(State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -1051,15 +1160,15 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
handle_pre_hibernate(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
stats_timer = StatsTimer}) ->
- BQS1 = BQ:handle_pre_hibernate(BQS),
- %% no activity for a while == 0 egress and ingress rates
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), infinity),
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(StatsTimer,
fun () ->
emit_stats(State, [{idle_since, now()}])
end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- backing_queue_state = BQS2},
+ backing_queue_state = BQS3},
{hibernate, stop_rate_timer(State1)}.