path: root/src/rabbit_variable_queue.erl
diff options
Diffstat (limited to 'src/rabbit_variable_queue.erl')
1 files changed, 303 insertions, 223 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1acc9ef0..f7c6c729 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,12 +16,13 @@
--export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- depth/1, set_ram_duration_target/2, ram_duration/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
+ publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4,
+ fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
+ is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -254,16 +255,13 @@
- pending_ack,
- pending_ack_index,
- ram_ack_index,
+ ram_pending_ack,
+ disk_pending_ack,
- async_callback,
@@ -348,16 +346,14 @@
q3 :: ?QUEUE:?QUEUE(),
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
- pending_ack :: gb_tree(),
- ram_ack_index :: gb_tree(),
+ ram_pending_ack :: gb_tree(),
+ disk_pending_ack :: gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
durable :: boolean(),
transient_threshold :: non_neg_integer(),
- async_callback :: rabbit_backing_queue:async_callback(),
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [], AsyncCallback,
+ init(IsDurable, IndexState, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
rabbit_msg_store:contains(MsgId, PersistentClient)
- init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
+ init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
terminate(_Reason, State) ->
@@ -519,18 +515,19 @@ purge(State = #vqstate { q4 = Q4,
ram_msg_count = 0,
persistent_count = PCount1 })}.
+purge_acks(State) -> a(purge_pending_ack(false, State)).
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
+ IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case ?QUEUE:is_empty(Q3) of
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
@@ -538,12 +535,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1,
- unconfirmed = UC1 })).
+ a(reduce_memory_use(
+ inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -557,8 +554,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
- #msg_status { is_delivered = true },
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
@@ -579,27 +575,28 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
-dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+dropwhile(Pred, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {undefined, a(State1)};
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> {_, State2} = remove(false, MsgStatus, State1),
+ dropwhile(Pred, State2);
+ false -> {MsgProps, a(in_r(MsgStatus, State1))}
+ end
+ end.
-dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
- (Next, S) -> {Next, undefined, S}
- end,
+fetchwhile(Pred, Fun, Acc, State) ->
case queue_out(State) of
{empty, State1} ->
- End(undefined, a(State1));
+ {undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), AckRequired} of
- {true, true} ->
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _, AckTag, _}, State3} =
- internal_fetch(true, MsgStatus1, State2),
- dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
- {true, false} ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, AckRequired, State2, undefined);
- {false, _} ->
- End(MsgProps, a(in_r(MsgStatus, State1)))
+ case Pred(MsgProps) of
+ true -> {Msg, State2} = read_msg(MsgStatus, State1),
+ {AckTag, State3} = remove(true, MsgStatus, State2),
+ fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
+ false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
@@ -610,9 +607,18 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
- {Res, a(State3)}
+ {Msg, State2} = read_msg(MsgStatus, State1),
+ {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
+ end.
+drop(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
+ {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
ack([], State) ->
@@ -638,16 +644,6 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-fold(undefined, State, _AckTags) ->
- State;
-fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- a(lists:foldl(fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), false, State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags)).
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
q4 = Q4,
@@ -669,12 +665,28 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+ackfold(MsgFun, Acc, State, AckTags) ->
+ {AccN, StateN} =
+ lists:foldl(fun(SeqId, {Acc0, State0}) ->
+ MsgStatus = lookup_pending_ack(SeqId, State0),
+ {Msg, State1} = read_msg(MsgStatus, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
+ {AccN, a(StateN)}.
+fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
+ {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
+ [msg_iterator(State),
+ disk_ack_iterator(State),
+ ram_ack_iterator(State)]),
+ ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-depth(State = #vqstate { pending_ack = Ack }) ->
- len(State) + gb_trees:size(Ack).
+depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
+ len(State) + gb_trees:size(RPA) + gb_trees:size(DPA).
DurationTarget, State = #vqstate {
@@ -711,7 +723,7 @@ ram_duration(State = #vqstate {
ack_out_counter = AckOutCount,
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
- ram_ack_index = RamAckIndex,
+ ram_pending_ack = RPA,
ram_ack_count_prev = RamAckCountPrev }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
@@ -722,7 +734,7 @@ ram_duration(State = #vqstate {
{AvgAckIngressRate, AckIngress1} =
update_rate(Now, AckTimestamp, AckInCount, AckIngress),
- RamAckCount = gb_trees:size(RamAckIndex),
+ RamAckCount = gb_trees:size(RPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
@@ -754,21 +766,20 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State = #vqstate { index_state = IndexState }) ->
- case must_sync_index(State) of
- true -> timed;
- false ->
- case rabbit_queue_index:needs_sync(IndexState) of
- true -> idle;
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end
- end
+needs_timeout(State = #vqstate { index_state = IndexState,
+ target_ram_count = TargetRamCount }) ->
+ case rabbit_queue_index:needs_sync(IndexState) of
+ confirms -> timed;
+ other -> idle;
+ false when TargetRamCount == infinity -> false;
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
timeout(State = #vqstate { index_state = IndexState }) ->
@@ -782,8 +793,8 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
status(#vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
- pending_ack = PA,
- ram_ack_index = RAI,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
@@ -798,10 +809,10 @@ status(#vqstate {
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
{len , Len},
- {pending_acks , gb_trees:size(PA)},
+ {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
+ {ram_ack_count , gb_trees:size(RPA)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -862,16 +873,28 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a msg_id to the unconfirmed set
-gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps = #message_properties { delivered = Delivered }) ->
- %% TODO would it make sense to remove #msg_status.is_delivered?
- #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = Delivered,
- msg_on_disk = false, index_on_disk = false,
- msg_props = MsgProps }.
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
+msg_status(IsPersistent, IsDelivered, SeqId,
+ Msg = #basic_message {id = MsgId}, MsgProps) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = false,
+ index_on_disk = false,
+ msg_props = MsgProps}.
+beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps}.
trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
@@ -935,31 +958,21 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
{Filtered, Delivers, Acks} =
- fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
+ fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
{Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> case gb_trees:is_defined(SeqId, PA) of
- false ->
- {?QUEUE:in_r(
- m(#msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }), Filtered1),
- Delivers1, Acks1};
- true ->
- Acc
+ false -> case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA)) of
+ false -> {?QUEUE:in_r(m(beta_msg_status(M)),
+ Filtered1),
+ Delivers1, Acks1};
+ true -> Acc
end, {?QUEUE:new(), [], []}, List),
@@ -987,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
-init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
+init(IsDurable, IndexState, DeltaCount, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
@@ -1006,15 +1019,13 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q3 = ?QUEUE:new(),
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
- pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty(),
+ ram_pending_ack = gb_trees:empty(),
+ disk_pending_ack = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
transient_threshold = NextSeqId,
- async_callback = AsyncCallback,
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1045,9 +1056,11 @@ in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
- false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
+ false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
+ inc_ram_msg_count(
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) })
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1063,35 +1076,35 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
-read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State).
+read_msg(#msg_status{msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent}, State) ->
+ read_msg(MsgId, IsPersistent, State);
+read_msg(#msg_status{msg = Msg}, State) ->
+ {Msg, State}.
-read_msg(MsgStatus = #msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
- CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
+read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam),
- msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, _CountDiskToRam, State) ->
- {MsgStatus, State}.
-internal_fetch(AckRequired, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = Msg,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount }) ->
+ {Msg, State #vqstate {msg_store_clients = MSCState1}}.
+inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
+ State#vqstate{ram_msg_count = RamMsgCount + 1}.
+remove(AckRequired, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount}) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1102,12 +1115,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- _ -> IndexState1
- end,
+ IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ _ -> IndexState1
+ end,
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
@@ -1118,16 +1130,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
false -> {undefined, State}
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {{Msg, IsDelivered, AckTag, Len1},
- State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1 }}.
+ {AckTag, State1 #vqstate {ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len - 1,
+ persistent_count = PCount1}}.
State = #vqstate { q3 = Q3,
@@ -1224,37 +1234,48 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%% Internal gubbins for acks
-record_pending_ack(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
- msg_on_disk = MsgOnDisk } = MsgStatus,
- State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI,
- ack_in_counter = AckInCount}) ->
- {AckEntry, RAI1} =
- case MsgOnDisk of
- true -> {m(trim_msg_status(MsgStatus)), RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
+record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
+ State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ ack_in_counter = AckInCount}) ->
+ {RPA1, DPA1} =
+ case Msg of
+ undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)};
+ _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA}
- State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
- ram_ack_index = RAI1,
- ack_in_counter = AckInCount + 1}.
+ State #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1,
+ ack_in_counter = AckInCount + 1}.
+lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:lookup(SeqId, RPA) of
+ {value, V} -> V;
+ none -> gb_trees:get(SeqId, DPA)
+ end.
-remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }) ->
- {gb_trees:get(SeqId, PA),
- State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
- ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:lookup(SeqId, RPA) of
+ {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
+ {V, State #vqstate { ram_pending_ack = RPA1 }};
+ none -> DPA1 = gb_trees:delete(SeqId, DPA),
+ {gb_trees:get(SeqId, DPA),
+ State #vqstate { disk_pending_ack = DPA1 }}
+ end.
- State = #vqstate { pending_ack = PA,
+ State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
+ F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
- rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) ->
- accumulate_ack(MsgStatus, Acc)
- end, accumulate_ack_init(), PA),
- State1 = State #vqstate { pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty() },
+ rabbit_misc:gb_trees_fold(
+ F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA),
+ State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
+ disk_pending_ack = gb_trees:empty() },
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
error -> State1;
@@ -1304,21 +1325,6 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
confirmed = gb_sets:union(C, MsgIdSet) }.
-must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- %% If UC is empty then by definition, MIOD and MOD are also empty
- %% and there's nothing that can be pending a sync.
- %% If UC is not empty, then we want to find is_empty(UC - MIOD),
- %% but the subtraction can be expensive. Thus instead, we test to
- %% see if UC is a subset of MIOD. This can only be the case if
- %% MIOD == UC, which would indicate that every message in UC is
- %% also in MIOD and is thus _all_ pending on a msg_store sync, not
- %% on a qi sync. Thus the negation of this is sufficient. Because
- %% is_subset is short circuiting, this is more efficient than the
- %% subtraction.
- not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
@@ -1351,9 +1357,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
-publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
- {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
+ {Msg, State1} = read_msg(MsgStatus, State),
+ {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)};
+publish_alpha(MsgStatus, State) ->
+ {MsgStatus, inc_ram_msg_count(State)}.
publish_beta(MsgStatus, State) ->
{#msg_status { msg = Msg} = MsgStatus1,
@@ -1417,6 +1424,82 @@ delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+%% Iterator
+ram_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
+disk_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+msg_iterator(State) -> istate(start, State).
+istate(start, State) -> {q4, State#vqstate.q4, State};
+istate(q4, State) -> {q3, State#vqstate.q3, State};
+istate(q3, State) -> {delta,, State};
+istate(delta, State) -> {q2, State#vqstate.q2, State};
+istate(q2, State) -> {q1, State#vqstate.q1, State};
+istate(q1, _State) -> done.
+next({ack, It}, IndexState) ->
+ case gb_trees:next(It) of
+ none -> {empty, IndexState};
+ {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
+ {value, MsgStatus, true, Next, IndexState}
+ end;
+next(done, IndexState) -> {empty, IndexState};
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqId}, State}, IndexState) ->
+ next(istate(delta, State), IndexState);
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
+ SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
+ SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
+next({delta, Delta, [], State}, IndexState) ->
+ next({delta, Delta, State}, IndexState);
+next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
+ case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
+ gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
+ false -> Next = {delta, Delta, Rest, State},
+ {value, beta_msg_status(M), false, Next, IndexState};
+ true -> next({delta, Delta, Rest, State}, IndexState)
+ end;
+next({Key, Q, State}, IndexState) ->
+ case ?QUEUE:out(Q) of
+ {empty, _Q} -> next(istate(Key, State), IndexState);
+ {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
+ {value, MsgStatus, false, Next, IndexState}
+ end.
+inext(It, {Its, IndexState}) ->
+ case next(It, IndexState) of
+ {empty, IndexState1} ->
+ {Its, IndexState1};
+ {value, MsgStatus1, Unacked, It1, IndexState1} ->
+ {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
+ end.
+ifold(_Fun, Acc, [], State) ->
+ {Acc, State};
+ifold(Fun, Acc, Its, State) ->
+ [{MsgStatus, Unacked, It} | Rest] =
+ lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
+ {#msg_status{seq_id = SeqId2}, _, _}) ->
+ SeqId1 =< SeqId2
+ end, Its),
+ {Msg, State1} = read_msg(MsgStatus, State),
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
+ {stop, Acc1} ->
+ {Acc1, State};
+ {cont, Acc1} ->
+ {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}),
+ ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1})
+ end.
%% Phase changes
@@ -1439,12 +1522,9 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
- State = #vqstate {target_ram_count = infinity}) ->
- {false, State};
reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
State = #vqstate {
- ram_ack_index = RamAckIndex,
+ ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
rates = #rates { avg_ingress = AvgIngress,
@@ -1454,8 +1534,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
- TargetRamCount) of
+ case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
@@ -1480,23 +1559,23 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
limit_ram_acks(0, State) ->
{0, State};
-limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }) ->
- case gb_trees:is_empty(RAI) of
+limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:is_empty(RPA) of
true ->
{Quota, State};
false ->
- {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
- gb_trees:get(SeqId, PA),
+ {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
+ DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
limit_ram_acks(Quota - 1,
- State1 #vqstate { pending_ack = PA1,
- ram_ack_index = RAI1 })
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 })
+reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
+ State;
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun push_betas_to_deltas/2,
@@ -1562,7 +1641,8 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
- pending_ack = PA,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1570,10 +1650,10 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
- {List, IndexState1} =
- rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
- {Q3a, IndexState2} =
- betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
+ RPA, DPA, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
case ?QUEUE:len(Q3a) of
0 ->