diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-07-29 17:16:16 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-07-29 17:16:16 +0100 |
commit | 5a43c2153aed157eeeed5abd7405b0d017815b8a (patch) | |
tree | 741763ae4389baae08564cdd7f3026da7e401fcf | |
parent | b35fadf7a75c0f92aea7f12fdd9535ac3d1fbb5e (diff) | |
download | rabbitmq-server-5a43c2153aed157eeeed5abd7405b0d017815b8a.tar.gz |
Rethink how we do this. Previously we tried to manage bytes exactly the same as len, and ram_msg_bytes the same as ram_msg_count. The trouble is that means they only account for ready messages. It would be a bit of overkill to start tracking bytes used for unacked messages separately, so instead we now only track bytes for all messages.
-rw-r--r-- | src/rabbit_variable_queue.erl | 250 |
1 files changed, 128 insertions, 122 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 37bb4ff8..3f41f2cc 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -254,16 +254,16 @@ durable, transient_threshold, - len, - bytes, - persistent_count, - persistent_bytes, + len, %% w/o unacked + bytes, %% w unacked + persistent_count, %% w unacked + persistent_bytes, %% w unacked target_ram_count, - ram_msg_count, + ram_msg_count, %% w/o unacked ram_msg_count_prev, ram_ack_count_prev, - ram_msg_bytes, + ram_bytes, %% w unacked out_counter, in_counter, rates, @@ -506,45 +506,45 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, len = Len, + bytes = Bytes, + ram_bytes = RamBytes, persistent_count = PCount, persistent_bytes = PBytes }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - {PCount1, PBytes1, IndexState1} = - remove_queue_entries(Q4, PCount, PBytes, IndexState, MSCState), + Stats = {Bytes, RamBytes, PCount, PBytes}, + {Stats1, IndexState1} = + remove_queue_entries(Q4, Stats, IndexState, MSCState), - {PCount2, PBytes2, State1 = #vqstate { q1 = Q1, - index_state = IndexState2, - msg_store_clients = MSCState1 }} = + {Stats2, State1 = #vqstate { q1 = Q1, + index_state = IndexState2, + msg_store_clients = MSCState1 }} = purge_betas_and_deltas( - PCount1, PBytes1,State #vqstate { q4 = ?QUEUE:new(), - index_state = IndexState1 }), + Stats1, State #vqstate { q4 = ?QUEUE:new(), + index_state = IndexState1 }), - {PCount3, PBytes3, IndexState3} = - remove_queue_entries(Q1, PCount2, PBytes2, IndexState2, MSCState1), + {{Bytes3, RamBytes3, PCount3, PBytes3}, IndexState3} = + remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, - bytes = 0, + bytes = Bytes3, ram_msg_count = 0, - ram_msg_bytes = 0, + ram_bytes = RamBytes3, persistent_count = PCount3, persistent_bytes = PBytes3 })}. purge_acks(State) -> a(purge_pending_ack(false, State)). publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming, - size = Size}, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, - bytes = Bytes, in_counter = InCount, persistent_count = PCount, - persistent_bytes = PBytes, durable = IsDurable, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, @@ -556,28 +556,24 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, end, InCount1 = InCount + 1, PCount1 = PCount + one_if(IsPersistent1), - PBytes1 = PBytes + Size * one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = upd_ram_counts(1, MsgStatus, - State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - bytes = Bytes + Size, - in_counter = InCount1, - persistent_count = PCount1, - persistent_bytes = PBytes1, - unconfirmed = UC1 }), + State3 = upd_bytes( + 1, MsgStatus1, + inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount1, + persistent_count = PCount1, + unconfirmed = UC1 })), a(reduce_memory_use(maybe_update_rates(State3))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { - needs_confirming = NeedsConfirming, - size = Size }, + needs_confirming = NeedsConfirming }, _ChPid, State = #vqstate { next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, persistent_count = PCount, - persistent_bytes = PBytes, durable = IsDurable, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, @@ -585,14 +581,13 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - PBytes1 = PBytes + Size * one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - persistent_bytes = PBytes1, - unconfirmed = UC1 }, + State3 = upd_bytes(1, MsgStatus, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = UC1 }), {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. discard(_MsgId, _ChPid, State) -> State. @@ -846,9 +841,9 @@ info(messages_ram, State) -> info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> PersistentCount; info(message_bytes, #vqstate{bytes = Bytes}) -> - Bytes; %% TODO this is really message_bytes_ready -info(message_bytes_ram, #vqstate{ram_msg_bytes = RamMsgBytes}) -> - RamMsgBytes; %% TODO this is really message_bytes_ready_ram! + Bytes; +info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> + RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; info(backing_queue_status, #vqstate { @@ -890,7 +885,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, bytes = Bytes, persistent_count = PersistentCount, ram_msg_count = RamMsgCount, - ram_msg_bytes = RamMsgBytes}) -> + ram_bytes = RamBytes}) -> E1 = ?QUEUE:is_empty(Q1), E2 = ?QUEUE:is_empty(Q2), ED = Delta#delta.count == 0, @@ -908,8 +903,8 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = PersistentCount >= 0, true = RamMsgCount >= 0, true = RamMsgCount =< Len, - true = RamMsgBytes >= 0, - true = RamMsgBytes =< Bytes, + true = RamBytes >= 0, + true = RamBytes =< Bytes, State. @@ -1099,7 +1094,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, ram_msg_count = 0, ram_msg_count_prev = 0, ram_ack_count_prev = 0, - ram_msg_bytes = 0, + ram_bytes = 0, out_counter = 0, in_counter = 0, rates = blank_rates(Now), @@ -1124,10 +1119,11 @@ in_r(MsgStatus = #msg_status { msg = undefined }, true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - upd_ram_counts( + upd_ram_bytes( 1, MsgStatus, - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) }) + inc_ram_msg_count( + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { + msg = Msg }, Q4a) })) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1155,15 +1151,25 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> msg_store_read(MSCState, IsPersistent, MsgId), {Msg, State #vqstate {msg_store_clients = MSCState1}}. -upd_ram_counts(Sign, #msg_status{msg_props = #message_properties{size = Size}}, - State = #vqstate{ram_msg_count = RamMsgCount, - ram_msg_bytes = RamMsgBytes}) -> - State#vqstate{ram_msg_count = RamMsgCount + Sign, - ram_msg_bytes = RamMsgBytes + Sign * Size}. +inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> + State#vqstate{ram_msg_count = RamMsgCount + 1}. + +upd_bytes(Sign, MsgStatus = #msg_status{msg = undefined}, State) -> + upd_bytes0(Sign, MsgStatus, State); +upd_bytes(Sign, MsgStatus = #msg_status{msg = _}, State) -> + upd_ram_bytes(Sign, MsgStatus, upd_bytes0(Sign, MsgStatus, State)). + +upd_bytes0(Sign, MsgStatus = #msg_status{is_persistent = IsPersistent}, + State = #vqstate{bytes = Bytes, + persistent_bytes = PBytes}) -> + Diff = Sign * msg_size(MsgStatus), + State#vqstate{bytes = Bytes + Diff, + persistent_bytes = PBytes + one_if(IsPersistent) * Diff}. -inc_bytes(#msg_status{msg_props = #message_properties{size = Size}}, - State = #vqstate{bytes = Bytes}) -> - State#vqstate{bytes = Bytes + Size}. +upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> + State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. + +msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, @@ -1172,17 +1178,13 @@ remove(AckRequired, MsgStatus = #msg_status { is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk, - msg_props = #message_properties{ size = Size }}, + index_on_disk = IndexOnDisk }, State = #vqstate {ram_msg_count = RamMsgCount, - ram_msg_bytes = RamMsgBytes, out_counter = OutCount, index_state = IndexState, msg_store_clients = MSCState, len = Len, - bytes = Bytes, - persistent_count = PCount, - persistent_bytes = PBytes}) -> + persistent_count = PCount}) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -1208,64 +1210,65 @@ remove(AckRequired, MsgStatus = #msg_status { false -> {undefined, State} end, - MaybeOne = one_if(IsPersistent andalso not AckRequired), - PCount1 = PCount - MaybeOne, - PBytes1 = PBytes - Size * MaybeOne, + PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - RamMsgBytes1 = RamMsgBytes - Size * one_if(Msg =/= undefined), - + State2 = case AckRequired of + false -> upd_bytes(-1, MsgStatus, State1); + true -> State1 + end, {AckTag, maybe_update_rates( - State1 #vqstate {ram_msg_count = RamMsgCount1, - ram_msg_bytes = RamMsgBytes1, + State2 #vqstate {ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, index_state = IndexState2, len = Len - 1, - bytes = Bytes - Size, - persistent_count = PCount1, - persistent_bytes = PBytes1})}. + persistent_count = PCount1})}. -purge_betas_and_deltas(PCount, PBytes, +purge_betas_and_deltas(Stats, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> case ?QUEUE:is_empty(Q3) of - true -> {PCount, PBytes, State}; - false -> {PCount1, PBytes1, IndexState1} = - remove_queue_entries( - Q3, PCount, PBytes, IndexState, MSCState), - purge_betas_and_deltas(PCount1, PBytes1, + true -> {Stats, State}; + false -> {Stats1, IndexState1} = remove_queue_entries( + Q3, Stats, IndexState, MSCState), + purge_betas_and_deltas(Stats1, maybe_deltas_to_betas( State #vqstate { q3 = ?QUEUE:new(), index_state = IndexState1 })) end. -remove_queue_entries(Q, PCount, PBytes, IndexState, MSCState) -> - {MsgIdsByStore, PBytes1, Delivers, Acks} = +remove_queue_entries(Q, {Bytes, RamBytes, PCount, PBytes}, + IndexState, MSCState) -> + {MsgIdsByStore, Bytes1, RamBytes1, PBytes1, Delivers, Acks} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), PBytes, [], []}, Q), + {orddict:new(), Bytes, RamBytes, PBytes, [], []}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {PCount - case orddict:find(true, MsgIdsByStore) of - error -> 0; - {ok, Ids} -> length(Ids) - end, - PBytes1, + {{Bytes1, + RamBytes1, + PCount - case orddict:find(true, MsgIdsByStore) of + error -> 0; + {ok, Ids} -> length(Ids) + end, + PBytes1}, rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, + #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent, msg_props = #message_properties { size = Size } }, - {MsgIdsByStore, PBytes, Delivers, Acks}) -> + {MsgIdsByStore, Bytes, RamBytes, PBytes, Delivers, Acks}) -> {case MsgOnDisk of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, - PBytes - Size * one_if(IsPersistent), + Bytes - Size, + RamBytes - Size * one_if(Msg =/= undefined), + PBytes - Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. @@ -1341,16 +1344,11 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, end. remove_pending_ack(SeqId, State) -> - {MsgStatus, State1 = #vqstate { persistent_count = PCount, - persistent_bytes = PBytes }} = + {MsgStatus, State1 = #vqstate { persistent_count = PCount }} = remove_pending_ack0(SeqId, State), - #msg_status { msg_props = #message_properties{ size = Size }, - is_persistent = IsPersistent } = MsgStatus, - case IsPersistent of - false -> {MsgStatus, State1}; - true -> {MsgStatus, State1#vqstate{ persistent_count = PCount - 1, - persistent_bytes = PBytes - Size }} - end. + PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), + {MsgStatus, upd_bytes(-1, MsgStatus, + State1 # vqstate{ persistent_count = PCount1 })}. remove_pending_ack0(SeqId, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) -> @@ -1450,9 +1448,13 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), - {MsgStatus#msg_status { msg = Msg }, upd_ram_counts(1, MsgStatus, State1)}; + {MsgStatus#msg_status { msg = Msg }, + upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1] publish_alpha(MsgStatus, State) -> - {MsgStatus, upd_ram_counts(1, MsgStatus, State)}. + {MsgStatus, inc_ram_msg_count(State)}. +%% [1] We increase the ram_bytes here because we paged the message in +%% to requeue it, not purely because we requeued it. Hence in the +%% second head it's already accounted for as already in memory. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), @@ -1478,7 +1480,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, inc_bytes(MsgStatus1, State2)) + Limit, PubFun, State2) end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -1492,8 +1494,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - inc_bytes(MsgStatus, State2)} + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 @@ -1651,8 +1652,10 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, maybe_write_to_disk(true, false, MsgStatus, State), DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA), limit_ram_acks(Quota - 1, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 }) + upd_ram_bytes( + -1, MsgStatus1, + State1 #vqstate { ram_pending_ack = RPA1, + disk_pending_ack = DPA1 })) end. permitted_beta_count(#vqstate { len = 0 }) -> @@ -1779,21 +1782,24 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, {Quota, State}; push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case credit_flow:blocked() of - true -> - {Quota, State}; - false -> - case Generator(Q) of - {empty, _Q} -> - {Quota, State}; - {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true }, State1} = - maybe_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = Consumer(MsgStatus2, Qa, - upd_ram_counts(-1, MsgStatus2, State1)), - push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State2) - end + true -> {Quota, State}; + false -> case Generator(Q) of + {empty, _Q} -> + {Quota, State}; + {{value, MsgStatus}, Qa} -> + {MsgStatus1 = #msg_status { msg_on_disk = true }, + State1 = #vqstate { ram_msg_count = RamMsgCount }} = + maybe_write_to_disk(true, false, MsgStatus, State), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + State2 = Consumer( + MsgStatus2, Qa, + upd_ram_bytes( + -1, MsgStatus2, + State1 #vqstate { + ram_msg_count = RamMsgCount - 1})), + push_alphas_to_betas(Generator, Consumer, Quota - 1, + Qa, State2) + end end. push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, |