summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-12 10:18:28 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-12 10:18:28 +0000
commit0c84e4f85fa92a384646af16ec9087696c65c139 (patch)
treea44f39ecf56a189fef6732da04a92a0026bd452b
parente2393df71b70a19895cd27cde9ead044930c83fa (diff)
downloadrabbitmq-server-0c84e4f85fa92a384646af16ec9087696c65c139.tar.gz
unmodalise vq:read_msg
-rw-r--r--src/rabbit_variable_queue.erl54
1 files changed, 27 insertions, 27 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 90ee3439..285dfcd7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -527,7 +527,6 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- ram_msg_count = RamMsgCount,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
@@ -538,12 +537,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
end,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1,
- unconfirmed = UC1 })).
+ a(reduce_memory_use(
+ inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -596,7 +595,7 @@ fetchwhile(Pred, Fun, Acc, State) ->
{undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
- true -> {Msg, State2} = read_msg(MsgStatus, false, State1),
+ true -> {Msg, State2} = read_msg(MsgStatus, State1),
{AckTag, State3} = remove(true, MsgStatus, State2),
fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
@@ -610,7 +609,7 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {Msg, State2} = read_msg(MsgStatus, State1),
{AckTag, State3} = remove(AckRequired, MsgStatus, State2),
{{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
end.
@@ -672,7 +671,7 @@ ackfold(MsgFun, Acc, State, AckTags) ->
{AccN, StateN} =
lists:foldl(fun(SeqId, {Acc0, State0}) ->
MsgStatus = lookup_pending_ack(SeqId, State0),
- {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {Msg, State1} = read_msg(MsgStatus, State0),
{MsgFun(Msg, SeqId, Acc0), State1}
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
@@ -684,7 +683,7 @@ fold(Fun, Acc, #vqstate { q1 = Q1,
q3 = Q3,
q4 = Q4 } = State) ->
QFun = fun(MsgStatus, {Acc0, State0}) ->
- {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {Msg, State1} = read_msg(MsgStatus, State0),
{StopGo, AccNext} =
Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
{StopGo, {AccNext, State1}}
@@ -1078,9 +1077,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, true, State),
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) }
+ read_msg(MsgStatus, State),
+ inc_ram_msg_count(
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1096,19 +1096,19 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
-read_msg(#msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
- CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
+read_msg(#msg_status{msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent},
+ State = #vqstate{msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam),
- {Msg, State #vqstate { ram_msg_count = RamMsgCount1,
- msg_store_clients = MSCState1 }};
-read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) ->
+ {Msg, State #vqstate {msg_store_clients = MSCState1}};
+read_msg(#msg_status{msg = Msg}, State) ->
{Msg, State}.
+inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
+ State#vqstate{ram_msg_count = RamMsgCount + 1}.
+
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
@@ -1390,10 +1390,10 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- {Msg, State1} = read_msg(MsgStatus, true, State),
- {MsgStatus#msg_status { msg = Msg }, State1};
-publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
- {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
+ {Msg, State1} = read_msg(MsgStatus, State),
+ {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)};
+publish_alpha(MsgStatus, State) ->
+ {MsgStatus, inc_ram_msg_count(State)}.
publish_beta(MsgStatus, State) ->
{#msg_status { msg = Msg} = MsgStatus1,