diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-17 15:44:55 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-17 15:44:55 +0100 |
commit | 4e61d413e034b213472024c42873e34db6e1a22e (patch) | |
tree | 4133f12223926a6f1f16f80b93a7a542b1c77311 | |
parent | a8d81857f25962ff5af6ad1d14d1345c400a67a4 (diff) | |
download | rabbitmq-server-4e61d413e034b213472024c42873e34db6e1a22e.tar.gz |
Renaming variables. All tests still pass
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 54 | ||||
-rw-r--r-- | src/rabbit_disk_queue.erl | 100 | ||||
-rw-r--r-- | src/rabbit_mixed_queue.erl | 40 |
3 files changed, 95 insertions, 99 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 593746a7..6dbd95c2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -186,7 +186,7 @@ deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc0, case (IsMsgReady andalso rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of true -> - {{Msg, IsDelivered, AckTag}, FunAcc1, State2} = + {{Msg, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc0, State), ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Msg]), rabbit_channel:deliver( @@ -212,12 +212,12 @@ deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc0, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State3 = State2 #q { + State2 = State1 #q { active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers, next_msg_id = NextId + 1 }, - deliver_queue(Funs, FunAcc1, State3); + deliver_queue(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), @@ -241,41 +241,41 @@ deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, State = #q { mixed_state = MS }) -> - {{Msg, IsDelivered, AckTag, Remaining}, MS2} = + {{Msg, IsDelivered, AckTag, Remaining}, MS1} = rabbit_mixed_queue:deliver(MS), - AutoAcks2 = + AutoAcks1 = case AckRequired of true -> AutoAcks; false -> [AckTag | AutoAcks] end, - {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks2}, - State #q { mixed_state = MS2 }}. + {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1}, + State #q { mixed_state = MS1 }}. run_message_queue(State = #q { mixed_state = MS }) -> Funs = { fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3 }, IsEmpty = rabbit_mixed_queue:is_empty(MS), - {{_IsEmpty2, AutoAcks}, State2} = + {{_IsEmpty1, AutoAcks}, State1} = deliver_queue(Funs, {IsEmpty, []}, State), - {ok, MS2} = - rabbit_mixed_queue:ack(lists:reverse(AutoAcks), State2 #q.mixed_state), - State2 #q { mixed_state = MS2 }. + {ok, MS1} = + rabbit_mixed_queue:ack(lists:reverse(AutoAcks), State1 #q.mixed_state), + State1 #q { mixed_state = MS1 }. attempt_immediate_delivery(none, _ChPid, Msg, State) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = - fun (AckRequired, false, State2) -> - {AckTag, State3} = + fun (AckRequired, false, State1) -> + {AckTag, State2} = case AckRequired of true -> - {ok, AckTag2, MS} = + {ok, AckTag1, MS} = rabbit_mixed_queue:publish_delivered( - Msg, State2 #q.mixed_state), - {AckTag2, State2 #q { mixed_state = MS }}; + Msg, State1 #q.mixed_state), + {AckTag1, State1 #q { mixed_state = MS }}; false -> - {noack, State2} + {noack, State1} end, - {{Msg, false, AckTag}, true, State3} + {{Msg, false, AckTag}, true, State2} end, deliver_queue({ PredFun, DeliverFun }, false, State); attempt_immediate_delivery(Txn, ChPid, Msg, State) -> @@ -307,8 +307,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> NewState #q.mixed_state), case OutstandingMsgs of [] -> run_message_queue(NewState #q { mixed_state = MS }); - _ -> {ok, MS2} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS), - NewState #q { mixed_state = MS2 } + _ -> {ok, MS1} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS), + NewState #q { mixed_state = MS1 } end. deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> @@ -378,7 +378,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> deliver_or_requeue_n( [MsgWithAck || {_MsgId, MsgWithAck} <- dict:to_list(UAM)], - State1 # q { + State1 #q { exclusive_consumer = case Holder of {ChPid, _} -> none; Other -> Other @@ -576,8 +576,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, mixed_state = MS }) -> case rabbit_mixed_queue:deliver(MS) of - {empty, MS2} -> reply(empty, State #q { mixed_state = MS2 }); - {{Msg, IsDelivered, AckTag, Remaining}, MS2} -> + {empty, MS1} -> reply(empty, State #q { mixed_state = MS1 }); + {{Msg, IsDelivered, AckTag, Remaining}, MS1} -> AckRequired = not(NoAck), {ok, MS3} = case AckRequired of @@ -585,9 +585,9 @@ handle_call({basic_get, ChPid, NoAck}, _From, C = #cr{unacked_messages = UAM} = ch_record(ChPid), NewUAM = dict:store(NextId, {Msg, AckTag}, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - {ok, MS2}; + {ok, MS1}; false -> - rabbit_mixed_queue:ack([AckTag], MS2) + rabbit_mixed_queue:ack([AckTag], MS1) end, Message = {QName, self(), NextId, IsDelivered, Msg}, reply({ok, Remaining, Message}, @@ -790,11 +790,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end)); handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) -> - {ok, MS2} = (case Constrain of + {ok, MS1} = (case Constrain of true -> fun rabbit_mixed_queue:to_disk_only_mode/1; false -> fun rabbit_mixed_queue:to_mixed_mode/1 end)(MS), - noreply(State #q { mixed_state = MS2 }). + noreply(State #q { mixed_state = MS1 }). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index e82feb99..a33a4b28 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -654,7 +654,7 @@ get_read_handle(File, State = current_file_handle = CurHdl, current_dirty = IsDirty }) -> - IsDirty2 = if CurName =:= File andalso IsDirty -> + IsDirty1 = if CurName =:= File andalso IsDirty -> file:sync(CurHdl), false; true -> IsDirty @@ -680,10 +680,10 @@ get_read_handle(File, State = {ok, {Hdl, Then}} -> {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} end, - ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1), + ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1), ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), - {FileHdl, State #dqstate { read_file_handles = {ReadHdls3, ReadHdlsAge3}, - current_dirty = IsDirty2 + {FileHdl, State #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3}, + current_dirty = IsDirty1 }}. adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) -> @@ -784,10 +784,10 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, }) -> Files = lists:foldl( - fun ({MsgId, SeqId}, Files2) -> + fun ({MsgId, SeqId}, Files1) -> [{MsgId, RefCount, File, Offset, TotalSize}] = dets_ets_lookup(State, MsgId), - Files3 = + Files2 = case RefCount of 1 -> ok = dets_ets_delete(State, MsgId), @@ -800,28 +800,26 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, {File, (ValidTotalSize-TotalSize- ?FILE_PACKING_ADJUSTMENT), ContiguousTop1, Left, Right}), - if CurName =:= File -> Files2; - true -> sets:add_element(File, Files2) + if CurName =:= File -> Files1; + true -> sets:add_element(File, Files1) end; _ when 1 < RefCount -> ok = dets_ets_insert( State, {MsgId, RefCount - 1, File, Offset, TotalSize}), - Files2 + Files1 end, ok = case MnesiaDelete of - true -> - mnesia:dirty_delete(rabbit_disk_queue, - {Q, SeqId}); - txn -> - mnesia:delete(rabbit_disk_queue, - {Q, SeqId}, write); + true -> mnesia:dirty_delete(rabbit_disk_queue, + {Q, SeqId}); + txn -> mnesia:delete(rabbit_disk_queue, + {Q, SeqId}, write); _ -> ok end, - Files3 + Files2 end, sets:new(), MsgSeqIds), - State2 = compact(Files, State), - {ok, State2}. + State1 = compact(Files, State), + {ok, State1}. internal_tx_publish(MsgId, MsgBody, State = #dqstate { current_file_handle = CurHdl, @@ -870,12 +868,12 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, [{_, FirstSeqIdTo}|_] -> {InitReadSeqId, InitWriteSeqId, InitLength} = sequence_lookup(Sequences, Q), - InitReadSeqId2 = determine_next_read_id( + InitReadSeqId1 = determine_next_read_id( InitReadSeqId, InitWriteSeqId, FirstSeqIdTo), { zip_with_tail(PubMsgSeqIds, {last, {next, next}}), - InitWriteSeqId, InitReadSeqId2, InitLength} + InitWriteSeqId, InitReadSeqId1, InitLength} end, - {atomic, {Sync, WriteSeqId, State2}} = + {atomic, {Sync, WriteSeqId, State1}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), @@ -884,42 +882,42 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, %% it's been published, which is clearly %% nonsense. I.e. in commit, do not do things in an %% order which _could_not_ have happened. - {Sync2, WriteSeqId3} = + {Sync1, WriteSeqId1} = lists:foldl( fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, {Acc, ExpectedSeqId}) -> [{MsgId, _RefCount, File, _Offset, _TotalSize}] = dets_ets_lookup(State, MsgId), - SeqId2 = adjust_last_msg_seq_id( + SeqId1 = adjust_last_msg_seq_id( Q, ExpectedSeqId, SeqId, write), - NextSeqId2 = - find_next_seq_id(SeqId2, NextSeqId), + NextSeqId1 = + find_next_seq_id(SeqId1, NextSeqId), ok = mnesia:write( rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = - {Q, SeqId2}, + {Q, SeqId1}, msg_id = MsgId, is_delivered = false, - next_seq_id = NextSeqId2 + next_seq_id = NextSeqId1 }, write), - {Acc orelse (CurName =:= File), NextSeqId2} + {Acc orelse (CurName =:= File), NextSeqId1} end, {false, PubAcc}, PubList), - {ok, State3} = remove_messages(Q, AckSeqIds, txn, State), - {Sync2, WriteSeqId3, State3} + {ok, State2} = remove_messages(Q, AckSeqIds, txn, State), + {Sync1, WriteSeqId1, State2} end), true = case PubList of [] -> true; _ -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId, Length + erlang:length(PubList)}) end, - IsDirty2 = if IsDirty andalso Sync -> + IsDirty1 = if IsDirty andalso Sync -> ok = file:sync(CurHdl), false; true -> IsDirty end, - {ok, State2 #dqstate { current_dirty = IsDirty2 }}. + {ok, State1 #dqstate { current_dirty = IsDirty1 }}. %% SeqId can be 'next' internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) -> @@ -971,44 +969,44 @@ internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_], %% as they have no concept of sequence id anyway). {ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q), - ReadSeqId2 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo), + ReadSeqId1 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo), MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, {next, true}}}), - {atomic, {WriteSeqId2, Q}} = + {atomic, {WriteSeqId1, Q}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), lists:foldl(fun requeue_message/2, {WriteSeqId, Q}, MsgSeqIdsZipped) end), - true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId2, + true = ets:insert(Sequences, {Q, ReadSeqId1, WriteSeqId1, Length + erlang:length(MsgSeqIds)}), {ok, State}. requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}}, {_NextMsgSeqId, {NextSeqIdTo, _NextNewIsDelivered}}}, {ExpectedSeqIdTo, Q}) -> - SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write), - NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo), + SeqIdTo1 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write), + NextSeqIdTo1 = find_next_seq_id(SeqIdTo1, NextSeqIdTo), [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId, next_seq_id = NextSeqIdOrig }] = mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write), - if SeqIdTo2 == SeqIdOrig andalso NextSeqIdTo2 == NextSeqIdOrig -> ok; + if SeqIdTo1 == SeqIdOrig andalso NextSeqIdTo1 == NextSeqIdOrig -> ok; true -> ok = mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc {queue_and_seq_id = {Q, SeqIdTo2}, - next_seq_id = NextSeqIdTo2, + Obj #dq_msg_loc {queue_and_seq_id = {Q, SeqIdTo1}, + next_seq_id = NextSeqIdTo1, is_delivered = NewIsDelivered }, write), ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write) end, - {NextSeqIdTo2, Q}. + {NextSeqIdTo1, Q}. internal_purge(Q, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of [] -> {ok, 0, State}; [{Q, ReadSeqId, WriteSeqId, _Length}] -> - {atomic, {ok, State2}} = + {atomic, {ok, State1}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), @@ -1025,7 +1023,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> remove_messages(Q, MsgSeqIds, txn, State) end), true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId, 0}), - {ok, WriteSeqId - ReadSeqId, State2} + {ok, WriteSeqId - ReadSeqId, State1} end. internal_delete_queue(Q, State) -> @@ -1279,7 +1277,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop, copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, State) -> - {FinalOffset, BlockStart2, BlockEnd2} = + {FinalOffset, BlockStart1, BlockEnd1} = lists:foldl( fun ({MsgId, RefCount, _Source, Offset, TotalSize}, {CurOffset, BlockStart, BlockEnd}) -> @@ -1309,9 +1307,9 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, end end, {InitOffset, undefined, undefined}, WorkList), %% do the last remaining block - BSize2 = BlockEnd2 - BlockStart2, - {ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}), - {ok, BSize2} = file:copy(SourceHdl, DestinationHdl, BSize2), + BSize1 = BlockEnd1 - BlockStart1, + {ok, BlockStart1} = file:position(SourceHdl, {bof, BlockStart1}), + {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1), ok. close_file(File, State = #dqstate { read_file_handles = @@ -1366,7 +1364,7 @@ del_index() -> %% hmm, something weird must be going on, but it's probably %% not the end of the world {aborted, {no_exists, rabbit_disk_queue,_}} -> ok; - E2 -> E2 + E1 -> E1 end. load_from_disk(State) -> @@ -1449,11 +1447,11 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> lists:foreach( fun ({Q, ReadSeqId, WriteSeqId, _Length}) -> Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), - ReadSeqId2 = ReadSeqId + Gap, - Length = WriteSeqId - ReadSeqId2, + ReadSeqId1 = ReadSeqId + Gap, + Length = WriteSeqId - ReadSeqId1, true = ets:insert(Sequences, - {Q, ReadSeqId2, WriteSeqId, Length}) + {Q, ReadSeqId1, WriteSeqId, Length}) end, ets:match_object(Sequences, '_')) end). diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index a2e01bda..6caea55d 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -133,14 +133,14 @@ deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) -> #basic_message { guid = MsgId, is_persistent = IsPersistent } = bin_to_msg(MsgBin), OnDisk = IsPersistent andalso IsDurable, - {Acks2, Requeue2, Length2} = + {Acks1, Requeue1, Length1} = if OnDisk -> {Acks, [{AckTag, {next, IsDelivered}} | Requeue], Length + 1 }; true -> {[AckTag | Acks], Requeue, Length} end, - deliver_all_messages(Q, IsDurable, Acks2, Requeue2, Length2) + deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1) end. msg_to_bin(Msg = #basic_message { content = Content }) -> @@ -196,25 +196,25 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable, = rabbit_disk_queue:deliver(Q), #basic_message { guid = MsgId, is_persistent = IsPersistent } = Msg = bin_to_msg(MsgBin), - AckTag2 = if IsPersistent andalso IsDurable -> AckTag; + AckTag1 = if IsPersistent andalso IsDurable -> AckTag; true -> ok = rabbit_disk_queue:ack(Q, [AckTag]), noack end, - {{Msg, IsDelivered, AckTag2, Remaining}, + {{Msg, IsDelivered, AckTag1, Remaining}, State #mqstate { length = Length - 1}}; deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable, msg_buf = MsgBuf, length = Length }) -> {{value, {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, - IsDelivered, OnDisk}}, MsgBuf2} + IsDelivered, OnDisk}}, MsgBuf1} = queue:out(MsgBuf), AckTag = if OnDisk -> if IsPersistent andalso IsDurable -> - {MsgId, IsDelivered, AckTag2, _PersistRem} = + {MsgId, IsDelivered, AckTag1, _PersistRem} = rabbit_disk_queue:phantom_deliver(Q), - AckTag2; + AckTag1; true -> ok = rabbit_disk_queue:auto_ack_next_message(Q), noack @@ -223,7 +223,7 @@ deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable, end, Rem = Length - 1, {{Msg, IsDelivered, AckTag, Rem}, - State #mqstate { msg_buf = MsgBuf2, length = Rem }}. + State #mqstate { msg_buf = MsgBuf1, length = Rem }}. remove_noacks(Acks) -> lists:filter(fun (A) -> A /= noack end, Acks). @@ -264,17 +264,16 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable, length = Length }) -> - {PersistentPubs, MsgBuf2} = + {PersistentPubs, MsgBuf1} = lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent }, - {Acc, MsgBuf3}) -> + {Acc, MsgBuf2}) -> OnDisk = IsPersistent andalso IsDurable, - Acc2 = + Acc1 = if OnDisk -> [Msg #basic_message.guid | Acc]; true -> Acc end, - MsgBuf4 = queue:in({Msg, false, OnDisk}, MsgBuf3), - {Acc2, MsgBuf4} + {Acc1, queue:in({Msg, false, OnDisk}, MsgBuf2)} end, {[], MsgBuf}, Publishes), %% foldl reverses, so re-reverse PersistentPubs to match %% requirements of rabbit_disk_queue (ascending SeqIds) @@ -284,7 +283,7 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q, rabbit_disk_queue:tx_commit( Q, lists:reverse(PersistentPubs), RealAcks) end, - {ok, State #mqstate { msg_buf = MsgBuf2, + {ok, State #mqstate { msg_buf = MsgBuf1, length = Length + erlang:length(Publishes) }}. only_persistent_msg_ids(Pubs) -> @@ -325,7 +324,7 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q, true -> rabbit_disk_queue:requeue( Q, lists:reverse(RQ)) end, - _AckTag2 = rabbit_disk_queue:publish( + _AckTag1 = rabbit_disk_queue:publish( Q, MsgId, msg_to_bin(Msg), true), [] end, [], MessagesWithAckTags), @@ -336,22 +335,21 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable, length = Length }) -> - {PersistentPubs, MsgBuf2} = + {PersistentPubs, MsgBuf1} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, - {Acc, MsgBuf3}) -> + {Acc, MsgBuf2}) -> OnDisk = IsDurable andalso IsPersistent, - Acc2 = + Acc1 = if OnDisk -> [AckTag | Acc]; true -> Acc end, - MsgBuf4 = queue:in({Msg, true, OnDisk}, MsgBuf3), - {Acc2, MsgBuf4} + {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2)} end, {[], MsgBuf}, MessagesWithAckTags), ok = if [] == PersistentPubs -> ok; true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs)) end, - {ok, State #mqstate {msg_buf = MsgBuf2, + {ok, State #mqstate {msg_buf = MsgBuf1, length = Length + erlang:length(MessagesWithAckTags)}}. purge(State = #mqstate { queue = Q, mode = disk, length = Count }) -> |