summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-09 15:22:12 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-09 15:22:12 +0000
commit626142a346e5d7cf25f1b249c518b2e723c8d83a (patch)
tree9e8d81c12366ee95656177f164303c502627a21e
parent01a0f6109f9e8ef9f29379755f7212d86f1508c1 (diff)
parent06ed05a9ab5b603aa20b022c4463f89f8b76c582 (diff)
downloadrabbitmq-server-626142a346e5d7cf25f1b249c518b2e723c8d83a.tar.gz
merge bug25345 into default
-rw-r--r--src/rabbit_amqqueue_process.erl163
-rw-r--r--src/rabbit_channel.erl93
-rw-r--r--src/rabbit_mirror_queue_slave.erl21
-rw-r--r--src/rabbit_variable_queue.erl157
4 files changed, 236 insertions, 198 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bb1a5f86..e76bf6ea 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -355,7 +355,7 @@ ch_record(ChPid) ->
undefined -> MonitorRef = erlang:monitor(process, ChPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
- acktags = sets:new(),
+ acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
is_limit_active = false,
@@ -369,9 +369,9 @@ ch_record(ChPid) ->
update_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
unsent_message_count = UnsentMessageCount}) ->
- case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {0, 0, 0} -> ok = erase_ch_record(C);
- _ -> ok = store_ch_record(C)
+ case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {true, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
end,
C.
@@ -454,7 +454,7 @@ deliver_msg_to_consumer(DeliverFun,
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> sets:add_element(AckTag, ChAckTags);
+ true -> queue:in(AckTag, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -641,7 +641,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
- false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
+ false -> {ok, requeue_and_run(queue:to_list(ChAckTags),
ensure_expiry_timer(State1))}
end
end.
@@ -680,11 +680,21 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
not_found ->
State;
C = #cr{acktags = ChAckTags} ->
- update_ch_record(C#cr{acktags = lists:foldl(fun sets:del_element/2,
- ChAckTags, AckTags)}),
+ update_ch_record(
+ C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
Fun(State)
end.
+subtract_acks([], [], AckQ) ->
+ AckQ;
+subtract_acks([], Prefix, AckQ) ->
+ queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
+subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+ case queue:out(AckQ) of
+ {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
+ {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ end.
+
message_properties(Message, Confirm, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
needs_confirming = Confirm == eventually}.
@@ -699,30 +709,63 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_msgs(State = #q{dlx = DLX,
- backing_queue_state = BQS,
+drop_expired_msgs(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} = case DLX of
- undefined -> BQ:dropwhile(ExpirePred, BQS);
- _ -> {Next, Msgs, BQS2} =
- BQ:fetchwhile(ExpirePred,
- fun accumulate_msgs/3,
- [], BQS),
- case Msgs of
- [] -> ok;
- _ -> (dead_letter_fun(expired))(
- lists:reverse(Msgs))
- end,
- {Next, BQS2}
- end,
+ {Props, State1} =
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_expired_msgs(ExpirePred, X, State) end,
+ fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
+ {Next, State#q{backing_queue_state = BQS1}} end),
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
- end, State#q{backing_queue_state = BQS1}).
-
-accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+ end, State1).
+
+with_dlx(undefined, _With, Without) -> Without();
+with_dlx(DLX, With, Without) -> case rabbit_exchange:lookup(DLX) of
+ {ok, X} -> With(X);
+ {error, not_found} -> Without()
+ end.
+
+dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) ->
+ dead_letter_msgs(fun (DLFun, Acc, BQS1) ->
+ BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1)
+ end, expired, X, State).
+
+dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ {Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags),
+ {ok, Acc1, BQS1}
+ end, rejected, X, State),
+ State1.
+
+dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
+ publish_seqno = SeqNo0,
+ unconfirmed = UC0,
+ queue_monitors = QMons0,
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ QName = qname(State),
+ {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} =
+ Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) ->
+ case dead_letter_publish(Msg, Reason,
+ X, RK, SeqNo, QName) of
+ [] -> {[AckTag | AckImm], SeqNo, UC, QMons};
+ QPids -> {AckImm, SeqNo + 1,
+ dtree:insert(SeqNo, QPids, AckTag, UC),
+ pmon:monitor_all(QPids, QMons)}
+ end
+ end, {[], SeqNo0, UC0, QMons0}, BQS),
+ {_Guids, BQS2} = BQ:ack(AckImm1, BQS1),
+ {Res, State#q{publish_seqno = SeqNo1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1,
+ backing_queue_state = BQS2}}.
ensure_ttl_timer(undefined, State) ->
State;
@@ -743,11 +786,8 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ensure_ttl_timer(_Expiry, State) ->
State.
-dead_letter_fun(Reason) ->
- fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
-
-dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
- DLMsg = make_dead_letter_msg(Reason, Msg, State),
+dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
+ DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
@@ -826,19 +866,16 @@ detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
end
end.
-make_dead_letter_msg(Reason,
- Msg = #basic_message{content = Content,
+make_dead_letter_msg(Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
+ Reason, DLX, RK, #resource{name = QName}) ->
{DeathRoutingKeys, HeadersFun1} =
- case DlxRoutingKey of
+ case RK of
undefined -> {RoutingKeys, fun (H) -> H end};
- _ -> {[DlxRoutingKey],
- fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
+ _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
- #resource{name = QName} = qname(State),
TimeSec = rabbit_misc:now_ms() div 1000,
HeadersFun2 =
fun (Headers) ->
@@ -889,7 +926,7 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]);
+ lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
@@ -1047,7 +1084,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- ChAckTags1 = sets:add_element(AckTag, ChAckTags),
+ ChAckTags1 = queue:in(AckTag, ChAckTags),
update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
false -> State2
@@ -1216,20 +1253,15 @@ handle_cast({ack, AckTags, ChPid}, State) ->
handle_cast({reject, AckTags, true, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
-handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
- noreply(ack(AckTags, ChPid, State));
-
handle_cast({reject, AckTags, false, ChPid}, State) ->
- DLXFun = dead_letter_fun(rejected),
- noreply(subtract_acks(
- ChPid, AckTags, State,
- fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {ok, BQS1} = BQ:ackfold(
- fun (M, A, ok) -> DLXFun([{M, A}]) end,
- ok, BQS, AckTags),
- State1#q{backing_queue_state = BQS1}
- end));
+ noreply(with_dlx(
+ State#q.dlx,
+ fun (X) -> subtract_acks(ChPid, AckTags, State,
+ fun (State1) ->
+ dead_letter_rejected_msgs(
+ AckTags, X, State1)
+ end) end,
+ fun () -> ack(AckTags, ChPid, State) end));
handle_cast(delete_immediately, State) ->
stop(State);
@@ -1275,31 +1307,6 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
- case rabbit_exchange:lookup(XName) of
- {ok, X} ->
- {AckImmediately, State2} =
- lists:foldl(
- fun({Msg, AckTag},
- {Acks, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMons}}) ->
- case dead_letter_publish(Msg, Reason, X, State1) of
- [] -> {[AckTag | Acks], State1};
- QPids -> UC1 = dtree:insert(
- SeqNo, QPids, AckTag, UC),
- QMons1 = pmon:monitor_all(QPids, QMons),
- {Acks,
- State1#q{publish_seqno = SeqNo + 1,
- unconfirmed = UC1,
- queue_monitors = QMons1}}
- end
- end, {[], State}, Msgs),
- cleanup_after_confirm(AckImmediately, State2);
- {error, not_found} ->
- cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
- end;
-
handle_cast(start_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
%% lookup again to get policy for init_with_existing_bq
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2686d76d..88e3dfc5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -40,8 +40,6 @@
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
--record(tx, {msgs, acks, nacks}).
-
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(STATISTICS_KEYS,
@@ -622,12 +620,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
DQ = {Delivery, QNames},
- {noreply,
- case Tx of
- none -> deliver_to_queues(DQ, State1);
- #tx{msgs = Msgs} -> Msgs1 = queue:in(DQ, Msgs),
- State1#ch{tx = Tx#tx{msgs = Msgs1}}
- end};
+ {noreply, case Tx of
+ none -> deliver_to_queues(DQ, State1);
+ {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
+ State1#ch{tx = {Msgs1, Acks}}
+ end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
end;
@@ -643,12 +640,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
_, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply,
- case Tx of
- none -> ack(Acked, State1),
- State1;
- #tx{acks = Acks} -> State1#ch{tx = Tx#tx{acks = Acked ++ Acks}}
- end};
+ {noreply, case Tx of
+ none -> ack(Acked, State1),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -805,7 +802,7 @@ handle_method(#'basic.recover_async'{requeue = true},
rabbit_misc:with_exit_handler(
OkFun,
fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
- end, UAMQL),
+ end, lists:reverse(UAMQL)),
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
@@ -1032,28 +1029,25 @@ handle_method(#'tx.select'{}, _, State) ->
handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
-handle_method(#'tx.commit'{}, _, State = #ch{tx = #tx{msgs = Msgs,
- acks = Acks,
- nacks = Nacks},
+handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
limiter = Limiter}) ->
State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
- ack(Acks, State1),
- lists:foreach(
- fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, Nacks),
+ lists:foreach(fun ({ack, A}) -> ack(A, State1);
+ ({Requeue, A}) -> reject(Requeue, A, Limiter)
+ end, lists:reverse(Acks)),
{noreply, maybe_complete_tx(State1#ch{tx = committing})};
handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- tx = #tx{acks = Acks,
- nacks = Nacks}}) ->
- NacksL = lists:append([L || {_, L} <- Nacks]),
- UAMQ1 = queue:from_list(lists:usort(Acks ++ NacksL ++ queue:to_list(UAMQ))),
+ tx = {_Msgs, Acks}}) ->
+ AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
+ UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
tx = new_tx()}};
-handle_method(#'confirm.select'{}, _, #ch{tx = #tx{}}) ->
+handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) ->
precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
@@ -1204,14 +1198,14 @@ reject(DeliveryTag, Requeue, Multiple,
State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply,
- case Tx of
- none -> reject(Requeue, Acked, State1#ch.limiter),
- State1;
- #tx{nacks = Nacks} -> Nacks1 = [{Requeue, Acked} | Nacks],
- State1#ch{tx = Tx#tx{nacks = Nacks1}}
- end}.
-
+ {noreply, case Tx of
+ none -> reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end}.
+
+%% NB: Acked is in youngest-first order
reject(Requeue, Acked, Limiter) ->
foreach_per_queue(
fun (QPid, MsgIds) ->
@@ -1242,8 +1236,9 @@ record_sent(ConsumerTag, AckRequired,
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
+%% NB: returns acks in youngest-first order
collect_acks(Q, 0, true) ->
- {queue:to_list(Q), queue:new()};
+ {lists:reverse(queue:to_list(Q)), queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
collect_acks([], [], Q, DeliveryTag, Multiple).
@@ -1270,6 +1265,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
+%% NB: Acked is in youngest-first order
ack(Acked, State = #ch{queue_names = QNames}) ->
foreach_per_queue(
fun (QPid, MsgIds) ->
@@ -1282,7 +1278,19 @@ ack(Acked, State = #ch{queue_names = QNames}) ->
end, Acked),
ok = notify_limiter(State#ch.limiter, Acked).
-new_tx() -> #tx{msgs = queue:new(), acks = [], nacks = []}.
+%% {Msgs, Acks}
+%%
+%% Msgs is a queue.
+%%
+%% Acks looks s.t. like this:
+%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...]
+%%
+%% Each element is a pair consisting of a tag and a list of
+%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true'
+%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
+%% well as the list overall, are in "most-recent (generally youngest)
+%% ack first" order.
+new_tx() -> {queue:new(), []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1296,6 +1304,8 @@ foreach_per_queue(_F, []) ->
ok;
foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
F(QPid, [MsgId]);
+%% NB: UAL should be in youngest-first order; the tree values will
+%% then be in oldest-first order
foreach_per_queue(F, UAL) ->
T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons(QPid, MsgId, T)
@@ -1440,7 +1450,12 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-maybe_complete_tx(State = #ch{tx = #tx{}}) ->
+ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
+ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks].
+
+ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]).
+
+maybe_complete_tx(State = #ch{tx = {_, _}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
case dtree:is_empty(UC) of
@@ -1472,9 +1487,9 @@ i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
-i(messages_uncommitted, #ch{tx = #tx{msgs = Msgs}}) -> queue:len(Msgs);
+i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
-i(acks_uncommitted, #ch{tx = #tx{acks = Acks}}) -> length(Acks);
+i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 53564f09..feddf45a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -257,8 +257,12 @@ handle_cast({set_ram_duration_target, Duration},
handle_info(update_ram_duration, State = #state{backing_queue = BQ,
backing_queue_state = BQS}) ->
- noreply(State#state{rate_timer_ref = just_measured,
- backing_queue_state = update_ram_duration(BQ, BQS)});
+ BQS1 = update_ram_duration(BQ, BQS),
+ %% Don't call noreply/1, we don't want to set timers
+ {State1, Timeout} = next_state(State #state {
+ rate_timer_ref = undefined,
+ backing_queue_state = BQS1 }),
+ {noreply, State1, Timeout};
handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(
@@ -566,17 +570,16 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
noreply(State) ->
{NewState, Timeout} = next_state(State),
- {noreply, NewState, Timeout}.
+ {noreply, ensure_rate_timer(NewState), Timeout}.
reply(Reply, State) ->
{NewState, Timeout} = next_state(State),
- {reply, Reply, NewState, Timeout}.
+ {reply, Reply, ensure_rate_timer(NewState), Timeout}.
next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = ensure_rate_timer(
- confirm_messages(MsgIds, State #state {
- backing_queue_state = BQS1 })),
+ State1 = confirm_messages(MsgIds,
+ State #state { backing_queue_state = BQS1 }),
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -602,15 +605,11 @@ ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
self(), update_ram_duration),
State #state { rate_timer_ref = TRef };
-ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
- State #state { rate_timer_ref = undefined };
ensure_rate_timer(State) ->
State.
stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
State;
-stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
- State #state { rate_timer_ref = undefined };
stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
erlang:cancel_timer(TRef),
State #state { rate_timer_ref = undefined }.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 37ca6de0..90ee3439 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -255,8 +255,8 @@
q3,
q4,
next_seq_id,
- pending_ack,
- ram_ack_index,
+ ram_pending_ack,
+ disk_pending_ack,
index_state,
msg_store_clients,
durable,
@@ -348,8 +348,8 @@
q3 :: ?QUEUE:?QUEUE(),
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
- pending_ack :: gb_tree(),
- ram_ack_index :: gb_set(),
+ ram_pending_ack :: gb_tree(),
+ disk_pending_ack :: gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -670,12 +670,11 @@ requeue(AckTags, #vqstate { delta = Delta,
ackfold(MsgFun, Acc, State, AckTags) ->
{AccN, StateN} =
- lists:foldl(
- fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) ->
- MsgStatus = gb_trees:get(SeqId, PA),
- {Msg, State1} = read_msg(MsgStatus, false, State0),
- {MsgFun(Msg, SeqId, Acc0), State1}
- end, {Acc, State}, AckTags),
+ lists:foldl(fun(SeqId, {Acc0, State0}) ->
+ MsgStatus = lookup_pending_ack(SeqId, State0),
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
fold(Fun, Acc, #vqstate { q1 = Q1,
@@ -702,8 +701,8 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-depth(State = #vqstate { pending_ack = Ack }) ->
- len(State) + gb_trees:size(Ack).
+depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
+ len(State) + gb_trees:size(RPA) + gb_trees:size(DPA).
set_ram_duration_target(
DurationTarget, State = #vqstate {
@@ -740,7 +739,7 @@ ram_duration(State = #vqstate {
ack_out_counter = AckOutCount,
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
- ram_ack_index = RamAckIndex,
+ ram_pending_ack = RPA,
ram_ack_count_prev = RamAckCountPrev }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
@@ -751,7 +750,7 @@ ram_duration(State = #vqstate {
{AvgAckIngressRate, AckIngress1} =
update_rate(Now, AckTimestamp, AckInCount, AckIngress),
- RamAckCount = gb_sets:size(RamAckIndex),
+ RamAckCount = gb_trees:size(RPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
@@ -783,19 +782,24 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State = #vqstate { index_state = IndexState }) ->
+needs_timeout(State = #vqstate { index_state = IndexState,
+ target_ram_count = TargetRamCount }) ->
case must_sync_index(State) of
true -> timed;
false ->
case rabbit_queue_index:needs_sync(IndexState) of
true -> idle;
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
+ false -> case TargetRamCount of
+ infinity -> false;
+ _ -> case
+ reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
end
end
end.
@@ -811,8 +815,8 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
status(#vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
- pending_ack = PA,
- ram_ack_index = RAI,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
@@ -827,10 +831,10 @@ status(#vqstate {
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
{len , Len},
- {pending_acks , gb_trees:size(PA)},
+ {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_sets:size(RAI)},
+ {ram_ack_count , gb_trees:size(RPA)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -962,7 +966,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
@@ -971,7 +975,8 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> case gb_trees:is_defined(SeqId, PA) of
+ false -> case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA)) of
false ->
{?QUEUE:in_r(
m(#msg_status {
@@ -1033,8 +1038,8 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q3 = ?QUEUE:new(),
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
- pending_ack = gb_trees:empty(),
- ram_ack_index = gb_sets:empty(),
+ ram_pending_ack = gb_trees:empty(),
+ disk_pending_ack = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1248,33 +1253,47 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%%----------------------------------------------------------------------------
record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
- State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI,
- ack_in_counter = AckInCount}) ->
- RAI1 = case Msg of
- undefined -> RAI;
- _ -> gb_sets:insert(SeqId, RAI)
- end,
- State #vqstate { pending_ack = gb_trees:insert(SeqId, MsgStatus, PA),
- ram_ack_index = RAI1,
- ack_in_counter = AckInCount + 1}.
+ State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ ack_in_counter = AckInCount}) ->
+ {RPA1, DPA1} =
+ case Msg of
+ undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)};
+ _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA}
+ end,
+ State #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1,
+ ack_in_counter = AckInCount + 1}.
+
+lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:lookup(SeqId, RPA) of
+ {value, V} -> V;
+ none -> gb_trees:get(SeqId, DPA)
+ end.
-remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }) ->
- {gb_trees:get(SeqId, PA),
- State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
- ram_ack_index = gb_sets:delete_any(SeqId, RAI) }}.
+remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:lookup(SeqId, RPA) of
+ {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
+ {V, State #vqstate { ram_pending_ack = RPA1 }};
+ none -> DPA1 = gb_trees:delete(SeqId, DPA),
+ {gb_trees:get(SeqId, DPA),
+ State #vqstate { disk_pending_ack = DPA1 }}
+ end.
purge_pending_ack(KeepPersistent,
- State = #vqstate { pending_ack = PA,
+ State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
+ F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
- rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) ->
- accumulate_ack(MsgStatus, Acc)
- end, accumulate_ack_init(), PA),
- State1 = State #vqstate { pending_ack = gb_trees:empty(),
- ram_ack_index = gb_sets:empty() },
+ rabbit_misc:gb_trees_fold(
+ F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA),
+ State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
+ disk_pending_ack = gb_trees:empty() },
+
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
error -> State1;
@@ -1495,12 +1514,9 @@ delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
- State = #vqstate {target_ram_count = infinity}) ->
- {false, State};
reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
State = #vqstate {
- ram_ack_index = RamAckIndex,
+ ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
rates = #rates { avg_ingress = AvgIngress,
@@ -1510,8 +1526,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_sets:size(RamAckIndex),
- TargetRamCount) of
+ case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
@@ -1536,22 +1551,23 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
limit_ram_acks(0, State) ->
{0, State};
-limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }) ->
- case gb_sets:is_empty(RAI) of
+limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:is_empty(RPA) of
true ->
{Quota, State};
false ->
- {SeqId, RAI1} = gb_sets:take_largest(RAI),
- MsgStatus = gb_trees:get(SeqId, PA),
+ {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
+ DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
limit_ram_acks(Quota - 1,
- State1 #vqstate { pending_ack = PA1,
- ram_ack_index = RAI1 })
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 })
end.
+reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
+ State;
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun push_betas_to_deltas/2,
@@ -1617,7 +1633,8 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
- pending_ack = PA,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1625,10 +1642,10 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
- {List, IndexState1} =
- rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
- {Q3a, IndexState2} =
- betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
+ RPA, DPA, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
case ?QUEUE:len(Q3a) of
0 ->