diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-12 14:26:43 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-12 14:26:43 +0100 |
commit | 1f72b4e9e82d3a1ecf053ae9e7274892d3989b83 (patch) | |
tree | c55294698a28c9e88b2e24223f797c2e8e9efe20 | |
parent | 959a65ae92e47ba2fae416278fefb1cec330c61b (diff) | |
download | rabbitmq-server-1f72b4e9e82d3a1ecf053ae9e7274892d3989b83.tar.gz |
Track ready and unacked bytes separately.
-rw-r--r-- | src/rabbit_backing_queue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 72 |
2 files changed, 46 insertions, 31 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 595a05d3..098f5f43 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -20,8 +20,9 @@ -define(INFO_KEYS, [messages_ram, messages_ready_ram, messages_unacknowledged_ram, messages_persistent, - message_bytes, message_bytes_ram, message_bytes_persistent, - backing_queue_status]). + message_bytes, message_bytes_ready, + message_bytes_unacknowledged, message_bytes_ram, + message_bytes_persistent, backing_queue_status]). -ifdef(use_specs). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c52862ab..e97ed491 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -255,7 +255,8 @@ transient_threshold, len, %% w/o unacked - bytes, %% w unacked + bytes, %% w/o unacked + unacked_bytes, persistent_count, %% w unacked persistent_bytes, %% w unacked @@ -347,6 +348,8 @@ len :: non_neg_integer(), bytes :: non_neg_integer(), + unacked_bytes :: non_neg_integer(), + persistent_count :: non_neg_integer(), persistent_bytes :: non_neg_integer(), @@ -508,14 +511,13 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, len = Len, - bytes = Bytes, ram_bytes = RamBytes, persistent_count = PCount, persistent_bytes = PBytes }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - Stats = {Bytes, RamBytes, PCount, PBytes}, + Stats = {RamBytes, PCount, PBytes}, {Stats1, IndexState1} = remove_queue_entries(Q4, Stats, IndexState, MSCState), @@ -527,13 +529,13 @@ purge(State = #vqstate { q4 = Q4, Stats1, State #vqstate { q4 = ?QUEUE:new(), index_state = IndexState1 }), - {{Bytes3, RamBytes3, PCount3, PBytes3}, IndexState3} = + {{RamBytes3, PCount3, PBytes3}, IndexState3} = remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, - bytes = Bytes3, + bytes = 0, ram_msg_count = 0, ram_bytes = RamBytes3, persistent_count = PCount3, @@ -561,7 +563,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), State3 = upd_bytes( - 1, MsgStatus1, + 1, 0, MsgStatus1, inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount1, @@ -585,7 +587,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = upd_bytes(1, MsgStatus, + State3 = upd_bytes(0, 1, MsgStatus, State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, @@ -843,8 +845,13 @@ info(messages_ram, State) -> info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> PersistentCount; -info(message_bytes, #vqstate{bytes = Bytes}) -> +info(message_bytes, #vqstate{bytes = Bytes, + unacked_bytes = UBytes}) -> + Bytes + UBytes; +info(message_bytes_ready, #vqstate{bytes = Bytes}) -> Bytes; +info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) -> + UBytes; info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> @@ -886,6 +893,7 @@ is_duplicate(_Msg, State) -> {false, State}. a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, bytes = Bytes, + unacked_bytes = UnackedBytes, persistent_count = PersistentCount, persistent_bytes = PersistentBytes, ram_msg_count = RamMsgCount, @@ -904,12 +912,13 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = Len >= 0, true = Bytes >= 0, + true = UnackedBytes >= 0, true = PersistentCount >= 0, true = PersistentBytes >= 0, true = RamMsgCount >= 0, true = RamMsgCount =< Len, true = RamBytes >= 0, - true = RamBytes =< Bytes, + true = RamBytes =< Bytes + UnackedBytes, State. @@ -1100,6 +1109,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, ram_msg_count_prev = 0, ram_ack_count_prev = 0, ram_bytes = 0, + unacked_bytes = 0, out_counter = 0, in_counter = 0, rates = blank_rates(Now), @@ -1159,17 +1169,22 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> State#vqstate{ram_msg_count = RamMsgCount + 1}. -upd_bytes(Sign, MsgStatus = #msg_status{msg = undefined}, State) -> - upd_bytes0(Sign, MsgStatus, State); -upd_bytes(Sign, MsgStatus = #msg_status{msg = _}, State) -> - upd_ram_bytes(Sign, MsgStatus, upd_bytes0(Sign, MsgStatus, State)). +upd_bytes(SignReady, SignUnacked, + MsgStatus = #msg_status{msg = undefined}, State) -> + upd_bytes0(SignReady, SignUnacked, MsgStatus, State); +upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) -> + upd_ram_bytes(SignReady + SignUnacked, MsgStatus, + upd_bytes0(SignReady, SignUnacked, MsgStatus, State)). -upd_bytes0(Sign, MsgStatus = #msg_status{is_persistent = IsPersistent}, +upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP}, State = #vqstate{bytes = Bytes, + unacked_bytes = UBytes, persistent_bytes = PBytes}) -> - Diff = Sign * msg_size(MsgStatus), - State#vqstate{bytes = Bytes + Diff, - persistent_bytes = PBytes + one_if(IsPersistent) * Diff}. + S = msg_size(MsgStatus), + SignTotal = SignReady + SignUnacked, + State#vqstate{bytes = Bytes + SignReady * S, + unacked_bytes = UBytes + SignUnacked * S, + persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}. upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. @@ -1218,8 +1233,8 @@ remove(AckRequired, MsgStatus = #msg_status { PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), State2 = case AckRequired of - false -> upd_bytes(-1, MsgStatus, State1); - true -> State1 + false -> upd_bytes(-1, 0, MsgStatus, State1); + true -> upd_bytes(-1, 1, MsgStatus, State1) end, {AckTag, maybe_update_rates( State2 #vqstate {ram_msg_count = RamMsgCount1, @@ -1243,16 +1258,15 @@ purge_betas_and_deltas(Stats, index_state = IndexState1 })) end. -remove_queue_entries(Q, {Bytes, RamBytes, PCount, PBytes}, +remove_queue_entries(Q, {RamBytes, PCount, PBytes}, IndexState, MSCState) -> - {MsgIdsByStore, Bytes1, RamBytes1, PBytes1, Delivers, Acks} = + {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), Bytes, RamBytes, PBytes, [], []}, Q), + {orddict:new(), RamBytes, PBytes, [], []}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {{Bytes1, - RamBytes1, + {{RamBytes1, PCount - case orddict:find(true, MsgIdsByStore) of error -> 0; {ok, Ids} -> length(Ids) @@ -1266,12 +1280,11 @@ remove_queue_entries1( is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent, msg_props = #message_properties { size = Size } }, - {MsgIdsByStore, Bytes, RamBytes, PBytes, Delivers, Acks}) -> + {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) -> {case MsgOnDisk of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, - Bytes - Size, RamBytes - Size * one_if(Msg =/= undefined), PBytes - Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -1353,7 +1366,7 @@ remove_pending_ack(true, SeqId, State) -> {MsgStatus, State1 = #vqstate { persistent_count = PCount }} = remove_pending_ack(false, SeqId, State), PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), - {MsgStatus, upd_bytes(-1, MsgStatus, + {MsgStatus, upd_bytes(0, -1, MsgStatus, State1 # vqstate{ persistent_count = PCount1 })}; remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> @@ -1487,7 +1500,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) + Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2)) end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -1501,7 +1514,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], + upd_bytes(1, -1, MsgStatus, State2)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 |