diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-07-23 16:20:17 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-07-23 16:20:17 +0100 |
commit | 003ef115e3aeecc9465992e27c1f0b1544791eea (patch) | |
tree | dcbdee99159a10c785fb60dac39a19d25a55173e | |
parent | 9fb2f35c1afa4c911850edcb2adf6f8e55e7f666 (diff) | |
download | rabbitmq-server-003ef115e3aeecc9465992e27c1f0b1544791eea.tar.gz |
First pass at queue-size-in-bytes. Only lightly tested, and with no upgrade step for QI. But I wanted to commit something...
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 53 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 242 |
4 files changed, 188 insertions, 116 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 7a40f9eb..5e41ea93 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -87,7 +87,7 @@ -record(event, {type, props, reference = undefined, timestamp}). --record(message_properties, {expiry, needs_confirming = false}). +-record(message_properties, {expiry, needs_confirming = false, size}). -record(plugin, {name, %% atom() version, %% string() diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4082c53d..04a38b78 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -665,9 +665,12 @@ subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) -> run_message_queue(true, Fun(State1)) end. -message_properties(Message, Confirm, #q{ttl = TTL}) -> +message_properties(Message = #basic_message{content = Content}, + Confirm, #q{ttl = TTL}) -> + #content{payload_fragments_rev = PFR} = Content, #message_properties{expiry = calculate_msg_expiry(Message, TTL), - needs_confirming = Confirm == eventually}. + needs_confirming = Confirm == eventually, + size = iolist_size(PFR)}. calculate_msg_expiry(#basic_message{content = Content}, TTL) -> #content{properties = Props} = diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 56c19d3f..322c0f8a 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -140,8 +140,11 @@ -define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). -%% 16 bytes for md5sum + 8 for expiry --define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)). +-define(SIZE_BYTES, 4). +-define(SIZE_BITS, (?SIZE_BYTES * 8)). + +%% 16 bytes for md5sum + 8 for expiry + 4 for size +-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)). %% + 2 for seq, bits and prefix -define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)). @@ -199,7 +202,8 @@ -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), contains_predicate(), on_sync_fun()) -> - {'undefined' | non_neg_integer(), qistate()}). + {'undefined' | non_neg_integer(), + 'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/5 :: (rabbit_types:msg_id(), seq_id(), @@ -410,12 +414,13 @@ init_clean(RecoveredCounts, State) -> lists:foldl( fun ({Seg, UnackedCount}, SegmentsN) -> Segment = segment_find_or_new(Seg, Dir, SegmentsN), - segment_store(Segment #segment { unacked = UnackedCount }, - SegmentsN) + segment_store( + Segment #segment { unacked = UnackedCount }, + SegmentsN) end, Segments, RecoveredCounts), %% the counts above include transient messages, which would be the %% wrong thing to return - {undefined, State1 # qistate { segments = Segments1 }}. + {undefined, undefined, State1 # qistate { segments = Segments1 }}. init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Recover the journal completely. This will also load segments @@ -424,7 +429,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% and the journal. State1 = #qistate { dir = Dir, segments = Segments } = recover_journal(State), - {Segments1, Count, DirtyCount} = + {Segments1, Count, Bytes, DirtyCount} = %% Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we @@ -433,16 +438,18 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% dirty count here, so we can call maybe_flush_journal below %% and avoid unnecessary file system operations. lists:foldl( - fun (Seg, {Segments2, CountAcc, DirtyCount}) -> - {Segment = #segment { unacked = UnackedCount }, Dirty} = + fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) -> + {{Segment = #segment { unacked = UnackedCount }, Dirty}, + UnackedBytes} = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), {segment_store(Segment, Segments2), - CountAcc + UnackedCount, DirtyCount + Dirty} - end, {Segments, 0, 0}, all_segment_nums(State1)), + CountAcc + UnackedCount, + BytesAcc + UnackedBytes, DirtyCount + Dirty} + end, {Segments, 0, 0, 0}, all_segment_nums(State1)), State2 = maybe_flush_journal(State1 #qistate { segments = Segments1, dirty_count = DirtyCount }), - {Count, State2}. + {Count, Bytes, State2}. terminate(State = #qistate { journal_handle = JournalHdl, segments = Segments }) -> @@ -464,12 +471,13 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack}, - SegmentAndDirtyCount) -> - recover_message(ContainsCheckFun(MsgId), CleanShutdown, - Del, RelSeq, SegmentAndDirtyCount) + fun (RelSeq, {{MsgId, MsgProps, _IsPersistent}, Del, no_ack}, + {SegmentAndDirtyCount, Bytes}) -> + {recover_message(ContainsCheckFun(MsgId), CleanShutdown, + Del, RelSeq, SegmentAndDirtyCount), + Bytes + MsgProps#message_properties.size} end, - {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, + {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0}, SegEntries1). recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) -> @@ -549,13 +557,15 @@ scan_segments(Fun, Acc, State) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) -> - [MsgId, expiry_to_binary(Expiry)]. +create_pub_record_body(MsgId, #message_properties { expiry = Expiry, + size = Size}) -> + [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>]. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) -> +parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Size:?SIZE_BITS>>) -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, @@ -563,7 +573,8 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) -> ?NO_EXPIRY -> undefined; X -> X end, - {MsgId, #message_properties { expiry = Exp }}. + {MsgId, #message_properties { expiry = Exp, + size = Size }}. %%---------------------------------------------------------------------------- %% journal manipulation diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ede69748..1465e884 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -255,12 +255,15 @@ transient_threshold, len, + bytes, persistent_count, + persistent_bytes, target_ram_count, ram_msg_count, ram_msg_count_prev, ram_ack_count_prev, + ram_msg_bytes, out_counter, in_counter, rates, @@ -343,7 +346,9 @@ transient_threshold :: non_neg_integer(), len :: non_neg_integer(), + bytes :: non_neg_integer(), persistent_count :: non_neg_integer(), + persistent_bytes :: non_neg_integer(), target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), @@ -425,7 +430,7 @@ init(Queue, Recover, AsyncCallback) -> init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], + init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -440,7 +445,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms, MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), - {DeltaCount, IndexState} = + {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), @@ -448,7 +453,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, RecoveryTerms, + init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). process_recovery_terms(Terms=non_clean_shutdown) -> @@ -461,6 +466,7 @@ process_recovery_terms(Terms) -> terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, + persistent_bytes = PBytes, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = purge_pending_ack(true, State), @@ -470,7 +476,9 @@ terminate(_Reason, State) -> rabbit_msg_store:client_ref(MSCStateP) end, ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT), - Terms = [{persistent_ref, PRef}, {persistent_count, PCount}], + Terms = [{persistent_ref, PRef}, + {persistent_count, PCount}, + {persistent_bytes, PBytes}], a(State1 #vqstate { index_state = rabbit_queue_index:terminate( Terms, IndexState), msg_store_clients = undefined }). @@ -498,38 +506,45 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, len = Len, - persistent_count = PCount }) -> + persistent_count = PCount, + persistent_bytes = PBytes }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - {LensByStore, IndexState1} = remove_queue_entries( - fun ?QUEUE:foldl/3, Q4, - orddict:new(), IndexState, MSCState), - {LensByStore1, State1 = #vqstate { q1 = Q1, - index_state = IndexState2, - msg_store_clients = MSCState1 }} = - purge_betas_and_deltas(LensByStore, - State #vqstate { q4 = ?QUEUE:new(), - index_state = IndexState1 }), - {LensByStore2, IndexState3} = remove_queue_entries( - fun ?QUEUE:foldl/3, Q1, - LensByStore1, IndexState2, MSCState1), - PCount1 = PCount - find_persistent_count(LensByStore2), + {PCount1, PBytes1, IndexState1} = + remove_queue_entries(Q4, PCount, PBytes, IndexState, MSCState), + + {PCount2, PBytes2, State1 = #vqstate { q1 = Q1, + index_state = IndexState2, + msg_store_clients = MSCState1 }} = + purge_betas_and_deltas( + PCount1, PBytes1,State #vqstate { q4 = ?QUEUE:new(), + index_state = IndexState1 }), + + {PCount3, PBytes3, IndexState3} = + remove_queue_entries(Q1, PCount2, PBytes2, IndexState2, MSCState1), + {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, + bytes = 0, ram_msg_count = 0, - persistent_count = PCount1 })}. + ram_msg_bytes = 0, + persistent_count = PCount3, + persistent_bytes = PBytes3 })}. purge_acks(State) -> a(purge_pending_ack(false, State)). publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming, + size = Size}, IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, + bytes = Bytes, in_counter = InCount, persistent_count = PCount, + persistent_bytes = PBytes, durable = IsDurable, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, @@ -541,22 +556,28 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, end, InCount1 = InCount + 1, PCount1 = PCount + one_if(IsPersistent1), + PBytes1 = PBytes + Size * one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount1, - persistent_count = PCount1, - unconfirmed = UC1 }), + State3 = upd_ram_counts(1, MsgStatus, + State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + bytes = Bytes + Size, + in_counter = InCount1, + persistent_count = PCount1, + persistent_bytes = PBytes1, + unconfirmed = UC1 }), a(reduce_memory_use(maybe_update_rates(State3))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { - needs_confirming = NeedsConfirming }, + needs_confirming = NeedsConfirming, + size = Size }, _ChPid, State = #vqstate { next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, persistent_count = PCount, + persistent_bytes = PBytes, durable = IsDurable, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, @@ -564,11 +585,13 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), + PBytes1 = PBytes + Size * one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), State3 = State2 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, persistent_count = PCount1, + persistent_bytes = PBytes1, unconfirmed = UC1 }, {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. @@ -638,7 +661,6 @@ ack([SeqId], State) -> index_on_disk = IndexOnDisk }, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, ack_out_counter = AckOutCount }} = remove_pending_ack(SeqId, State), IndexState1 = case IndexOnDisk of @@ -649,16 +671,13 @@ ack([SeqId], State) -> true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); false -> ok end, - PCount1 = PCount - one_if(IsPersistent), {[MsgId], a(State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, ack_out_counter = AckOutCount + 1 })}; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, ack_out_counter = AckOutCount }} = lists:foldl( fun (SeqId, {Acc, State2}) -> @@ -668,11 +687,8 @@ ack(AckTags, State) -> IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], - PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( - orddict:new(), MsgIdsByStore)), {lists:reverse(AllMsgIds), a(State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. requeue(AckTags, #vqstate { delta = Delta, @@ -824,12 +840,15 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, + bytes = Bytes, ram_pending_ack = RPA, disk_pending_ack = DPA, target_ram_count = TargetRamCount, ram_msg_count = RamMsgCount, + ram_msg_bytes = RamMsgBytes, next_seq_id = NextSeqId, persistent_count = PersistentCount, + persistent_bytes = PersistentBytes, rates = #rates { in = AvgIngressRate, out = AvgEgressRate, ack_in = AvgAckIngressRate, @@ -841,12 +860,15 @@ status(#vqstate { {q3 , ?QUEUE:len(Q3)}, {q4 , ?QUEUE:len(Q4)}, {len , Len}, + {bytes , Bytes}, {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)}, {target_ram_count , TargetRamCount}, {ram_msg_count , RamMsgCount}, + {ram_msg_bytes , RamMsgBytes}, {ram_ack_count , gb_trees:size(RPA)}, {next_seq_id , NextSeqId}, {persistent_count , PersistentCount}, + {persistent_bytes , PersistentBytes}, {avg_ingress_rate , AvgIngressRate}, {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, @@ -863,8 +885,10 @@ is_duplicate(_Msg, State) -> {false, State}. a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, + bytes = Bytes, persistent_count = PersistentCount, - ram_msg_count = RamMsgCount }) -> + ram_msg_count = RamMsgCount, + ram_msg_bytes = RamMsgBytes}) -> E1 = ?QUEUE:is_empty(Q1), E2 = ?QUEUE:is_empty(Q2), ED = Delta#delta.count == 0, @@ -878,9 +902,12 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = LZ == (E3 and E4), true = Len >= 0, + true = Bytes >= 0, true = PersistentCount >= 0, true = RamMsgCount >= 0, true = RamMsgCount =< Len, + true = RamMsgBytes >= 0, + true = RamMsgBytes =< Bytes, State. @@ -1028,15 +1055,17 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, +init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), - DeltaCount1 = + {DeltaCount1, DeltaBytes1} = case Terms of - non_clean_shutdown -> DeltaCount; - _ -> proplists:get_value(persistent_count, - Terms, DeltaCount) + non_clean_shutdown -> {DeltaCount, DeltaBytes}; + _ -> {proplists:get_value(persistent_count, + Terms, DeltaCount), + proplists:get_value(persistent_bytes, + Terms, DeltaBytes)} end, Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of true -> ?BLANK_DELTA; @@ -1061,11 +1090,14 @@ init(IsDurable, IndexState, DeltaCount, Terms, len = DeltaCount1, persistent_count = DeltaCount1, + bytes = DeltaBytes1, + persistent_bytes = DeltaBytes1, target_ram_count = infinity, ram_msg_count = 0, ram_msg_count_prev = 0, ram_ack_count_prev = 0, + ram_msg_bytes = 0, out_counter = 0, in_counter = 0, rates = blank_rates(Now), @@ -1090,7 +1122,8 @@ in_r(MsgStatus = #msg_status { msg = undefined }, true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - inc_ram_msg_count( + upd_ram_counts( + 1, MsgStatus, State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { msg = Msg }, Q4a) }) end; @@ -1120,8 +1153,15 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> msg_store_read(MSCState, IsPersistent, MsgId), {Msg, State #vqstate {msg_store_clients = MSCState1}}. -inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> - State#vqstate{ram_msg_count = RamMsgCount + 1}. +upd_ram_counts(Sign, #msg_status{msg_props = #message_properties{size = Size}}, + State = #vqstate{ram_msg_count = RamMsgCount, + ram_msg_bytes = RamMsgBytes}) -> + State#vqstate{ram_msg_count = RamMsgCount + Sign, + ram_msg_bytes = RamMsgBytes + Sign * Size}. + +inc_bytes(#msg_status{msg_props = #message_properties{size = Size}}, + State = #vqstate{bytes = Bytes}) -> + State#vqstate{bytes = Bytes + Size}. remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, @@ -1130,13 +1170,17 @@ remove(AckRequired, MsgStatus = #msg_status { is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk }, + index_on_disk = IndexOnDisk, + msg_props = #message_properties{ size = Size }}, State = #vqstate {ram_msg_count = RamMsgCount, + ram_msg_bytes = RamMsgBytes, out_counter = OutCount, index_state = IndexState, msg_store_clients = MSCState, len = Len, - persistent_count = PCount}) -> + bytes = Bytes, + persistent_count = PCount, + persistent_bytes = PBytes}) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -1162,60 +1206,67 @@ remove(AckRequired, MsgStatus = #msg_status { false -> {undefined, State} end, - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), + MaybeOne = one_if(IsPersistent andalso not AckRequired), + PCount1 = PCount - MaybeOne, + PBytes1 = PBytes - Size * MaybeOne, RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), + RamMsgBytes1 = RamMsgBytes - Size * one_if(Msg =/= undefined), {AckTag, maybe_update_rates( State1 #vqstate {ram_msg_count = RamMsgCount1, + ram_msg_bytes = RamMsgBytes1, out_counter = OutCount + 1, index_state = IndexState2, len = Len - 1, - persistent_count = PCount1})}. + bytes = Bytes - Size, + persistent_count = PCount1, + persistent_bytes = PBytes1})}. -purge_betas_and_deltas(LensByStore, +purge_betas_and_deltas(PCount, PBytes, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> case ?QUEUE:is_empty(Q3) of - true -> {LensByStore, State}; - false -> {LensByStore1, IndexState1} = - remove_queue_entries(fun ?QUEUE:foldl/3, Q3, - LensByStore, IndexState, MSCState), - purge_betas_and_deltas(LensByStore1, + true -> {PCount, PBytes, State}; + false -> {PCount1, PBytes1, IndexState1} = + remove_queue_entries( + Q3, PCount, PBytes, IndexState, MSCState), + purge_betas_and_deltas(PCount1, PBytes1, maybe_deltas_to_betas( State #vqstate { q3 = ?QUEUE:new(), index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> - {MsgIdsByStore, Delivers, Acks} = - Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), +remove_queue_entries(Q, PCount, PBytes, IndexState, MSCState) -> + {MsgIdsByStore, PBytes1, Delivers, Acks} = + ?QUEUE:foldl(fun remove_queue_entries1/2, + {orddict:new(), PBytes, [], []}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore), + {PCount - case orddict:find(true, MsgIdsByStore) of + error -> 0; + {ok, Ids} -> length(Ids) + end, + PBytes1, rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {MsgIdsByStore, Delivers, Acks}) -> + index_on_disk = IndexOnDisk, is_persistent = IsPersistent, + msg_props = #message_properties { size = Size } }, + {MsgIdsByStore, PBytes, Delivers, Acks}) -> {case MsgOnDisk of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, + PBytes - Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. -sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> - orddict:fold( - fun (IsPersistent, MsgIds, LensByStore1) -> - orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1) - end, LensByStore, MsgIdsByStore). - %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1287,8 +1338,20 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, none -> gb_trees:get(SeqId, DPA) end. -remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> +remove_pending_ack(SeqId, State) -> + {MsgStatus, State1 = #vqstate { persistent_count = PCount, + persistent_bytes = PBytes }} = + remove_pending_ack0(SeqId, State), + #msg_status { msg_props = #message_properties{ size = Size }, + is_persistent = IsPersistent } = MsgStatus, + case IsPersistent of + false -> {MsgStatus, State1}; + true -> {MsgStatus, State1#vqstate{ persistent_count = PCount - 1, + persistent_bytes = PBytes - Size }} + end. + +remove_pending_ack0(SeqId, State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA }) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), {V, State #vqstate { ram_pending_ack = RPA1 }}; @@ -1338,12 +1401,6 @@ accumulate_ack(#msg_status { seq_id = SeqId, end, [MsgId | AllMsgIds]}. -find_persistent_count(LensByStore) -> - case orddict:find(true, LensByStore) of - error -> 0; - {ok, Len} -> Len - end. - %%---------------------------------------------------------------------------- %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- @@ -1391,9 +1448,9 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), - {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)}; + {MsgStatus#msg_status { msg = Msg }, upd_ram_counts(1, MsgStatus, State1)}; publish_alpha(MsgStatus, State) -> - {MsgStatus, inc_ram_msg_count(State)}. + {MsgStatus, upd_ram_counts(1, MsgStatus, State)}. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), @@ -1419,7 +1476,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) + Limit, PubFun, inc_bytes(MsgStatus1, State2)) end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -1433,13 +1490,14 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], + inc_bytes(MsgStatus, State2)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, State) -> {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = - remove_pending_ack(SeqId, State), + remove_pending_ack0(SeqId, State), {MsgStatus #msg_status { msg_props = MsgProps #message_properties { needs_confirming = false } }, State1}. @@ -1719,21 +1777,21 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, {Quota, State}; push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case credit_flow:blocked() of - true -> {Quota, State}; - false -> case Generator(Q) of - {empty, _Q} -> - {Quota, State}; - {{value, MsgStatus}, Qa} -> - {MsgStatus1 = #msg_status { msg_on_disk = true }, - State1 = #vqstate { ram_msg_count = RamMsgCount }} = - maybe_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = Consumer(MsgStatus2, Qa, - State1 #vqstate { - ram_msg_count = RamMsgCount - 1 }), - push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State2) - end + true -> + {Quota, State}; + false -> + case Generator(Q) of + {empty, _Q} -> + {Quota, State}; + {{value, MsgStatus}, Qa} -> + {MsgStatus1 = #msg_status { msg_on_disk = true }, State1} = + maybe_write_to_disk(true, false, MsgStatus, State), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + State2 = Consumer(MsgStatus2, Qa, + upd_ram_counts(1, MsgStatus2, State1)), + push_alphas_to_betas(Generator, Consumer, Quota - 1, + Qa, State2) + end end. push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, |