summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-12 14:26:43 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-12 14:26:43 +0100
commit1f72b4e9e82d3a1ecf053ae9e7274892d3989b83 (patch)
treec55294698a28c9e88b2e24223f797c2e8e9efe20
parent959a65ae92e47ba2fae416278fefb1cec330c61b (diff)
downloadrabbitmq-server-1f72b4e9e82d3a1ecf053ae9e7274892d3989b83.tar.gz
Track ready and unacked bytes separately.
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_variable_queue.erl72
2 files changed, 46 insertions, 31 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 595a05d3..098f5f43 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -20,8 +20,9 @@
-define(INFO_KEYS, [messages_ram, messages_ready_ram,
messages_unacknowledged_ram, messages_persistent,
- message_bytes, message_bytes_ram, message_bytes_persistent,
- backing_queue_status]).
+ message_bytes, message_bytes_ready,
+ message_bytes_unacknowledged, message_bytes_ram,
+ message_bytes_persistent, backing_queue_status]).
-ifdef(use_specs).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c52862ab..e97ed491 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -255,7 +255,8 @@
transient_threshold,
len, %% w/o unacked
- bytes, %% w unacked
+ bytes, %% w/o unacked
+ unacked_bytes,
persistent_count, %% w unacked
persistent_bytes, %% w unacked
@@ -347,6 +348,8 @@
len :: non_neg_integer(),
bytes :: non_neg_integer(),
+ unacked_bytes :: non_neg_integer(),
+
persistent_count :: non_neg_integer(),
persistent_bytes :: non_neg_integer(),
@@ -508,14 +511,13 @@ purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- bytes = Bytes,
ram_bytes = RamBytes,
persistent_count = PCount,
persistent_bytes = PBytes }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- Stats = {Bytes, RamBytes, PCount, PBytes},
+ Stats = {RamBytes, PCount, PBytes},
{Stats1, IndexState1} =
remove_queue_entries(Q4, Stats, IndexState, MSCState),
@@ -527,13 +529,13 @@ purge(State = #vqstate { q4 = Q4,
Stats1, State #vqstate { q4 = ?QUEUE:new(),
index_state = IndexState1 }),
- {{Bytes3, RamBytes3, PCount3, PBytes3}, IndexState3} =
+ {{RamBytes3, PCount3, PBytes3}, IndexState3} =
remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
- bytes = Bytes3,
+ bytes = 0,
ram_msg_count = 0,
ram_bytes = RamBytes3,
persistent_count = PCount3,
@@ -561,7 +563,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
State3 = upd_bytes(
- 1, MsgStatus1,
+ 1, 0, MsgStatus1,
inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount1,
@@ -585,7 +587,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = upd_bytes(1, MsgStatus,
+ State3 = upd_bytes(0, 1, MsgStatus,
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
@@ -843,8 +845,13 @@ info(messages_ram, State) ->
info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
PersistentCount;
-info(message_bytes, #vqstate{bytes = Bytes}) ->
+info(message_bytes, #vqstate{bytes = Bytes,
+ unacked_bytes = UBytes}) ->
+ Bytes + UBytes;
+info(message_bytes_ready, #vqstate{bytes = Bytes}) ->
Bytes;
+info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) ->
+ UBytes;
info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
RamBytes;
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
@@ -886,6 +893,7 @@ is_duplicate(_Msg, State) -> {false, State}.
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
bytes = Bytes,
+ unacked_bytes = UnackedBytes,
persistent_count = PersistentCount,
persistent_bytes = PersistentBytes,
ram_msg_count = RamMsgCount,
@@ -904,12 +912,13 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = Len >= 0,
true = Bytes >= 0,
+ true = UnackedBytes >= 0,
true = PersistentCount >= 0,
true = PersistentBytes >= 0,
true = RamMsgCount >= 0,
true = RamMsgCount =< Len,
true = RamBytes >= 0,
- true = RamBytes =< Bytes,
+ true = RamBytes =< Bytes + UnackedBytes,
State.
@@ -1100,6 +1109,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
ram_bytes = 0,
+ unacked_bytes = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rates(Now),
@@ -1159,17 +1169,22 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
State#vqstate{ram_msg_count = RamMsgCount + 1}.
-upd_bytes(Sign, MsgStatus = #msg_status{msg = undefined}, State) ->
- upd_bytes0(Sign, MsgStatus, State);
-upd_bytes(Sign, MsgStatus = #msg_status{msg = _}, State) ->
- upd_ram_bytes(Sign, MsgStatus, upd_bytes0(Sign, MsgStatus, State)).
+upd_bytes(SignReady, SignUnacked,
+ MsgStatus = #msg_status{msg = undefined}, State) ->
+ upd_bytes0(SignReady, SignUnacked, MsgStatus, State);
+upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) ->
+ upd_ram_bytes(SignReady + SignUnacked, MsgStatus,
+ upd_bytes0(SignReady, SignUnacked, MsgStatus, State)).
-upd_bytes0(Sign, MsgStatus = #msg_status{is_persistent = IsPersistent},
+upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP},
State = #vqstate{bytes = Bytes,
+ unacked_bytes = UBytes,
persistent_bytes = PBytes}) ->
- Diff = Sign * msg_size(MsgStatus),
- State#vqstate{bytes = Bytes + Diff,
- persistent_bytes = PBytes + one_if(IsPersistent) * Diff}.
+ S = msg_size(MsgStatus),
+ SignTotal = SignReady + SignUnacked,
+ State#vqstate{bytes = Bytes + SignReady * S,
+ unacked_bytes = UBytes + SignUnacked * S,
+ persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}.
upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
@@ -1218,8 +1233,8 @@ remove(AckRequired, MsgStatus = #msg_status {
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
State2 = case AckRequired of
- false -> upd_bytes(-1, MsgStatus, State1);
- true -> State1
+ false -> upd_bytes(-1, 0, MsgStatus, State1);
+ true -> upd_bytes(-1, 1, MsgStatus, State1)
end,
{AckTag, maybe_update_rates(
State2 #vqstate {ram_msg_count = RamMsgCount1,
@@ -1243,16 +1258,15 @@ purge_betas_and_deltas(Stats,
index_state = IndexState1 }))
end.
-remove_queue_entries(Q, {Bytes, RamBytes, PCount, PBytes},
+remove_queue_entries(Q, {RamBytes, PCount, PBytes},
IndexState, MSCState) ->
- {MsgIdsByStore, Bytes1, RamBytes1, PBytes1, Delivers, Acks} =
+ {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), Bytes, RamBytes, PBytes, [], []}, Q),
+ {orddict:new(), RamBytes, PBytes, [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {{Bytes1,
- RamBytes1,
+ {{RamBytes1,
PCount - case orddict:find(true, MsgIdsByStore) of
error -> 0;
{ok, Ids} -> length(Ids)
@@ -1266,12 +1280,11 @@ remove_queue_entries1(
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
msg_props = #message_properties { size = Size } },
- {MsgIdsByStore, Bytes, RamBytes, PBytes, Delivers, Acks}) ->
+ {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) ->
{case MsgOnDisk of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
- Bytes - Size,
RamBytes - Size * one_if(Msg =/= undefined),
PBytes - Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
@@ -1353,7 +1366,7 @@ remove_pending_ack(true, SeqId, State) ->
{MsgStatus, State1 = #vqstate { persistent_count = PCount }} =
remove_pending_ack(false, SeqId, State),
PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
- {MsgStatus, upd_bytes(-1, MsgStatus,
+ {MsgStatus, upd_bytes(0, -1, MsgStatus,
State1 # vqstate{ persistent_count = PCount1 })};
remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA }) ->
@@ -1487,7 +1500,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, State2)
+ Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2))
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -1501,7 +1514,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
+ upd_bytes(1, -1, MsgStatus, State2)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2