diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-07 14:38:21 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-07 14:38:21 +0100 |
commit | a526f7c8995b0c575bf0200aed3b21083f2b8b07 (patch) | |
tree | b5677d5fb9b8e790026d9e24ff29ee2deea4c9e7 | |
parent | 62467ca12d8d3e4c1bd41a41aa22fc5a94180c2e (diff) | |
parent | 2c9b9305342270852d171ba7035242fe6d3986ac (diff) | |
download | rabbitmq-server-a526f7c8995b0c575bf0200aed3b21083f2b8b07.tar.gz |
Merge in default
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 1 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 5 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 99 | ||||
-rw-r--r-- | src/rabbit_recovery_terms.erl | 22 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 23 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 232 |
8 files changed, 266 insertions, 125 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 7a40f9eb..5e41ea93 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -87,7 +87,7 @@ -record(event, {type, props, reference = undefined, timestamp}). --record(message_properties, {expiry, needs_confirming = false}). +-record(message_properties, {expiry, needs_confirming = false, size}). -record(plugin, {name, %% atom() version, %% string() diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 63b18655..8f44d761 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -666,9 +666,12 @@ subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) -> run_message_queue(true, Fun(State1)) end. -message_properties(Message, Confirm, #q{ttl = TTL}) -> +message_properties(Message = #basic_message{content = Content}, + Confirm, #q{ttl = TTL}) -> + #content{payload_fragments_rev = PFR} = Content, #message_properties{expiry = calculate_msg_expiry(Message, TTL), - needs_confirming = Confirm == eventually}. + needs_confirming = Confirm == eventually, + size = iolist_size(PFR)}. calculate_msg_expiry(#basic_message{content = Content}, TTL) -> #content{properties = Props} = diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 9e5f0813..595a05d3 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -20,6 +20,7 @@ -define(INFO_KEYS, [messages_ram, messages_ready_ram, messages_unacknowledged_ram, messages_persistent, + message_bytes, message_bytes_ram, message_bytes_persistent, backing_queue_status]). -ifdef(use_specs). diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 49b71122..622b1b16 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -116,7 +116,8 @@ qc_publish(#state{bqstate = BQ}) -> [qc_message(), #message_properties{needs_confirming = frequency([{1, true}, {20, false}]), - expiry = oneof([undefined | lists:seq(1, 10)])}, + expiry = oneof([undefined | lists:seq(1, 10)]), + size = 10}, false, self(), BQ]}. qc_publish_multiple(#state{}) -> @@ -124,7 +125,7 @@ qc_publish_multiple(#state{}) -> qc_publish_delivered(#state{bqstate = BQ}) -> {call, ?BQMOD, publish_delivered, - [qc_message(), #message_properties{}, self(), BQ]}. + [qc_message(), #message_properties{size = 10}, self(), BQ]}. qc_fetch(#state{bqstate = BQ}) -> {call, ?BQMOD, fetch, [boolean(), BQ]}. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 56c19d3f..923abb17 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -21,7 +21,7 @@ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). --export([add_queue_ttl/0, avoid_zeroes/0]). +-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -140,8 +140,11 @@ -define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). -%% 16 bytes for md5sum + 8 for expiry --define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)). +-define(SIZE_BYTES, 4). +-define(SIZE_BITS, (?SIZE_BYTES * 8)). + +%% 16 bytes for md5sum + 8 for expiry + 4 for size +-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)). %% + 2 for seq, bits and prefix -define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)). @@ -168,8 +171,9 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, local, []}). --rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). +-rabbit_upgrade({add_queue_ttl, local, []}). +-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). +-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}). -ifdef(use_specs). @@ -199,7 +203,8 @@ -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), contains_predicate(), on_sync_fun()) -> - {'undefined' | non_neg_integer(), qistate()}). + {'undefined' | non_neg_integer(), + 'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/5 :: (rabbit_types:msg_id(), seq_id(), @@ -410,12 +415,13 @@ init_clean(RecoveredCounts, State) -> lists:foldl( fun ({Seg, UnackedCount}, SegmentsN) -> Segment = segment_find_or_new(Seg, Dir, SegmentsN), - segment_store(Segment #segment { unacked = UnackedCount }, - SegmentsN) + segment_store( + Segment #segment { unacked = UnackedCount }, + SegmentsN) end, Segments, RecoveredCounts), %% the counts above include transient messages, which would be the %% wrong thing to return - {undefined, State1 # qistate { segments = Segments1 }}. + {undefined, undefined, State1 # qistate { segments = Segments1 }}. init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Recover the journal completely. This will also load segments @@ -424,7 +430,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% and the journal. State1 = #qistate { dir = Dir, segments = Segments } = recover_journal(State), - {Segments1, Count, DirtyCount} = + {Segments1, Count, Bytes, DirtyCount} = %% Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we @@ -433,16 +439,18 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% dirty count here, so we can call maybe_flush_journal below %% and avoid unnecessary file system operations. lists:foldl( - fun (Seg, {Segments2, CountAcc, DirtyCount}) -> - {Segment = #segment { unacked = UnackedCount }, Dirty} = + fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) -> + {{Segment = #segment { unacked = UnackedCount }, Dirty}, + UnackedBytes} = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), {segment_store(Segment, Segments2), - CountAcc + UnackedCount, DirtyCount + Dirty} - end, {Segments, 0, 0}, all_segment_nums(State1)), + CountAcc + UnackedCount, + BytesAcc + UnackedBytes, DirtyCount + Dirty} + end, {Segments, 0, 0, 0}, all_segment_nums(State1)), State2 = maybe_flush_journal(State1 #qistate { segments = Segments1, dirty_count = DirtyCount }), - {Count, State2}. + {Count, Bytes, State2}. terminate(State = #qistate { journal_handle = JournalHdl, segments = Segments }) -> @@ -464,12 +472,16 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack}, - SegmentAndDirtyCount) -> - recover_message(ContainsCheckFun(MsgId), CleanShutdown, - Del, RelSeq, SegmentAndDirtyCount) + fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack}, + {SegmentAndDirtyCount, Bytes}) -> + {recover_message(ContainsCheckFun(MsgId), CleanShutdown, + Del, RelSeq, SegmentAndDirtyCount), + Bytes + case IsPersistent of + true -> MsgProps#message_properties.size; + false -> 0 + end} end, - {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, + {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0}, SegEntries1). recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) -> @@ -549,13 +561,15 @@ scan_segments(Fun, Acc, State) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) -> - [MsgId, expiry_to_binary(Expiry)]. +create_pub_record_body(MsgId, #message_properties { expiry = Expiry, + size = Size}) -> + [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>]. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) -> +parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Size:?SIZE_BITS>>) -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, @@ -563,7 +577,8 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) -> ?NO_EXPIRY -> undefined; X -> X end, - {MsgId, #message_properties { expiry = Exp }}. + {MsgId, #message_properties { expiry = Exp, + size = Size }}. %%---------------------------------------------------------------------------- %% journal manipulation @@ -1064,6 +1079,42 @@ avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS, avoid_zeroes_segment(_) -> stop. +%% At upgrade time we just define every message's size as 0 - that +%% will save us a load of faff with the message store, and means we +%% can actually use the clean recovery terms in VQ. It does mean we +%% don't count message bodies from before the migration, but we can +%% live with that. +store_msg_size() -> + foreach_queue_index({fun store_msg_size_journal/1, + fun store_msg_size_segment/1}). + +store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Rest/binary>>) -> + {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest}; +store_msg_size_journal(_) -> + stop. + +store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest}; +store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +store_msg_size_segment(_) -> + stop. + + %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index bbf38f58..f169e13d 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -23,11 +23,14 @@ -export([start/0, stop/0, store/2, read/1, clear/0]). --export([upgrade_recovery_terms/0, start_link/0]). +-export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([upgrade_recovery_terms/0, persistent_bytes/0]). + -rabbit_upgrade({upgrade_recovery_terms, local, []}). +-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}). %%---------------------------------------------------------------------------- @@ -61,6 +64,8 @@ clear() -> dets:delete_all_objects(?MODULE), flush(). +start_link() -> gen_server:start_link(?MODULE, [], []). + %%---------------------------------------------------------------------------- upgrade_recovery_terms() -> @@ -84,7 +89,20 @@ upgrade_recovery_terms() -> close_table() end. -start_link() -> gen_server:start_link(?MODULE, [], []). +persistent_bytes() -> dets_upgrade(fun persistent_bytes/1). +persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}]. + +dets_upgrade(Fun)-> + open_table(), + try + ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) -> + store(DirBaseName, Fun(Terms)), + Acc + end, ok, ?MODULE), + ok + after + close_table() + end. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 34a8cc5c..9eddb51d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2210,13 +2210,13 @@ restart_test_queue(Qi) -> empty_test_queue() -> ok = rabbit_variable_queue:stop(), {ok, _} = rabbit_variable_queue:start([]), - {0, Qi} = init_test_queue(), + {0, 0, Qi} = init_test_queue(), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. with_empty_test_queue(Fun) -> ok = empty_test_queue(), - {0, Qi} = init_test_queue(), + {0, 0, Qi} = init_test_queue(), rabbit_queue_index:delete_and_terminate(Fun(Qi)). restart_app() -> @@ -2235,7 +2235,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> MsgId = rabbit_guid:gen(), QiM = rabbit_queue_index:publish( - MsgId, SeqId, #message_properties{}, Persistent, QiN), + MsgId, SeqId, #message_properties{size = 10}, + Persistent, QiN), ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]} end, {Qi, []}, SeqIds), @@ -2257,7 +2258,7 @@ test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> MsgId = rabbit_guid:gen(), - Props = #message_properties{expiry=12345}, + Props = #message_properties{expiry=12345, size = 10}, Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0), {[{MsgId, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), @@ -2287,7 +2288,7 @@ test_queue_index() -> ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsMsgIdsA)), %% should get length back as 0, as all the msgs were transient - {0, Qi6} = restart_test_queue(Qi4), + {0, 0, Qi6} = restart_test_queue(Qi4), {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), @@ -2296,7 +2297,8 @@ test_queue_index() -> lists:reverse(SeqIdsMsgIdsB)), %% should get length back as MostOfASegment LenB = length(SeqIdsB), - {LenB, Qi12} = restart_test_queue(Qi10), + BytesB = LenB * 10, + {LenB, BytesB, Qi12} = restart_test_queue(Qi10), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), @@ -2308,7 +2310,7 @@ test_queue_index() -> {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), %% should get length back as 0 because all persistent %% msgs have been acked - {0, Qi19} = restart_test_queue(Qi18), + {0, 0, Qi19} = restart_test_queue(Qi18), Qi19 end), @@ -2380,11 +2382,11 @@ test_queue_index() -> true, Qi0), Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), Qi3 = rabbit_queue_index:ack([0], Qi2), - {5, Qi4} = restart_test_queue(Qi3), + {5, 50, Qi4} = restart_test_queue(Qi3), {Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4), Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), - {5, Qi8} = restart_test_queue(Qi7), + {5, 50, Qi8} = restart_test_queue(Qi7), Qi8 end), @@ -2419,7 +2421,8 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> false -> 1 end}, PayloadFun(N)), - PropFun(N, #message_properties{}), false, self(), VQN) + PropFun(N, #message_properties{size = 10}), + false, self(), VQN) end, VQ, lists:seq(Start, Start + Count - 1))). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 03b99562..a598dfab 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -254,13 +254,16 @@ durable, transient_threshold, - len, - persistent_count, + len, %% w/o unacked + bytes, %% w unacked + persistent_count, %% w unacked + persistent_bytes, %% w unacked target_ram_count, - ram_msg_count, + ram_msg_count, %% w/o unacked ram_msg_count_prev, ram_ack_count_prev, + ram_bytes, %% w unacked out_counter, in_counter, rates, @@ -343,7 +346,9 @@ transient_threshold :: non_neg_integer(), len :: non_neg_integer(), + bytes :: non_neg_integer(), persistent_count :: non_neg_integer(), + persistent_bytes :: non_neg_integer(), target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), @@ -425,7 +430,7 @@ init(Queue, Recover, AsyncCallback) -> init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], + init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -440,7 +445,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms, MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), - {DeltaCount, IndexState} = + {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), @@ -448,7 +453,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, RecoveryTerms, + init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). process_recovery_terms(Terms=non_clean_shutdown) -> @@ -461,6 +466,7 @@ process_recovery_terms(Terms) -> terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, + persistent_bytes = PBytes, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = purge_pending_ack(true, State), @@ -470,7 +476,9 @@ terminate(_Reason, State) -> rabbit_msg_store:client_ref(MSCStateP) end, ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT), - Terms = [{persistent_ref, PRef}, {persistent_count, PCount}], + Terms = [{persistent_ref, PRef}, + {persistent_count, PCount}, + {persistent_bytes, PBytes}], a(State1 #vqstate { index_state = rabbit_queue_index:terminate( Terms, IndexState), msg_store_clients = undefined }). @@ -498,28 +506,35 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, len = Len, - persistent_count = PCount }) -> + bytes = Bytes, + ram_bytes = RamBytes, + persistent_count = PCount, + persistent_bytes = PBytes }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - {LensByStore, IndexState1} = remove_queue_entries( - fun ?QUEUE:foldl/3, Q4, - orddict:new(), IndexState, MSCState), - {LensByStore1, State1 = #vqstate { q1 = Q1, - index_state = IndexState2, - msg_store_clients = MSCState1 }} = - purge_betas_and_deltas(LensByStore, - State #vqstate { q4 = ?QUEUE:new(), - index_state = IndexState1 }), - {LensByStore2, IndexState3} = remove_queue_entries( - fun ?QUEUE:foldl/3, Q1, - LensByStore1, IndexState2, MSCState1), - PCount1 = PCount - find_persistent_count(LensByStore2), + Stats = {Bytes, RamBytes, PCount, PBytes}, + {Stats1, IndexState1} = + remove_queue_entries(Q4, Stats, IndexState, MSCState), + + {Stats2, State1 = #vqstate { q1 = Q1, + index_state = IndexState2, + msg_store_clients = MSCState1 }} = + purge_betas_and_deltas( + Stats1, State #vqstate { q4 = ?QUEUE:new(), + index_state = IndexState1 }), + + {{Bytes3, RamBytes3, PCount3, PBytes3}, IndexState3} = + remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), + {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, + bytes = Bytes3, ram_msg_count = 0, - persistent_count = PCount1 })}. + ram_bytes = RamBytes3, + persistent_count = PCount3, + persistent_bytes = PBytes3 })}. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -542,11 +557,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, InCount1 = InCount + 1, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount1, - persistent_count = PCount1, - unconfirmed = UC1 }), + State3 = upd_bytes( + 1, MsgStatus1, + inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount1, + persistent_count = PCount1, + unconfirmed = UC1 })), a(reduce_memory_use(maybe_update_rates(State3))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, @@ -565,11 +582,12 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }, + State3 = upd_bytes(1, MsgStatus, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = UC1 }), {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. discard(_MsgId, _ChPid, State) -> State. @@ -638,7 +656,6 @@ ack([SeqId], State) -> index_on_disk = IndexOnDisk }, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, ack_out_counter = AckOutCount }} = remove_pending_ack(SeqId, State), IndexState1 = case IndexOnDisk of @@ -649,16 +666,13 @@ ack([SeqId], State) -> true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); false -> ok end, - PCount1 = PCount - one_if(IsPersistent), {[MsgId], a(State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, ack_out_counter = AckOutCount + 1 })}; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, ack_out_counter = AckOutCount }} = lists:foldl( fun (SeqId, {Acc, State2}) -> @@ -668,11 +682,8 @@ ack(AckTags, State) -> IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], - PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( - orddict:new(), MsgIdsByStore)), {lists:reverse(AllMsgIds), a(State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. requeue(AckTags, #vqstate { delta = Delta, @@ -829,6 +840,12 @@ info(messages_ram, State) -> info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> PersistentCount; +info(message_bytes, #vqstate{bytes = Bytes}) -> + Bytes; +info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> + RamBytes; +info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> + PersistentBytes; info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, @@ -865,8 +882,10 @@ is_duplicate(_Msg, State) -> {false, State}. a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, + bytes = Bytes, persistent_count = PersistentCount, - ram_msg_count = RamMsgCount }) -> + ram_msg_count = RamMsgCount, + ram_bytes = RamBytes}) -> E1 = ?QUEUE:is_empty(Q1), E2 = ?QUEUE:is_empty(Q2), ED = Delta#delta.count == 0, @@ -880,9 +899,12 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = LZ == (E3 and E4), true = Len >= 0, + true = Bytes >= 0, true = PersistentCount >= 0, true = RamMsgCount >= 0, true = RamMsgCount =< Len, + true = RamBytes >= 0, + true = RamBytes =< Bytes, State. @@ -1030,15 +1052,17 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, +init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), - DeltaCount1 = + {DeltaCount1, DeltaBytes1} = case Terms of - non_clean_shutdown -> DeltaCount; - _ -> proplists:get_value(persistent_count, - Terms, DeltaCount) + non_clean_shutdown -> {DeltaCount, DeltaBytes}; + _ -> {proplists:get_value(persistent_count, + Terms, DeltaCount), + proplists:get_value(persistent_bytes, + Terms, DeltaBytes)} end, Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of true -> ?BLANK_DELTA; @@ -1063,11 +1087,14 @@ init(IsDurable, IndexState, DeltaCount, Terms, len = DeltaCount1, persistent_count = DeltaCount1, + bytes = DeltaBytes1, + persistent_bytes = DeltaBytes1, target_ram_count = infinity, ram_msg_count = 0, ram_msg_count_prev = 0, ram_ack_count_prev = 0, + ram_bytes = 0, out_counter = 0, in_counter = 0, rates = blank_rates(Now), @@ -1092,9 +1119,11 @@ in_r(MsgStatus = #msg_status { msg = undefined }, true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - inc_ram_msg_count( - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) }) + upd_ram_bytes( + 1, MsgStatus, + inc_ram_msg_count( + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { + msg = Msg }, Q4a) })) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1125,6 +1154,23 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> State#vqstate{ram_msg_count = RamMsgCount + 1}. +upd_bytes(Sign, MsgStatus = #msg_status{msg = undefined}, State) -> + upd_bytes0(Sign, MsgStatus, State); +upd_bytes(Sign, MsgStatus = #msg_status{msg = _}, State) -> + upd_ram_bytes(Sign, MsgStatus, upd_bytes0(Sign, MsgStatus, State)). + +upd_bytes0(Sign, MsgStatus = #msg_status{is_persistent = IsPersistent}, + State = #vqstate{bytes = Bytes, + persistent_bytes = PBytes}) -> + Diff = Sign * msg_size(MsgStatus), + State#vqstate{bytes = Bytes + Diff, + persistent_bytes = PBytes + one_if(IsPersistent) * Diff}. + +upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> + State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. + +msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. + remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, @@ -1166,58 +1212,66 @@ remove(AckRequired, MsgStatus = #msg_status { PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - + State2 = case AckRequired of + false -> upd_bytes(-1, MsgStatus, State1); + true -> State1 + end, {AckTag, maybe_update_rates( - State1 #vqstate {ram_msg_count = RamMsgCount1, + State2 #vqstate {ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, index_state = IndexState2, len = Len - 1, persistent_count = PCount1})}. -purge_betas_and_deltas(LensByStore, +purge_betas_and_deltas(Stats, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> case ?QUEUE:is_empty(Q3) of - true -> {LensByStore, State}; - false -> {LensByStore1, IndexState1} = - remove_queue_entries(fun ?QUEUE:foldl/3, Q3, - LensByStore, IndexState, MSCState), - purge_betas_and_deltas(LensByStore1, + true -> {Stats, State}; + false -> {Stats1, IndexState1} = remove_queue_entries( + Q3, Stats, IndexState, MSCState), + purge_betas_and_deltas(Stats1, maybe_deltas_to_betas( State #vqstate { q3 = ?QUEUE:new(), index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> - {MsgIdsByStore, Delivers, Acks} = - Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), +remove_queue_entries(Q, {Bytes, RamBytes, PCount, PBytes}, + IndexState, MSCState) -> + {MsgIdsByStore, Bytes1, RamBytes1, PBytes1, Delivers, Acks} = + ?QUEUE:foldl(fun remove_queue_entries1/2, + {orddict:new(), Bytes, RamBytes, PBytes, [], []}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore), + {{Bytes1, + RamBytes1, + PCount - case orddict:find(true, MsgIdsByStore) of + error -> 0; + {ok, Ids} -> length(Ids) + end, + PBytes1}, rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, + #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {MsgIdsByStore, Delivers, Acks}) -> + index_on_disk = IndexOnDisk, is_persistent = IsPersistent, + msg_props = #message_properties { size = Size } }, + {MsgIdsByStore, Bytes, RamBytes, PBytes, Delivers, Acks}) -> {case MsgOnDisk of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, + Bytes - Size, + RamBytes - Size * one_if(Msg =/= undefined), + PBytes - Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. -sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> - orddict:fold( - fun (IsPersistent, MsgIds, LensByStore1) -> - orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1) - end, LensByStore, MsgIdsByStore). - %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1289,8 +1343,15 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, none -> gb_trees:get(SeqId, DPA) end. -remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> +remove_pending_ack(SeqId, State) -> + {MsgStatus, State1 = #vqstate { persistent_count = PCount }} = + remove_pending_ack0(SeqId, State), + PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), + {MsgStatus, upd_bytes(-1, MsgStatus, + State1 # vqstate{ persistent_count = PCount1 })}. + +remove_pending_ack0(SeqId, State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA }) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), {V, State #vqstate { ram_pending_ack = RPA1 }}; @@ -1340,12 +1401,6 @@ accumulate_ack(#msg_status { seq_id = SeqId, end, [MsgId | AllMsgIds]}. -find_persistent_count(LensByStore) -> - case orddict:find(true, LensByStore) of - error -> 0; - {ok, Len} -> Len - end. - %%---------------------------------------------------------------------------- %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- @@ -1393,9 +1448,13 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), - {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)}; + {MsgStatus#msg_status { msg = Msg }, + upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1] publish_alpha(MsgStatus, State) -> {MsgStatus, inc_ram_msg_count(State)}. +%% [1] We increase the ram_bytes here because we paged the message in +%% to requeue it, not purely because we requeued it. Hence in the +%% second head it's already accounted for as already in memory. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), @@ -1441,7 +1500,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, State) -> {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = - remove_pending_ack(SeqId, State), + remove_pending_ack0(SeqId, State), {MsgStatus #msg_status { msg_props = MsgProps #message_properties { needs_confirming = false } }, State1}. @@ -1593,8 +1652,10 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, maybe_write_to_disk(true, false, MsgStatus, State), DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA), limit_ram_acks(Quota - 1, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 }) + upd_ram_bytes( + -1, MsgStatus1, + State1 #vqstate { ram_pending_ack = RPA1, + disk_pending_ack = DPA1 })) end. permitted_beta_count(#vqstate { len = 0 }) -> @@ -1730,9 +1791,12 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> State1 = #vqstate { ram_msg_count = RamMsgCount }} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = Consumer(MsgStatus2, Qa, - State1 #vqstate { - ram_msg_count = RamMsgCount - 1 }), + State2 = Consumer( + MsgStatus2, Qa, + upd_ram_bytes( + -1, MsgStatus2, + State1 #vqstate { + ram_msg_count = RamMsgCount - 1})), push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, State2) end |