diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-30 10:07:00 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-30 10:07:00 +0100 |
commit | 4c8e0ba742304badde6a5a43cc75046e3691d174 (patch) | |
tree | 9eaacf936bc99c0aad81a16a3afe92b189532eff | |
parent | 3c050f4c0cf27239e05ccf608695e9506147544a (diff) | |
download | rabbitmq-server-4c8e0ba742304badde6a5a43cc75046e3691d174.tar.gz |
always record pending acks in #msg_status form
#msg_status { msg = undefined } replaces the previous 4-tuple.
This simplifies the API and gets rid of the obscure, ad-hoc 4-tuple.
-rw-r--r-- | src/rabbit_variable_queue.erl | 87 |
1 files changed, 34 insertions, 53 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0c04bbd6..1a7166ca 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -158,19 +158,13 @@ %% to search through qi segments looking for messages that are yet to %% be acknowledged. %% -%% Pending acks are recorded in memory either as the tuple {SeqId, -%% MsgId, MsgProps} (tuple-form) or as the message itself (message- -%% form). Acks for persistent messages are always stored in the tuple- -%% form. Acks for transient messages are also stored in tuple-form if -%% the message has been sent to disk as part of the memory reduction -%% process. For transient messages that haven't already been written -%% to disk, acks are stored in message-form. +%% Pending acks are recorded in memory by storing the message itself. +%% If the message has been sent to disk, we do not store the message +%% content. During memory reduction, pending acks containing message +%% content have that content removed and the corresponding messages +%% are pushed out to disk. %% -%% During memory reduction, acks stored in message-form are converted -%% to tuple-form, and the corresponding messages are pushed out to -%% disk. -%% -%% The order in which alphas are pushed to betas and message-form acks +%% The order in which alphas are pushed to betas and pending acks %% are pushed to disk is determined dynamically. We always prefer to %% push messages for the source (alphas or acks) that is growing the %% fastest (with growth measured as avg. ingress - avg. egress). In @@ -575,8 +569,8 @@ ack(AckTags, State) -> ack_out_counter = AckOutCount }} = lists:foldl( fun (SeqId, {Acc, State2}) -> - {AckEntry, State3} = remove_pending_ack(SeqId, State2), - {accumulate_ack(SeqId, AckEntry, Acc), State3} + {MsgStatus, State3} = remove_pending_ack(SeqId, State2), + {accumulate_ack(MsgStatus, Acc), State3} end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) @@ -806,6 +800,8 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, msg_on_disk = false, index_on_disk = false, msg_props = MsgProps }. +trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }. + with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> {Result, MSCStateP1} = Fun(MSCStateP), {Result, {MSCStateP1, MSCStateT}}; @@ -1194,16 +1190,13 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { seq_id = SeqId, msg_id = MsgId, - is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk, - msg_props = MsgProps } = MsgStatus, + msg_on_disk = MsgOnDisk } = MsgStatus, State = #vqstate { pending_ack = PA, ram_ack_index = RAI, ack_in_counter = AckInCount}) -> {AckEntry, RAI1} = case MsgOnDisk of - true -> {{IsPersistent, MsgId, MsgProps, IndexOnDisk}, RAI}; + true -> {m(trim_msg_status(MsgStatus)), RAI}; false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} end, PA1 = dict:store(SeqId, AckEntry, PA), @@ -1222,7 +1215,9 @@ purge_pending_ack(KeepPersistent, index_state = IndexState, msg_store_clients = MSCState }) -> {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = - dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), + dict:fold(fun (_SeqId, MsgStatus, Acc) -> + accumulate_ack(MsgStatus, Acc) + end, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, case KeepPersistent of @@ -1241,16 +1236,17 @@ purge_pending_ack(KeepPersistent, accumulate_ack_init() -> {[], orddict:new(), []}. -accumulate_ack(_SeqId, #msg_status { msg_id = MsgId, - is_persistent = false, %% ASSERTIONS - msg_on_disk = false, - index_on_disk = false }, - {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> - {IndexOnDiskSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; -accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, IndexOnDisk}, +accumulate_ack(#msg_status { seq_id = SeqId, + msg_id = MsgId, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), + case MsgOnDisk of + true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); + false -> MsgIdsByStore + end, [MsgId | AllMsgIds]}. find_persistent_count(LensByStore) -> @@ -1425,23 +1421,10 @@ delta_merge(SeqIds, #delta { start_seq_id = StartSeqId, %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, MsgPropsFun, State) -> - {AckEntry, State1} = remove_pending_ack(SeqId, State), - #msg_status { msg_props = MsgProps1 } = MsgStatus1 = - case AckEntry of - {IsPersistent, MsgId, MsgProps, IndexOnDisk} -> - m(#msg_status { seq_id = SeqId, - msg_id = MsgId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = true, - msg_on_disk = true, - index_on_disk = IndexOnDisk, - msg_props = MsgProps }); - #msg_status{} = MsgStatus -> - MsgStatus - end, - {MsgStatus1 #msg_status { - msg_props = (MsgPropsFun(MsgProps1)) #message_properties { + {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = + remove_pending_ack(SeqId, State), + {MsgStatus #msg_status { + msg_props = (MsgPropsFun(MsgProps)) #message_properties { needs_confirming = false } }, State1}. beta_limit(BPQ) -> @@ -1529,13 +1512,11 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, {Quota, State}; false -> {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), - MsgStatus = #msg_status { - msg_id = MsgId, %% ASSERTION - is_persistent = false, %% ASSERTION - msg_props = MsgProps, - index_on_disk = IndexOnDisk } = dict:fetch(SeqId, PA), - {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - PA1 = dict:store(SeqId, {false, MsgId, MsgProps, IndexOnDisk}, PA), + MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} = + dict:fetch(SeqId, PA), + {MsgStatus1, State1} = + maybe_write_to_disk(true, false, MsgStatus, State), + PA1 = dict:store(SeqId, m(trim_msg_status(MsgStatus1)), PA), limit_ram_acks(Quota - 1, State1 #vqstate { pending_ack = PA1, ram_ack_index = RAI1 }) @@ -1724,7 +1705,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> State1 = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }} = maybe_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(MsgStatus1 #msg_status { msg = undefined }), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, ram_index_count = RamIndexCount1 }, |