diff options
authorMatthias Radestock <>2011-09-30 10:07:00 +0100
committerMatthias Radestock <>2011-09-30 10:07:00 +0100
commit4c8e0ba742304badde6a5a43cc75046e3691d174 (patch)
parent3c050f4c0cf27239e05ccf608695e9506147544a (diff)
always record pending acks in #msg_status form
#msg_status { msg = undefined } replaces the previous 4-tuple. This simplifies the API and gets rid of the obscure, ad-hoc 4-tuple.
1 files changed, 34 insertions, 53 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0c04bbd6..1a7166ca 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -158,19 +158,13 @@
%% to search through qi segments looking for messages that are yet to
%% be acknowledged.
-%% Pending acks are recorded in memory either as the tuple {SeqId,
-%% MsgId, MsgProps} (tuple-form) or as the message itself (message-
-%% form). Acks for persistent messages are always stored in the tuple-
-%% form. Acks for transient messages are also stored in tuple-form if
-%% the message has been sent to disk as part of the memory reduction
-%% process. For transient messages that haven't already been written
-%% to disk, acks are stored in message-form.
+%% Pending acks are recorded in memory by storing the message itself.
+%% If the message has been sent to disk, we do not store the message
+%% content. During memory reduction, pending acks containing message
+%% content have that content removed and the corresponding messages
+%% are pushed out to disk.
-%% During memory reduction, acks stored in message-form are converted
-%% to tuple-form, and the corresponding messages are pushed out to
-%% disk.
-%% The order in which alphas are pushed to betas and message-form acks
+%% The order in which alphas are pushed to betas and pending acks
%% are pushed to disk is determined dynamically. We always prefer to
%% push messages for the source (alphas or acks) that is growing the
%% fastest (with growth measured as avg. ingress - avg. egress). In
@@ -575,8 +569,8 @@ ack(AckTags, State) ->
ack_out_counter = AckOutCount }} =
fun (SeqId, {Acc, State2}) ->
- {AckEntry, State3} = remove_pending_ack(SeqId, State2),
- {accumulate_ack(SeqId, AckEntry, Acc), State3}
+ {MsgStatus, State3} = remove_pending_ack(SeqId, State2),
+ {accumulate_ack(MsgStatus, Acc), State3}
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
@@ -806,6 +800,8 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.
+trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
{Result, MSCStateP1} = Fun(MSCStateP),
{Result, {MSCStateP1, MSCStateT}};
@@ -1194,16 +1190,13 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { seq_id = SeqId,
msg_id = MsgId,
- is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk,
- msg_props = MsgProps } = MsgStatus,
+ msg_on_disk = MsgOnDisk } = MsgStatus,
State = #vqstate { pending_ack = PA,
ram_ack_index = RAI,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true -> {{IsPersistent, MsgId, MsgProps, IndexOnDisk}, RAI};
+ true -> {m(trim_msg_status(MsgStatus)), RAI};
false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
PA1 = dict:store(SeqId, AckEntry, PA),
@@ -1222,7 +1215,9 @@ purge_pending_ack(KeepPersistent,
index_state = IndexState,
msg_store_clients = MSCState }) ->
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
- dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
+ dict:fold(fun (_SeqId, MsgStatus, Acc) ->
+ accumulate_ack(MsgStatus, Acc)
+ end, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
@@ -1241,16 +1236,17 @@ purge_pending_ack(KeepPersistent,
accumulate_ack_init() -> {[], orddict:new(), []}.
-accumulate_ack(_SeqId, #msg_status { msg_id = MsgId,
- is_persistent = false, %% ASSERTIONS
- msg_on_disk = false,
- index_on_disk = false },
- {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
- {IndexOnDiskSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
-accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, IndexOnDisk},
+accumulate_ack(#msg_status { seq_id = SeqId,
+ msg_id = MsgId,
+ is_persistent = IsPersistent,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
+ case MsgOnDisk of
+ true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
+ false -> MsgIdsByStore
+ end,
[MsgId | AllMsgIds]}.
find_persistent_count(LensByStore) ->
@@ -1425,23 +1421,10 @@ delta_merge(SeqIds, #delta { start_seq_id = StartSeqId,
%% Mostly opposite of record_pending_ack/2
msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
- {AckEntry, State1} = remove_pending_ack(SeqId, State),
- #msg_status { msg_props = MsgProps1 } = MsgStatus1 =
- case AckEntry of
- {IsPersistent, MsgId, MsgProps, IndexOnDisk} ->
- m(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = true,
- msg_on_disk = true,
- index_on_disk = IndexOnDisk,
- msg_props = MsgProps });
- #msg_status{} = MsgStatus ->
- MsgStatus
- end,
- {MsgStatus1 #msg_status {
- msg_props = (MsgPropsFun(MsgProps1)) #message_properties {
+ {#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
+ remove_pending_ack(SeqId, State),
+ {MsgStatus #msg_status {
+ msg_props = (MsgPropsFun(MsgProps)) #message_properties {
needs_confirming = false } }, State1}.
beta_limit(BPQ) ->
@@ -1529,13 +1512,11 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
{Quota, State};
false ->
{SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status {
- msg_id = MsgId, %% ASSERTION
- is_persistent = false, %% ASSERTION
- msg_props = MsgProps,
- index_on_disk = IndexOnDisk } = dict:fetch(SeqId, PA),
- {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = dict:store(SeqId, {false, MsgId, MsgProps, IndexOnDisk}, PA),
+ MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
+ dict:fetch(SeqId, PA),
+ {MsgStatus1, State1} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
+ PA1 = dict:store(SeqId, m(trim_msg_status(MsgStatus1)), PA),
limit_ram_acks(Quota - 1,
State1 #vqstate { pending_ack = PA1,
ram_ack_index = RAI1 })
@@ -1724,7 +1705,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
State1 = #vqstate { ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(MsgStatus1 #msg_status { msg = undefined }),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
ram_index_count = RamIndexCount1 },