diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-23 08:20:08 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-23 08:20:08 +0100 |
commit | e5e4c8942efecc455e2a72c5435bc6bc535a742e (patch) | |
tree | 64bd2d6fc20e73ed47ecb48f918b24ea3fb76df4 | |
parent | 2816ffa027a523a9ff2de052cb834a706e8068c5 (diff) | |
download | rabbitmq-server-e5e4c8942efecc455e2a72c5435bc6bc535a742e.tar.gz |
cosmetics and minor refactoring on code from recently merged bug23111
-rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 51 | ||||
-rw-r--r-- | src/rabbit_invariable_queue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 36 | ||||
-rw-r--r-- | src/rabbit_types.erl | 6 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 112 |
6 files changed, 95 insertions, 122 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index bc2b5ac9..9d78bafa 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -311,7 +311,7 @@ check_declare_arguments(QueueName, Args) -> "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- - [{<<"x-expires">>, fun check_expires_argument/1}, + [{<<"x-expires">>, fun check_expires_argument/1}, {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e9b70a8f..6048920e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -150,22 +150,6 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -init_queue_state(State) -> - lists:foldl(fun(F, S) -> F(S) end, State, - [fun init_expires/1, fun init_ttl/1]). - -init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> - case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of - {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); - undefined -> State - end. - -init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) -> - case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of - {_Type, TTL} -> drop_expired_messages(State#q{ttl = TTL}); - undefined -> State - end. - declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined, @@ -180,8 +164,7 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), - State1 = init_queue_state( - State#q{backing_queue_state = BQS}), + State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(StatsTimer, @@ -190,6 +173,19 @@ declare(Recover, From, Q1 -> {stop, normal, {existing, Q1}, State} end. +process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> + lists:foldl(fun({Arg, Fun}, State1) -> + case rabbit_misc:table_lookup(Arguments, Arg) of + {_Type, Val} -> Fun(Val, State1); + undefined -> State1 + end + end, State, [{<<"x-expires">>, fun init_expires/2}, + {<<"x-message-ttl">>, fun init_ttl/2}]). + +init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). + +init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -588,10 +584,8 @@ reset_msg_expiry_fun(TTL) -> message_properties(#q{ttl=TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL)}. -calculate_msg_expiry(undefined) -> - undefined; -calculate_msg_expiry(TTL) -> - now_millis() + (TTL * 1000). +calculate_msg_expiry(undefined) -> undefined; +calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; @@ -610,18 +604,15 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ttl_timer_ref = undefined}) when TTL =/= undefined -> case BQ:is_empty(BQS) of - true -> - State; - false -> - State#q{ttl_timer_ref = - timer:apply_after(TTL, rabbit_amqqueue, - drop_expired, [self()])} + true -> State; + false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired, + [self()]), + State#q{ttl_timer_ref = TRef} end; ensure_ttl_timer(State) -> State. -now_millis() -> - timer:now_diff(now(), {0,0,0}). +now_millis() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index ad01d8f7..9855f18c 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -122,12 +122,10 @@ dropwhile(_Pred, State = #iv_state { len = 0 }) -> dropwhile(Pred, State = #iv_state { queue = Q }) -> {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), case Pred(MsgProps) of - true -> - {_, State1} = - fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), - dropwhile(Pred, State1); - false -> - State + true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, + IsDelivered, State), + dropwhile(Pred, State1); + false -> State end. fetch(_AckRequired, State = #iv_state { len = 0 }) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f84dff83..1b837128 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -98,7 +98,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}), +%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}), %% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly %% necessary for most operations. However, for startup, and to ensure %% the safe and correct combination of journal entries with entries @@ -162,7 +162,7 @@ %% ---- misc ---- --define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent} +-define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent} -define(READ_MODE, [binary, raw, read]). -define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). @@ -259,8 +259,7 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, MsgProperties, IsPersistent, State) - when is_binary(Guid) -> +publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( @@ -269,9 +268,9 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State) false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(Guid, MsgProperties)]), + create_pub_record_body(Guid, MsgProps)]), maybe_flush_journal( - add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). + add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -464,9 +463,7 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, - {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, - Segment1) -> + fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -519,8 +516,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( - fun (_RelSeq, - {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, ok) -> gatherer:in(Gatherer, {Guid, 1}); (_RelSeq, _Value, Acc) -> @@ -678,8 +674,8 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> case read_pub_record_body(Hdl) of - {Guid, MsgProperties} -> - Publish = {Guid, MsgProperties, + {Guid, MsgProps} -> + Publish = {Guid, MsgProps, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false @@ -781,12 +777,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, MsgProperties, IsPersistent} -> + {Guid, MsgProps, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(Guid, MsgProperties)]) + create_pub_record_body(Guid, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -806,12 +802,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, - {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, - Acc) + fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProperties, + [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -841,8 +835,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> - {Guid, MsgProperties} = read_pub_record_body(Hdl), - Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack}, + {Guid, MsgProps} = read_pub_record_body(Hdl), + Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index d625bda1..91f2c4ca 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -37,13 +37,13 @@ -export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, - unencoded_content/0, encoded_content/0, vhost/0, ctag/0, - amqp_error/0, r/1, r2/2, r3/3, listener/0, + unencoded_content/0, encoded_content/0, message_properties/0, + vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, - connection_exit/0, message_properties/0]). + connection_exit/0]). -type(channel_exit() :: no_return()). -type(connection_exit() :: no_return()). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0b948c1b..1395c77f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -249,7 +249,7 @@ is_delivered, msg_on_disk, index_on_disk, - msg_properties + msg_props }). -record(delta, @@ -499,8 +499,8 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProperties, State) -> - {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State), +publish(Msg, MsgProps, State) -> + {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> @@ -532,7 +532,7 @@ dropwhile(Pred, State) -> dropwhile1(Pred, State) -> internal_queue_out( - fun(MsgStatus = #msg_status { msg_properties = MsgProps }, State1) -> + fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> case Pred(MsgProps) of true -> {_, State2} = internal_fetch(false, MsgStatus, State1), @@ -634,14 +634,13 @@ ack(AckTags, State) -> fun (_AckEntry, State1) -> State1 end, AckTags, State)). -tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, - MsgProperties, +tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProperties} | Pubs] }), + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), a(case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg, MsgProperties), + true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), {#msg_status { msg_on_disk = true }, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState), State #vqstate { msg_store_clients = MSCState1 }; @@ -683,18 +682,16 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg, - msg_properties = MsgProperties }, State1) -> - {_SeqId, State2} = - publish(Msg, MsgPropsFun(MsgProperties), true, - false, State1), + fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> + {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), + true, false, State1), State2; - ({IsPersistent, Guid, MsgProperties}, State1) -> + ({IsPersistent, Guid, MsgProps}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties), + {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps), true, true, State2), State3 end, @@ -844,11 +841,11 @@ cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, - MsgProperties) -> + MsgProps) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, msg_on_disk = false, index_on_disk = false, - msg_properties = MsgProperties }. + msg_props = MsgProps }. find_msg_store(true) -> ?PERSISTENT_MSG_STORE; find_msg_store(false) -> ?TRANSIENT_MSG_STORE. @@ -883,26 +880,26 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). persistent_guids(Pubs) -> - [Guid || {#basic_message { guid = Guid, is_persistent = true }, _MsgProps} - <- Pubs]. + [Guid || {#basic_message { guid = Guid, + is_persistent = true }, _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({Guid, SeqId, MsgProperties, IsPersistent, IsDelivered}, + fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered}, {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; - false -> {[m(#msg_status { msg = undefined, - guid = Guid, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true, - msg_properties = MsgProperties + false -> {[m(#msg_status { msg = undefined, + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true, + msg_props = MsgProps }) | Filtered1], Delivers1, Acks1} @@ -978,9 +975,10 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, case IsDurable of true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of - #msg_status {} -> false; - {IsPersistent, - _Guid, _MsgProps} -> IsPersistent + #msg_status {} -> + false; + {IsPersistent, _Guid, _MsgProps} -> + IsPersistent end]; false -> [] end, @@ -1011,21 +1009,19 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - Pubs = lists:foldl( - fun({Fun, PubsN}, OuterAcc) -> - lists:foldl( - fun({Msg, MsgProps}, Acc) -> - [{Msg, Fun(MsgProps)} | Acc] - end, OuterAcc, PubsN) - end, [], SPubs), + Pubs = lists:foldl(fun({Fun, PubsN}, OuterAcc) -> + lists:foldl(fun({Msg, MsgProps}, Acc) -> + [{Msg, Fun(MsgProps)} | Acc] + end, OuterAcc, PubsN) + end, [], SPubs), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, - MsgProperties}, + MsgProps}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, {SeqId, State3} = - publish(Msg, MsgProperties, false, IsPersistent1, State2), + publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -1082,7 +1078,7 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> %%---------------------------------------------------------------------------- publish(Msg = #basic_message { is_persistent = IsPersistent }, - MsgProperties, IsDelivered, MsgOnDisk, + MsgProps, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1091,7 +1087,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, durable = IsDurable, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of @@ -1131,19 +1127,15 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_properties = MsgProperties}, - IndexState) + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_props = MsgProps}, IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexState1 = rabbit_queue_index:publish(Guid, - SeqId, - MsgProperties, - IsPersistent, - IndexState), + IndexState1 = rabbit_queue_index:publish( + Guid, SeqId, MsgProps, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> @@ -1163,13 +1155,13 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, +record_pending_ack(#msg_status { seq_id = SeqId, + guid = Guid, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, - msg_properties = MsgProperties } = MsgStatus, - PA) -> + msg_on_disk = MsgOnDisk, + msg_props = MsgProps } = MsgStatus, PA) -> AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid, MsgProperties}; + true -> {IsPersistent, Guid, MsgProps}; false -> MsgStatus end, dict:store(SeqId, AckEntry, PA). @@ -1220,9 +1212,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, - {IsPersistent, Guid, _MsgProperties}, - {SeqIdsAcc, Dict}) -> +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. |