summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-02 16:58:55 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-02 16:58:55 +0000
commit87cc957f8b05a985dd3ee09f11e4d56ca684d126 (patch)
treec72de2ccebc7c501439cb8926f5ccd65b05c944f
parenta7de37ccf7ec5b75fab1f63bc0dc8feb186a86ba (diff)
downloadrabbitmq-server-bug25373.tar.gz
only retain ram msgs when inserting into pending_ackbug25373
which keeps memory use constant during fetch operations
-rw-r--r--src/rabbit_variable_queue.erl51
1 files changed, 24 insertions, 27 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9508b9c8..37ca6de0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -596,9 +596,8 @@ fetchwhile(Pred, Fun, Acc, State) ->
{undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
- true -> {MsgStatus1 = #msg_status { msg = Msg }, State2} =
- read_msg(MsgStatus, State1),
- {AckTag, State3} = remove(true, MsgStatus1, State2),
+ true -> {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {AckTag, State3} = remove(true, MsgStatus, State2),
fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
@@ -611,11 +610,9 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {MsgStatus1 = #msg_status { msg = Msg,
- is_delivered = IsDelivered }, State2} =
- read_msg(MsgStatus, State1),
- {AckTag, State3} = remove(AckRequired, MsgStatus1, State2),
- {{Msg, IsDelivered, AckTag}, a(State3)}
+ {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
end.
drop(AckRequired, State) ->
@@ -675,8 +672,8 @@ ackfold(MsgFun, Acc, State, AckTags) ->
{AccN, StateN} =
lists:foldl(
fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) ->
- {#msg_status { msg = Msg }, State1} =
- read_msg(gb_trees:get(SeqId, PA), false, State0),
+ MsgStatus = gb_trees:get(SeqId, PA),
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
{MsgFun(Msg, SeqId, Acc0), State1}
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
@@ -688,9 +685,9 @@ fold(Fun, Acc, #vqstate { q1 = Q1,
q3 = Q3,
q4 = Q4 } = State) ->
QFun = fun(MsgStatus, {Acc0, State0}) ->
- {#msg_status { msg = Msg, msg_props = MsgProps }, State1 } =
- read_msg(MsgStatus, false, State0),
- {StopGo, AccNext} = Fun(Msg, MsgProps, Acc0),
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {StopGo, AccNext} =
+ Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
{StopGo, {AccNext, State1}}
end,
{Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
@@ -1075,9 +1072,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
- false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, State),
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
+ false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
+ read_msg(MsgStatus, true, State),
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) }
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1093,20 +1091,18 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
-read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State).
-
-read_msg(MsgStatus = #msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
+read_msg(#msg_status { msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent },
CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam),
- msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, _CountDiskToRam, State) ->
- {MsgStatus, State}.
+ RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam),
+ {Msg, State #vqstate { ram_msg_count = RamMsgCount1,
+ msg_store_clients = MSCState1 }};
+read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) ->
+ {Msg, State}.
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
@@ -1375,7 +1371,8 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
+ {Msg, State1} = read_msg(MsgStatus, true, State),
+ {MsgStatus#msg_status { msg = Msg }, State1};
publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
{MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.