summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl341
1 files changed, 175 insertions, 166 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e794b4aa..2b0fe17e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
-% Queue's state
+%% Queue's state
-record(q, {q,
exclusive_consumer,
has_had_consumers,
@@ -46,7 +46,7 @@
rate_timer_ref,
expiry_timer_ref,
stats_timer,
- guid_to_channel,
+ msg_id_to_channel,
ttl,
ttl_timer_ref
}).
@@ -112,7 +112,7 @@ init(Q) ->
expiry_timer_ref = undefined,
ttl = undefined,
stats_timer = rabbit_event:init_stats_timer(),
- guid_to_channel = dict:new()}, hibernate,
+ msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -149,7 +149,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = bq_init(BQ, QName, IsDurable, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -159,6 +159,20 @@ declare(Recover, From,
Q1 -> {stop, normal, {existing, Q1}, State}
end.
+bq_init(BQ, QName, IsDurable, Recover) ->
+ Self = self(),
+ BQ:init(QName, IsDurable, Recover,
+ fun (Fun) ->
+ rabbit_amqqueue:run_backing_queue_async(Self, Fun)
+ end,
+ fun (Fun) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> error end,
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(Self, Fun)
+ end)
+ end).
+
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
lists:foldl(fun({Arg, Fun}, State1) ->
case rabbit_misc:table_lookup(Arguments, Arg) of
@@ -201,13 +215,15 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, NewState1, Timeout}.
-next_state(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- ensure_rate_timer(State),
- State2 = ensure_stats_timer(State1),
- case BQ:needs_idle_timeout(BQS) of
- true -> {ensure_sync_timer(State2), 0};
- false -> {stop_sync_timer(State2), hibernate}
+next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
+ State1 = ensure_stats_timer(
+ ensure_rate_timer(
+ confirm_messages(MsgIds, State#q{
+ backing_queue_state = BQS1}))),
+ case BQ:needs_idle_timeout(BQS1) of
+ true -> {ensure_sync_timer(State1), 0};
+ false -> {stop_sync_timer(State1), hibernate}
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
@@ -283,17 +299,16 @@ lookup_ch(ChPid) ->
ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
- undefined ->
- MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumer_count = 0,
- ch_pid = ChPid,
- monitor_ref = MonitorRef,
- acktags = sets:new(),
- is_limit_active = false,
- txn = none,
- unsent_message_count = 0},
- put(Key, C),
- C;
+ undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ C = #cr{consumer_count = 0,
+ ch_pid = ChPid,
+ monitor_ref = MonitorRef,
+ acktags = sets:new(),
+ is_limit_active = false,
+ txn = none,
+ unsent_message_count = 0},
+ put(Key, C),
+ C;
C = #cr{} -> C
end.
@@ -319,18 +334,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-all_ch_record() ->
- [C || {{ch, _}, C} <- get()].
+all_ch_record() -> [C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
- BlockedOld = is_ch_blocked(OldCR),
- BlockedNew = is_ch_blocked(NewCR),
- if BlockedOld andalso not(BlockedNew) -> unblock;
- BlockedNew andalso not(BlockedOld) -> block;
- true -> ok
+ case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
+ {true, false} -> unblock;
+ {false, true} -> block;
+ {_, _} -> ok
end.
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
@@ -365,13 +378,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
BlockedConsumers};
- block ->
- {ActiveConsumers1, BlockedConsumers1} =
- move_consumers(ChPid,
- ActiveConsumersTail,
- BlockedConsumers),
- {ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
+ block -> {ActiveConsumers1, BlockedConsumers1} =
+ move_consumers(ChPid,
+ ActiveConsumersTail,
+ BlockedConsumers),
+ {ActiveConsumers1,
+ queue:in(QEntry, BlockedConsumers1)}
end,
State2 = State1#q{
active_consumers = NewActiveConsumers,
@@ -396,56 +408,65 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{FunAcc, State}
end.
-deliver_from_queue_pred(IsEmpty, _State) ->
- not IsEmpty.
+deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
- {CMs, GTC1} =
- lists:foldl(
- fun(Guid, {CMs, GTC0}) ->
- case dict:find(Guid, GTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
- {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)};
- _ ->
- {CMs, GTC0}
- end
- end, {[], GTC}, Guids),
- case lists:usort(CMs) of
- [{Ch, MsgSeqNo} | CMs1] ->
- [rabbit_channel:confirm(ChPid, MsgSeqNos) ||
- {ChPid, MsgSeqNos} <- group_confirms_by_channel(
- CMs1, [{Ch, [MsgSeqNo]}])];
- [] ->
- ok
- end,
- State#q{guid_to_channel = GTC1}.
-
-group_confirms_by_channel([], Acc) ->
- Acc;
-group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) ->
- group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]);
-group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) ->
- group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]).
-
-record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- {no_confirm, State};
-record_confirm_message(#delivery{sender = ChPid,
+confirm_messages([], State) ->
+ State;
+confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
+ {CMs, MTC1} = lists:foldl(
+ fun(MsgId, {CMs, MTC0}) ->
+ case dict:find(MsgId, MTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ dict:erase(MsgId, MTC0)};
+ _ ->
+ {CMs, MTC0}
+ end
+ end, {gb_trees:empty(), MTC}, MsgIds),
+ gb_trees_foreach(fun(ChPid, MsgSeqNos) ->
+ rabbit_channel:confirm(ChPid, MsgSeqNos)
+ end, CMs),
+ State#q{msg_id_to_channel = MTC1}.
+
+gb_trees_foreach(_, none) ->
+ ok;
+gb_trees_foreach(Fun, {Key, Val, It}) ->
+ Fun(Key, Val),
+ gb_trees_foreach(Fun, gb_trees:next(It));
+gb_trees_foreach(Fun, Tree) ->
+ gb_trees_foreach(Fun, gb_trees:next(gb_trees:iterator(Tree))).
+
+gb_trees_cons(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+ none -> gb_trees:insert(Key, [Value], Tree)
+ end.
+
+should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
+ never;
+should_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
- guid = Guid}},
- State =
- #q{guid_to_channel = GTC,
- q = #amqqueue{durable = true}}) ->
- {confirm,
- State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}};
-record_confirm_message(_Delivery, State) ->
- {no_confirm, State}.
+ id = MsgId}},
+ #q{q = #amqqueue{durable = true}}) ->
+ {eventually, ChPid, MsgSeqNo, MsgId};
+should_confirm_message(_Delivery, _State) ->
+ immediately.
+
+needs_confirming({eventually, _, _, _}) -> true;
+needs_confirming(_) -> false.
+
+maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId},
+ State = #q{msg_id_to_channel = MTC}) ->
+ State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)};
+maybe_record_confirm_message(_Confirm, State) ->
+ State.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -459,13 +480,12 @@ run_message_queue(State) ->
attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
- msg_seq_no = MsgSeqNo},
- {NeedsConfirming, State = #q{backing_queue = BQ}}) ->
- %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
- case {NeedsConfirming, MsgSeqNo} of
- {_, undefined} -> ok;
- {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
- {confirm, _} -> ok
+ msg_seq_no = MsgSeqNo} = Delivery,
+ State = #q{backing_queue = BQ}) ->
+ Confirm = should_confirm_message(Delivery, State),
+ case Confirm of
+ immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ _ -> ok
end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -477,46 +497,42 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = (NeedsConfirming =:= confirm)},
+ needs_confirming = needs_confirming(Confirm)},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
{Delivered, State1} =
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
- {Delivered, NeedsConfirming, State1};
-attempt_delivery(#delivery{txn = Txn,
+ {Delivered, Confirm, State1};
+attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
- message = Message},
- {NeedsConfirming,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}}) ->
+ message = Message} = Delivery,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- {true,
- NeedsConfirming,
- State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
-
-deliver_or_enqueue(Delivery, State) ->
- case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, _, State1} ->
- {true, State1};
- {false, NeedsConfirming, State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}} ->
- #delivery{message = Message} = Delivery,
- BQS1 = BQ:publish(Message,
- (message_properties(State)) #message_properties{
- needs_confirming =
- (NeedsConfirming =:= confirm)},
- BQS),
- {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
+ BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
+ {true, should_confirm_message(Delivery, State),
+ State#q{backing_queue_state = BQS1}}.
+
+deliver_or_enqueue(Delivery = #delivery{message = Message}, State) ->
+ {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ case Delivered of
+ true -> State2;
+ false -> BQS1 =
+ BQ:publish(Message,
+ (message_properties(State)) #message_properties{
+ needs_confirming = needs_confirming(Confirm)},
+ BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
- maybe_run_queue_via_backing_queue(
- fun (BQS) ->
- {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)}
- end, State).
+ run_backing_queue(
+ fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end,
+ State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -619,13 +635,10 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
- maybe_run_queue_via_backing_queue(
- fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+ run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State).
-maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {Guids, BQS1} = Fun(BQS),
- run_message_queue(
- confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
+run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
+ run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
State = #q{backing_queue = BQ,
@@ -666,9 +679,8 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
Now = now_micros(),
BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) ->
- Now > Expiry
- end, BQS),
+ fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -727,10 +739,10 @@ i(Item, _) ->
consumers(#q{active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers}) ->
rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)).
emit_stats(State) ->
emit_stats(State, []).
@@ -752,33 +764,33 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
{channel, ChPid},
{queue, self()}]).
-%---------------------------------------------------------------------------
+%%----------------------------------------------------------------------------
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {run_backing_queue, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- sync_timeout -> 6;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {reject, _AckTags, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {run_backing_queue, _Fun} -> 6;
+ sync_timeout -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -802,7 +814,7 @@ handle_call({init, Recover}, From,
_ -> rabbit_log:warning(
"Queue ~p exclusive owner went away~n", [QName])
end,
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = bq_init(BQ, QName, IsDurable, Recover),
%% Rely on terminate to delete the queue.
{stop, normal, State#q{backing_queue_state = BQS}}
end;
@@ -819,8 +831,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery},
- _From, State) ->
+handle_call({deliver_immediately, Delivery}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -834,15 +845,16 @@ handle_call({deliver_immediately, Delivery},
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, _NeedsConfirming, State1} =
- attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, State1);
+ {Delivered, Confirm, State1} = attempt_delivery(Delivery, State),
+ reply(Delivered, case Delivered of
+ true -> maybe_record_confirm_message(Confirm, State1);
+ false -> State1
+ end);
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.
gen_server2:reply(From, true),
- {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- noreply(NewState);
+ noreply(deliver_or_enqueue(Delivery, State));
handle_call({commit, Txn, ChPid}, From, State) ->
case lookup_ch(ChPid) of
@@ -911,15 +923,13 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
case is_ch_blocked(C) of
true -> State1#q{
blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
+ add_consumer(ChPid, Consumer,
+ State1#q.blocked_consumers)};
false -> run_message_queue(
State1#q{
active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
+ add_consumer(ChPid, Consumer,
+ State1#q.active_consumers)})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck),
@@ -994,20 +1004,19 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
- reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+handle_call({run_backing_queue, Fun}, _From, State) ->
+ reply(ok, run_backing_queue(Fun, State)).
-handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
- noreply(maybe_run_queue_via_backing_queue(Fun, State));
+handle_cast({run_backing_queue, Fun}, State) ->
+ noreply(run_backing_queue(Fun, State));
handle_cast(sync_timeout, State) ->
noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- noreply(NewState);
+ noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, Txn, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->