summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-17 15:44:55 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-17 15:44:55 +0100
commit4e61d413e034b213472024c42873e34db6e1a22e (patch)
tree4133f12223926a6f1f16f80b93a7a542b1c77311
parenta8d81857f25962ff5af6ad1d14d1345c400a67a4 (diff)
downloadrabbitmq-server-4e61d413e034b213472024c42873e34db6e1a22e.tar.gz
Renaming variables. All tests still pass
-rw-r--r--src/rabbit_amqqueue_process.erl54
-rw-r--r--src/rabbit_disk_queue.erl100
-rw-r--r--src/rabbit_mixed_queue.erl40
3 files changed, 95 insertions, 99 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 593746a7..6dbd95c2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -186,7 +186,7 @@ deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc0,
case (IsMsgReady andalso
rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
true ->
- {{Msg, IsDelivered, AckTag}, FunAcc1, State2} =
+ {{Msg, IsDelivered, AckTag}, FunAcc1, State1} =
DeliverFun(AckRequired, FunAcc0, State),
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Msg]),
rabbit_channel:deliver(
@@ -212,12 +212,12 @@ deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc0,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State3 = State2 #q {
+ State2 = State1 #q {
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers,
next_msg_id = NextId + 1
},
- deliver_queue(Funs, FunAcc1, State3);
+ deliver_queue(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
@@ -241,41 +241,41 @@ deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) ->
not IsEmpty.
deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
State = #q { mixed_state = MS }) ->
- {{Msg, IsDelivered, AckTag, Remaining}, MS2} =
+ {{Msg, IsDelivered, AckTag, Remaining}, MS1} =
rabbit_mixed_queue:deliver(MS),
- AutoAcks2 =
+ AutoAcks1 =
case AckRequired of
true -> AutoAcks;
false -> [AckTag | AutoAcks]
end,
- {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks2},
- State #q { mixed_state = MS2 }}.
+ {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1},
+ State #q { mixed_state = MS1 }}.
run_message_queue(State = #q { mixed_state = MS }) ->
Funs = { fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3 },
IsEmpty = rabbit_mixed_queue:is_empty(MS),
- {{_IsEmpty2, AutoAcks}, State2} =
+ {{_IsEmpty1, AutoAcks}, State1} =
deliver_queue(Funs, {IsEmpty, []}, State),
- {ok, MS2} =
- rabbit_mixed_queue:ack(lists:reverse(AutoAcks), State2 #q.mixed_state),
- State2 #q { mixed_state = MS2 }.
+ {ok, MS1} =
+ rabbit_mixed_queue:ack(lists:reverse(AutoAcks), State1 #q.mixed_state),
+ State1 #q { mixed_state = MS1 }.
attempt_immediate_delivery(none, _ChPid, Msg, State) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
- fun (AckRequired, false, State2) ->
- {AckTag, State3} =
+ fun (AckRequired, false, State1) ->
+ {AckTag, State2} =
case AckRequired of
true ->
- {ok, AckTag2, MS} =
+ {ok, AckTag1, MS} =
rabbit_mixed_queue:publish_delivered(
- Msg, State2 #q.mixed_state),
- {AckTag2, State2 #q { mixed_state = MS }};
+ Msg, State1 #q.mixed_state),
+ {AckTag1, State1 #q { mixed_state = MS }};
false ->
- {noack, State2}
+ {noack, State1}
end,
- {{Msg, false, AckTag}, true, State3}
+ {{Msg, false, AckTag}, true, State2}
end,
deliver_queue({ PredFun, DeliverFun }, false, State);
attempt_immediate_delivery(Txn, ChPid, Msg, State) ->
@@ -307,8 +307,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
NewState #q.mixed_state),
case OutstandingMsgs of
[] -> run_message_queue(NewState #q { mixed_state = MS });
- _ -> {ok, MS2} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS),
- NewState #q { mixed_state = MS2 }
+ _ -> {ok, MS1} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS),
+ NewState #q { mixed_state = MS1 }
end.
deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
@@ -378,7 +378,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
deliver_or_requeue_n(
[MsgWithAck ||
{_MsgId, MsgWithAck} <- dict:to_list(UAM)],
- State1 # q {
+ State1 #q {
exclusive_consumer = case Holder of
{ChPid, _} -> none;
Other -> Other
@@ -576,8 +576,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
mixed_state = MS
}) ->
case rabbit_mixed_queue:deliver(MS) of
- {empty, MS2} -> reply(empty, State #q { mixed_state = MS2 });
- {{Msg, IsDelivered, AckTag, Remaining}, MS2} ->
+ {empty, MS1} -> reply(empty, State #q { mixed_state = MS1 });
+ {{Msg, IsDelivered, AckTag, Remaining}, MS1} ->
AckRequired = not(NoAck),
{ok, MS3} =
case AckRequired of
@@ -585,9 +585,9 @@ handle_call({basic_get, ChPid, NoAck}, _From,
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
NewUAM = dict:store(NextId, {Msg, AckTag}, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- {ok, MS2};
+ {ok, MS1};
false ->
- rabbit_mixed_queue:ack([AckTag], MS2)
+ rabbit_mixed_queue:ack([AckTag], MS1)
end,
Message = {QName, self(), NextId, IsDelivered, Msg},
reply({ok, Remaining, Message},
@@ -790,11 +790,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end));
handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) ->
- {ok, MS2} = (case Constrain of
+ {ok, MS1} = (case Constrain of
true -> fun rabbit_mixed_queue:to_disk_only_mode/1;
false -> fun rabbit_mixed_queue:to_mixed_mode/1
end)(MS),
- noreply(State #q { mixed_state = MS2 }).
+ noreply(State #q { mixed_state = MS1 }).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index e82feb99..a33a4b28 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -654,7 +654,7 @@ get_read_handle(File, State =
current_file_handle = CurHdl,
current_dirty = IsDirty
}) ->
- IsDirty2 = if CurName =:= File andalso IsDirty ->
+ IsDirty1 = if CurName =:= File andalso IsDirty ->
file:sync(CurHdl),
false;
true -> IsDirty
@@ -680,10 +680,10 @@ get_read_handle(File, State =
{ok, {Hdl, Then}} ->
{Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
end,
- ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1),
+ ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1),
ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
- {FileHdl, State #dqstate { read_file_handles = {ReadHdls3, ReadHdlsAge3},
- current_dirty = IsDirty2
+ {FileHdl, State #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3},
+ current_dirty = IsDirty1
}}.
adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) ->
@@ -784,10 +784,10 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
}) ->
Files =
lists:foldl(
- fun ({MsgId, SeqId}, Files2) ->
+ fun ({MsgId, SeqId}, Files1) ->
[{MsgId, RefCount, File, Offset, TotalSize}] =
dets_ets_lookup(State, MsgId),
- Files3 =
+ Files2 =
case RefCount of
1 ->
ok = dets_ets_delete(State, MsgId),
@@ -800,28 +800,26 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
{File, (ValidTotalSize-TotalSize-
?FILE_PACKING_ADJUSTMENT),
ContiguousTop1, Left, Right}),
- if CurName =:= File -> Files2;
- true -> sets:add_element(File, Files2)
+ if CurName =:= File -> Files1;
+ true -> sets:add_element(File, Files1)
end;
_ when 1 < RefCount ->
ok = dets_ets_insert(
State, {MsgId, RefCount - 1,
File, Offset, TotalSize}),
- Files2
+ Files1
end,
ok = case MnesiaDelete of
- true ->
- mnesia:dirty_delete(rabbit_disk_queue,
- {Q, SeqId});
- txn ->
- mnesia:delete(rabbit_disk_queue,
- {Q, SeqId}, write);
+ true -> mnesia:dirty_delete(rabbit_disk_queue,
+ {Q, SeqId});
+ txn -> mnesia:delete(rabbit_disk_queue,
+ {Q, SeqId}, write);
_ -> ok
end,
- Files3
+ Files2
end, sets:new(), MsgSeqIds),
- State2 = compact(Files, State),
- {ok, State2}.
+ State1 = compact(Files, State),
+ {ok, State1}.
internal_tx_publish(MsgId, MsgBody,
State = #dqstate { current_file_handle = CurHdl,
@@ -870,12 +868,12 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
[{_, FirstSeqIdTo}|_] ->
{InitReadSeqId, InitWriteSeqId, InitLength} =
sequence_lookup(Sequences, Q),
- InitReadSeqId2 = determine_next_read_id(
+ InitReadSeqId1 = determine_next_read_id(
InitReadSeqId, InitWriteSeqId, FirstSeqIdTo),
{ zip_with_tail(PubMsgSeqIds, {last, {next, next}}),
- InitWriteSeqId, InitReadSeqId2, InitLength}
+ InitWriteSeqId, InitReadSeqId1, InitLength}
end,
- {atomic, {Sync, WriteSeqId, State2}} =
+ {atomic, {Sync, WriteSeqId, State1}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
@@ -884,42 +882,42 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
%% it's been published, which is clearly
%% nonsense. I.e. in commit, do not do things in an
%% order which _could_not_ have happened.
- {Sync2, WriteSeqId3} =
+ {Sync1, WriteSeqId1} =
lists:foldl(
fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
{Acc, ExpectedSeqId}) ->
[{MsgId, _RefCount, File, _Offset,
_TotalSize}] = dets_ets_lookup(State, MsgId),
- SeqId2 = adjust_last_msg_seq_id(
+ SeqId1 = adjust_last_msg_seq_id(
Q, ExpectedSeqId, SeqId, write),
- NextSeqId2 =
- find_next_seq_id(SeqId2, NextSeqId),
+ NextSeqId1 =
+ find_next_seq_id(SeqId1, NextSeqId),
ok = mnesia:write(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id =
- {Q, SeqId2},
+ {Q, SeqId1},
msg_id = MsgId,
is_delivered = false,
- next_seq_id = NextSeqId2
+ next_seq_id = NextSeqId1
},
write),
- {Acc orelse (CurName =:= File), NextSeqId2}
+ {Acc orelse (CurName =:= File), NextSeqId1}
end, {false, PubAcc}, PubList),
- {ok, State3} = remove_messages(Q, AckSeqIds, txn, State),
- {Sync2, WriteSeqId3, State3}
+ {ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
+ {Sync1, WriteSeqId1, State2}
end),
true = case PubList of
[] -> true;
_ -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId,
Length + erlang:length(PubList)})
end,
- IsDirty2 = if IsDirty andalso Sync ->
+ IsDirty1 = if IsDirty andalso Sync ->
ok = file:sync(CurHdl),
false;
true -> IsDirty
end,
- {ok, State2 #dqstate { current_dirty = IsDirty2 }}.
+ {ok, State1 #dqstate { current_dirty = IsDirty1 }}.
%% SeqId can be 'next'
internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) ->
@@ -971,44 +969,44 @@ internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_],
%% as they have no concept of sequence id anyway).
{ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q),
- ReadSeqId2 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo),
+ ReadSeqId1 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo),
MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, {next, true}}}),
- {atomic, {WriteSeqId2, Q}} =
+ {atomic, {WriteSeqId1, Q}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
lists:foldl(fun requeue_message/2, {WriteSeqId, Q},
MsgSeqIdsZipped)
end),
- true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId2,
+ true = ets:insert(Sequences, {Q, ReadSeqId1, WriteSeqId1,
Length + erlang:length(MsgSeqIds)}),
{ok, State}.
requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}},
{_NextMsgSeqId, {NextSeqIdTo, _NextNewIsDelivered}}},
{ExpectedSeqIdTo, Q}) ->
- SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write),
- NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo),
+ SeqIdTo1 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write),
+ NextSeqIdTo1 = find_next_seq_id(SeqIdTo1, NextSeqIdTo),
[Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId,
next_seq_id = NextSeqIdOrig }] =
mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write),
- if SeqIdTo2 == SeqIdOrig andalso NextSeqIdTo2 == NextSeqIdOrig -> ok;
+ if SeqIdTo1 == SeqIdOrig andalso NextSeqIdTo1 == NextSeqIdOrig -> ok;
true ->
ok = mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc {queue_and_seq_id = {Q, SeqIdTo2},
- next_seq_id = NextSeqIdTo2,
+ Obj #dq_msg_loc {queue_and_seq_id = {Q, SeqIdTo1},
+ next_seq_id = NextSeqIdTo1,
is_delivered = NewIsDelivered
},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write)
end,
- {NextSeqIdTo2, Q}.
+ {NextSeqIdTo1, Q}.
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] -> {ok, 0, State};
[{Q, ReadSeqId, WriteSeqId, _Length}] ->
- {atomic, {ok, State2}} =
+ {atomic, {ok, State1}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
@@ -1025,7 +1023,7 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
remove_messages(Q, MsgSeqIds, txn, State)
end),
true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId, 0}),
- {ok, WriteSeqId - ReadSeqId, State2}
+ {ok, WriteSeqId - ReadSeqId, State1}
end.
internal_delete_queue(Q, State) ->
@@ -1279,7 +1277,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, State) ->
- {FinalOffset, BlockStart2, BlockEnd2} =
+ {FinalOffset, BlockStart1, BlockEnd1} =
lists:foldl(
fun ({MsgId, RefCount, _Source, Offset, TotalSize},
{CurOffset, BlockStart, BlockEnd}) ->
@@ -1309,9 +1307,9 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
end
end, {InitOffset, undefined, undefined}, WorkList),
%% do the last remaining block
- BSize2 = BlockEnd2 - BlockStart2,
- {ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}),
- {ok, BSize2} = file:copy(SourceHdl, DestinationHdl, BSize2),
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} = file:position(SourceHdl, {bof, BlockStart1}),
+ {ok, BSize1} = file:copy(SourceHdl, DestinationHdl, BSize1),
ok.
close_file(File, State = #dqstate { read_file_handles =
@@ -1366,7 +1364,7 @@ del_index() ->
%% hmm, something weird must be going on, but it's probably
%% not the end of the world
{aborted, {no_exists, rabbit_disk_queue,_}} -> ok;
- E2 -> E2
+ E1 -> E1
end.
load_from_disk(State) ->
@@ -1449,11 +1447,11 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
lists:foreach(
fun ({Q, ReadSeqId, WriteSeqId, _Length}) ->
Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
- ReadSeqId2 = ReadSeqId + Gap,
- Length = WriteSeqId - ReadSeqId2,
+ ReadSeqId1 = ReadSeqId + Gap,
+ Length = WriteSeqId - ReadSeqId1,
true =
ets:insert(Sequences,
- {Q, ReadSeqId2, WriteSeqId, Length})
+ {Q, ReadSeqId1, WriteSeqId, Length})
end, ets:match_object(Sequences, '_'))
end).
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index a2e01bda..6caea55d 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -133,14 +133,14 @@ deliver_all_messages(Q, IsDurable, Acks, Requeue, Length) ->
#basic_message { guid = MsgId, is_persistent = IsPersistent } =
bin_to_msg(MsgBin),
OnDisk = IsPersistent andalso IsDurable,
- {Acks2, Requeue2, Length2} =
+ {Acks1, Requeue1, Length1} =
if OnDisk -> {Acks,
[{AckTag, {next, IsDelivered}} | Requeue],
Length + 1
};
true -> {[AckTag | Acks], Requeue, Length}
end,
- deliver_all_messages(Q, IsDurable, Acks2, Requeue2, Length2)
+ deliver_all_messages(Q, IsDurable, Acks1, Requeue1, Length1)
end.
msg_to_bin(Msg = #basic_message { content = Content }) ->
@@ -196,25 +196,25 @@ deliver(State = #mqstate { mode = disk, queue = Q, is_durable = IsDurable,
= rabbit_disk_queue:deliver(Q),
#basic_message { guid = MsgId, is_persistent = IsPersistent } =
Msg = bin_to_msg(MsgBin),
- AckTag2 = if IsPersistent andalso IsDurable -> AckTag;
+ AckTag1 = if IsPersistent andalso IsDurable -> AckTag;
true -> ok = rabbit_disk_queue:ack(Q, [AckTag]),
noack
end,
- {{Msg, IsDelivered, AckTag2, Remaining},
+ {{Msg, IsDelivered, AckTag1, Remaining},
State #mqstate { length = Length - 1}};
deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length }) ->
{{value, {Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
- IsDelivered, OnDisk}}, MsgBuf2}
+ IsDelivered, OnDisk}}, MsgBuf1}
= queue:out(MsgBuf),
AckTag =
if OnDisk ->
if IsPersistent andalso IsDurable ->
- {MsgId, IsDelivered, AckTag2, _PersistRem} =
+ {MsgId, IsDelivered, AckTag1, _PersistRem} =
rabbit_disk_queue:phantom_deliver(Q),
- AckTag2;
+ AckTag1;
true ->
ok = rabbit_disk_queue:auto_ack_next_message(Q),
noack
@@ -223,7 +223,7 @@ deliver(State = #mqstate { mode = mixed, queue = Q, is_durable = IsDurable,
end,
Rem = Length - 1,
{{Msg, IsDelivered, AckTag, Rem},
- State #mqstate { msg_buf = MsgBuf2, length = Rem }}.
+ State #mqstate { msg_buf = MsgBuf1, length = Rem }}.
remove_noacks(Acks) ->
lists:filter(fun (A) -> A /= noack end, Acks).
@@ -264,17 +264,16 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
is_durable = IsDurable,
length = Length
}) ->
- {PersistentPubs, MsgBuf2} =
+ {PersistentPubs, MsgBuf1} =
lists:foldl(fun (Msg = #basic_message { is_persistent = IsPersistent },
- {Acc, MsgBuf3}) ->
+ {Acc, MsgBuf2}) ->
OnDisk = IsPersistent andalso IsDurable,
- Acc2 =
+ Acc1 =
if OnDisk ->
[Msg #basic_message.guid | Acc];
true -> Acc
end,
- MsgBuf4 = queue:in({Msg, false, OnDisk}, MsgBuf3),
- {Acc2, MsgBuf4}
+ {Acc1, queue:in({Msg, false, OnDisk}, MsgBuf2)}
end, {[], MsgBuf}, Publishes),
%% foldl reverses, so re-reverse PersistentPubs to match
%% requirements of rabbit_disk_queue (ascending SeqIds)
@@ -284,7 +283,7 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
rabbit_disk_queue:tx_commit(
Q, lists:reverse(PersistentPubs), RealAcks)
end,
- {ok, State #mqstate { msg_buf = MsgBuf2,
+ {ok, State #mqstate { msg_buf = MsgBuf1,
length = Length + erlang:length(Publishes) }}.
only_persistent_msg_ids(Pubs) ->
@@ -325,7 +324,7 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q,
true -> rabbit_disk_queue:requeue(
Q, lists:reverse(RQ))
end,
- _AckTag2 = rabbit_disk_queue:publish(
+ _AckTag1 = rabbit_disk_queue:publish(
Q, MsgId, msg_to_bin(Msg), true),
[]
end, [], MessagesWithAckTags),
@@ -336,22 +335,21 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
is_durable = IsDurable,
length = Length
}) ->
- {PersistentPubs, MsgBuf2} =
+ {PersistentPubs, MsgBuf1} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
- {Acc, MsgBuf3}) ->
+ {Acc, MsgBuf2}) ->
OnDisk = IsDurable andalso IsPersistent,
- Acc2 =
+ Acc1 =
if OnDisk -> [AckTag | Acc];
true -> Acc
end,
- MsgBuf4 = queue:in({Msg, true, OnDisk}, MsgBuf3),
- {Acc2, MsgBuf4}
+ {Acc1, queue:in({Msg, true, OnDisk}, MsgBuf2)}
end, {[], MsgBuf}, MessagesWithAckTags),
ok = if [] == PersistentPubs -> ok;
true -> rabbit_disk_queue:requeue(Q, lists:reverse(PersistentPubs))
end,
- {ok, State #mqstate {msg_buf = MsgBuf2,
+ {ok, State #mqstate {msg_buf = MsgBuf1,
length = Length + erlang:length(MessagesWithAckTags)}}.
purge(State = #mqstate { queue = Q, mode = disk, length = Count }) ->