summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-07-29 17:16:16 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-07-29 17:16:16 +0100
commit5a43c2153aed157eeeed5abd7405b0d017815b8a (patch)
tree741763ae4389baae08564cdd7f3026da7e401fcf
parentb35fadf7a75c0f92aea7f12fdd9535ac3d1fbb5e (diff)
downloadrabbitmq-server-5a43c2153aed157eeeed5abd7405b0d017815b8a.tar.gz
Rethink how we do this. Previously we tried to manage bytes exactly the same as len, and ram_msg_bytes the same as ram_msg_count. The trouble is that means they only account for ready messages. It would be a bit of overkill to start tracking bytes used for unacked messages separately, so instead we now only track bytes for all messages.
-rw-r--r--src/rabbit_variable_queue.erl250
1 files changed, 128 insertions, 122 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 37bb4ff8..3f41f2cc 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -254,16 +254,16 @@
durable,
transient_threshold,
- len,
- bytes,
- persistent_count,
- persistent_bytes,
+ len, %% w/o unacked
+ bytes, %% w unacked
+ persistent_count, %% w unacked
+ persistent_bytes, %% w unacked
target_ram_count,
- ram_msg_count,
+ ram_msg_count, %% w/o unacked
ram_msg_count_prev,
ram_ack_count_prev,
- ram_msg_bytes,
+ ram_bytes, %% w unacked
out_counter,
in_counter,
rates,
@@ -506,45 +506,45 @@ purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
+ bytes = Bytes,
+ ram_bytes = RamBytes,
persistent_count = PCount,
persistent_bytes = PBytes }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- {PCount1, PBytes1, IndexState1} =
- remove_queue_entries(Q4, PCount, PBytes, IndexState, MSCState),
+ Stats = {Bytes, RamBytes, PCount, PBytes},
+ {Stats1, IndexState1} =
+ remove_queue_entries(Q4, Stats, IndexState, MSCState),
- {PCount2, PBytes2, State1 = #vqstate { q1 = Q1,
- index_state = IndexState2,
- msg_store_clients = MSCState1 }} =
+ {Stats2, State1 = #vqstate { q1 = Q1,
+ index_state = IndexState2,
+ msg_store_clients = MSCState1 }} =
purge_betas_and_deltas(
- PCount1, PBytes1,State #vqstate { q4 = ?QUEUE:new(),
- index_state = IndexState1 }),
+ Stats1, State #vqstate { q4 = ?QUEUE:new(),
+ index_state = IndexState1 }),
- {PCount3, PBytes3, IndexState3} =
- remove_queue_entries(Q1, PCount2, PBytes2, IndexState2, MSCState1),
+ {{Bytes3, RamBytes3, PCount3, PBytes3}, IndexState3} =
+ remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
- bytes = 0,
+ bytes = Bytes3,
ram_msg_count = 0,
- ram_msg_bytes = 0,
+ ram_bytes = RamBytes3,
persistent_count = PCount3,
persistent_bytes = PBytes3 })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming,
- size = Size},
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
- bytes = Bytes,
in_counter = InCount,
persistent_count = PCount,
- persistent_bytes = PBytes,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
@@ -556,28 +556,24 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
end,
InCount1 = InCount + 1,
PCount1 = PCount + one_if(IsPersistent1),
- PBytes1 = PBytes + Size * one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = upd_ram_counts(1, MsgStatus,
- State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- bytes = Bytes + Size,
- in_counter = InCount1,
- persistent_count = PCount1,
- persistent_bytes = PBytes1,
- unconfirmed = UC1 }),
+ State3 = upd_bytes(
+ 1, MsgStatus1,
+ inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 })),
a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
MsgProps = #message_properties {
- needs_confirming = NeedsConfirming,
- size = Size },
+ needs_confirming = NeedsConfirming },
_ChPid, State = #vqstate { next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
persistent_count = PCount,
- persistent_bytes = PBytes,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
@@ -585,14 +581,13 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- PBytes1 = PBytes + Size * one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- persistent_bytes = PBytes1,
- unconfirmed = UC1 },
+ State3 = upd_bytes(1, MsgStatus,
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }),
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
discard(_MsgId, _ChPid, State) -> State.
@@ -846,9 +841,9 @@ info(messages_ram, State) ->
info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
PersistentCount;
info(message_bytes, #vqstate{bytes = Bytes}) ->
- Bytes; %% TODO this is really message_bytes_ready
-info(message_bytes_ram, #vqstate{ram_msg_bytes = RamMsgBytes}) ->
- RamMsgBytes; %% TODO this is really message_bytes_ready_ram!
+ Bytes;
+info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
+ RamBytes;
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
PersistentBytes;
info(backing_queue_status, #vqstate {
@@ -890,7 +885,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
bytes = Bytes,
persistent_count = PersistentCount,
ram_msg_count = RamMsgCount,
- ram_msg_bytes = RamMsgBytes}) ->
+ ram_bytes = RamBytes}) ->
E1 = ?QUEUE:is_empty(Q1),
E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
@@ -908,8 +903,8 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = PersistentCount >= 0,
true = RamMsgCount >= 0,
true = RamMsgCount =< Len,
- true = RamMsgBytes >= 0,
- true = RamMsgBytes =< Bytes,
+ true = RamBytes >= 0,
+ true = RamBytes =< Bytes,
State.
@@ -1099,7 +1094,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
ram_msg_count = 0,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
- ram_msg_bytes = 0,
+ ram_bytes = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rates(Now),
@@ -1124,10 +1119,11 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- upd_ram_counts(
+ upd_ram_bytes(
1, MsgStatus,
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) })
+ inc_ram_msg_count(
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) }))
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1155,15 +1151,25 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
msg_store_read(MSCState, IsPersistent, MsgId),
{Msg, State #vqstate {msg_store_clients = MSCState1}}.
-upd_ram_counts(Sign, #msg_status{msg_props = #message_properties{size = Size}},
- State = #vqstate{ram_msg_count = RamMsgCount,
- ram_msg_bytes = RamMsgBytes}) ->
- State#vqstate{ram_msg_count = RamMsgCount + Sign,
- ram_msg_bytes = RamMsgBytes + Sign * Size}.
+inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
+ State#vqstate{ram_msg_count = RamMsgCount + 1}.
+
+upd_bytes(Sign, MsgStatus = #msg_status{msg = undefined}, State) ->
+ upd_bytes0(Sign, MsgStatus, State);
+upd_bytes(Sign, MsgStatus = #msg_status{msg = _}, State) ->
+ upd_ram_bytes(Sign, MsgStatus, upd_bytes0(Sign, MsgStatus, State)).
+
+upd_bytes0(Sign, MsgStatus = #msg_status{is_persistent = IsPersistent},
+ State = #vqstate{bytes = Bytes,
+ persistent_bytes = PBytes}) ->
+ Diff = Sign * msg_size(MsgStatus),
+ State#vqstate{bytes = Bytes + Diff,
+ persistent_bytes = PBytes + one_if(IsPersistent) * Diff}.
-inc_bytes(#msg_status{msg_props = #message_properties{size = Size}},
- State = #vqstate{bytes = Bytes}) ->
- State#vqstate{bytes = Bytes + Size}.
+upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
+ State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
+
+msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
@@ -1172,17 +1178,13 @@ remove(AckRequired, MsgStatus = #msg_status {
is_persistent = IsPersistent,
is_delivered = IsDelivered,
msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk,
- msg_props = #message_properties{ size = Size }},
+ index_on_disk = IndexOnDisk },
State = #vqstate {ram_msg_count = RamMsgCount,
- ram_msg_bytes = RamMsgBytes,
out_counter = OutCount,
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- bytes = Bytes,
- persistent_count = PCount,
- persistent_bytes = PBytes}) ->
+ persistent_count = PCount}) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1208,64 +1210,65 @@ remove(AckRequired, MsgStatus = #msg_status {
false -> {undefined, State}
end,
- MaybeOne = one_if(IsPersistent andalso not AckRequired),
- PCount1 = PCount - MaybeOne,
- PBytes1 = PBytes - Size * MaybeOne,
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- RamMsgBytes1 = RamMsgBytes - Size * one_if(Msg =/= undefined),
-
+ State2 = case AckRequired of
+ false -> upd_bytes(-1, MsgStatus, State1);
+ true -> State1
+ end,
{AckTag, maybe_update_rates(
- State1 #vqstate {ram_msg_count = RamMsgCount1,
- ram_msg_bytes = RamMsgBytes1,
+ State2 #vqstate {ram_msg_count = RamMsgCount1,
out_counter = OutCount + 1,
index_state = IndexState2,
len = Len - 1,
- bytes = Bytes - Size,
- persistent_count = PCount1,
- persistent_bytes = PBytes1})}.
+ persistent_count = PCount1})}.
-purge_betas_and_deltas(PCount, PBytes,
+purge_betas_and_deltas(Stats,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
case ?QUEUE:is_empty(Q3) of
- true -> {PCount, PBytes, State};
- false -> {PCount1, PBytes1, IndexState1} =
- remove_queue_entries(
- Q3, PCount, PBytes, IndexState, MSCState),
- purge_betas_and_deltas(PCount1, PBytes1,
+ true -> {Stats, State};
+ false -> {Stats1, IndexState1} = remove_queue_entries(
+ Q3, Stats, IndexState, MSCState),
+ purge_betas_and_deltas(Stats1,
maybe_deltas_to_betas(
State #vqstate {
q3 = ?QUEUE:new(),
index_state = IndexState1 }))
end.
-remove_queue_entries(Q, PCount, PBytes, IndexState, MSCState) ->
- {MsgIdsByStore, PBytes1, Delivers, Acks} =
+remove_queue_entries(Q, {Bytes, RamBytes, PCount, PBytes},
+ IndexState, MSCState) ->
+ {MsgIdsByStore, Bytes1, RamBytes1, PBytes1, Delivers, Acks} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), PBytes, [], []}, Q),
+ {orddict:new(), Bytes, RamBytes, PBytes, [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {PCount - case orddict:find(true, MsgIdsByStore) of
- error -> 0;
- {ok, Ids} -> length(Ids)
- end,
- PBytes1,
+ {{Bytes1,
+ RamBytes1,
+ PCount - case orddict:find(true, MsgIdsByStore) of
+ error -> 0;
+ {ok, Ids} -> length(Ids)
+ end,
+ PBytes1},
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId,
+ #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
msg_props = #message_properties { size = Size } },
- {MsgIdsByStore, PBytes, Delivers, Acks}) ->
+ {MsgIdsByStore, Bytes, RamBytes, PBytes, Delivers, Acks}) ->
{case MsgOnDisk of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
- PBytes - Size * one_if(IsPersistent),
+ Bytes - Size,
+ RamBytes - Size * one_if(Msg =/= undefined),
+ PBytes - Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
@@ -1341,16 +1344,11 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
end.
remove_pending_ack(SeqId, State) ->
- {MsgStatus, State1 = #vqstate { persistent_count = PCount,
- persistent_bytes = PBytes }} =
+ {MsgStatus, State1 = #vqstate { persistent_count = PCount }} =
remove_pending_ack0(SeqId, State),
- #msg_status { msg_props = #message_properties{ size = Size },
- is_persistent = IsPersistent } = MsgStatus,
- case IsPersistent of
- false -> {MsgStatus, State1};
- true -> {MsgStatus, State1#vqstate{ persistent_count = PCount - 1,
- persistent_bytes = PBytes - Size }}
- end.
+ PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
+ {MsgStatus, upd_bytes(-1, MsgStatus,
+ State1 # vqstate{ persistent_count = PCount1 })}.
remove_pending_ack0(SeqId, State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA }) ->
@@ -1450,9 +1448,13 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
- {MsgStatus#msg_status { msg = Msg }, upd_ram_counts(1, MsgStatus, State1)};
+ {MsgStatus#msg_status { msg = Msg },
+ upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1]
publish_alpha(MsgStatus, State) ->
- {MsgStatus, upd_ram_counts(1, MsgStatus, State)}.
+ {MsgStatus, inc_ram_msg_count(State)}.
+%% [1] We increase the ram_bytes here because we paged the message in
+%% to requeue it, not purely because we requeued it. Hence in the
+%% second head it's already accounted for as already in memory.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
@@ -1478,7 +1480,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, inc_bytes(MsgStatus1, State2))
+ Limit, PubFun, State2)
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -1492,8 +1494,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- inc_bytes(MsgStatus, State2)}
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
@@ -1651,8 +1652,10 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
maybe_write_to_disk(true, false, MsgStatus, State),
DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
limit_ram_acks(Quota - 1,
- State1 #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1 })
+ upd_ram_bytes(
+ -1, MsgStatus1,
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 }))
end.
permitted_beta_count(#vqstate { len = 0 }) ->
@@ -1779,21 +1782,24 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
{Quota, State};
push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case credit_flow:blocked() of
- true ->
- {Quota, State};
- false ->
- case Generator(Q) of
- {empty, _Q} ->
- {Quota, State};
- {{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true }, State1} =
- maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = Consumer(MsgStatus2, Qa,
- upd_ram_counts(-1, MsgStatus2, State1)),
- push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State2)
- end
+ true -> {Quota, State};
+ false -> case Generator(Q) of
+ {empty, _Q} ->
+ {Quota, State};
+ {{value, MsgStatus}, Qa} ->
+ {MsgStatus1 = #msg_status { msg_on_disk = true },
+ State1 = #vqstate { ram_msg_count = RamMsgCount }} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ State2 = Consumer(
+ MsgStatus2, Qa,
+ upd_ram_bytes(
+ -1, MsgStatus2,
+ State1 #vqstate {
+ ram_msg_count = RamMsgCount - 1})),
+ push_alphas_to_betas(Generator, Consumer, Quota - 1,
+ Qa, State2)
+ end
end.
push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,