summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-11 16:14:34 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-11 16:14:34 +0100
commit310be9454560f2385dbd3cce7b801aa56204ae6d (patch)
treeb91efaafa840dc44f3620e9c484bce789774411f
parentfb79e947c03f0e52586a0e3a787b91b25e4581c0 (diff)
parent4ba779a41434a20534bef1f959af5e2845a8a2eb (diff)
downloadrabbitmq-server-310be9454560f2385dbd3cce7b801aa56204ae6d.tar.gz
Merge bug25666
-rw-r--r--docs/rabbitmqctl.1.xml12
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_backing_queue.erl1
-rw-r--r--src/rabbit_backing_queue_qc.erl5
-rw-r--r--src/rabbit_queue_index.erl94
-rw-r--r--src/rabbit_recovery_terms.erl22
-rw-r--r--src/rabbit_tests.erl23
-rw-r--r--src/rabbit_variable_queue.erl204
9 files changed, 270 insertions, 100 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 908dad03..afc46e8e 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1206,6 +1206,18 @@
<listitem><para>Total number of persistent messages in the queue (will always be 0 for transient queues).</para></listitem>
</varlistentry>
<varlistentry>
+ <term>message_bytes</term>
+ <listitem><para>Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_ram</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages which are in RAM.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_persistent</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages which are persistent.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>consumers</term>
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 7a40f9eb..5e41ea93 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -87,7 +87,7 @@
-record(event, {type, props, reference = undefined, timestamp}).
--record(message_properties, {expiry, needs_confirming = false}).
+-record(message_properties, {expiry, needs_confirming = false, size}).
-record(plugin, {name, %% atom()
version, %% string()
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 89b3e554..ba1517af 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -664,9 +664,12 @@ subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) ->
run_message_queue(true, Fun(State1))
end.
-message_properties(Message, Confirm, #q{ttl = TTL}) ->
+message_properties(Message = #basic_message{content = Content},
+ Confirm, #q{ttl = TTL}) ->
+ #content{payload_fragments_rev = PFR} = Content,
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually}.
+ needs_confirming = Confirm == eventually,
+ size = iolist_size(PFR)}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 9e5f0813..595a05d3 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -20,6 +20,7 @@
-define(INFO_KEYS, [messages_ram, messages_ready_ram,
messages_unacknowledged_ram, messages_persistent,
+ message_bytes, message_bytes_ram, message_bytes_persistent,
backing_queue_status]).
-ifdef(use_specs).
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 49b71122..622b1b16 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -116,7 +116,8 @@ qc_publish(#state{bqstate = BQ}) ->
[qc_message(),
#message_properties{needs_confirming = frequency([{1, true},
{20, false}]),
- expiry = oneof([undefined | lists:seq(1, 10)])},
+ expiry = oneof([undefined | lists:seq(1, 10)]),
+ size = 10},
false, self(), BQ]}.
qc_publish_multiple(#state{}) ->
@@ -124,7 +125,7 @@ qc_publish_multiple(#state{}) ->
qc_publish_delivered(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish_delivered,
- [qc_message(), #message_properties{}, self(), BQ]}.
+ [qc_message(), #message_properties{size = 10}, self(), BQ]}.
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 56c19d3f..0f572866 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -21,7 +21,7 @@
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
--export([add_queue_ttl/0, avoid_zeroes/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -140,8 +140,11 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
-%% 16 bytes for md5sum + 8 for expiry
--define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)).
+-define(SIZE_BYTES, 4).
+-define(SIZE_BITS, (?SIZE_BYTES * 8)).
+
+%% 16 bytes for md5sum + 8 for expiry + 4 for size
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
%% + 2 for seq, bits and prefix
-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
@@ -168,8 +171,9 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({add_queue_ttl, local, []}).
--rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
+-rabbit_upgrade({add_queue_ttl, local, []}).
+-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
+-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
-ifdef(use_specs).
@@ -199,7 +203,8 @@
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
contains_predicate(), on_sync_fun()) ->
- {'undefined' | non_neg_integer(), qistate()}).
+ {'undefined' | non_neg_integer(),
+ 'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/5 :: (rabbit_types:msg_id(), seq_id(),
@@ -415,7 +420,7 @@ init_clean(RecoveredCounts, State) ->
end, Segments, RecoveredCounts),
%% the counts above include transient messages, which would be the
%% wrong thing to return
- {undefined, State1 # qistate { segments = Segments1 }}.
+ {undefined, undefined, State1 # qistate { segments = Segments1 }}.
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Recover the journal completely. This will also load segments
@@ -424,7 +429,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% and the journal.
State1 = #qistate { dir = Dir, segments = Segments } =
recover_journal(State),
- {Segments1, Count, DirtyCount} =
+ {Segments1, Count, Bytes, DirtyCount} =
%% Load each segment in turn and filter out messages that are
%% not in the msg_store, by adding acks to the journal. These
%% acks only go to the RAM journal as it doesn't matter if we
@@ -433,16 +438,18 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% dirty count here, so we can call maybe_flush_journal below
%% and avoid unnecessary file system operations.
lists:foldl(
- fun (Seg, {Segments2, CountAcc, DirtyCount}) ->
- {Segment = #segment { unacked = UnackedCount }, Dirty} =
+ fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) ->
+ {{Segment = #segment { unacked = UnackedCount }, Dirty},
+ UnackedBytes} =
recover_segment(ContainsCheckFun, CleanShutdown,
segment_find_or_new(Seg, Dir, Segments2)),
{segment_store(Segment, Segments2),
- CountAcc + UnackedCount, DirtyCount + Dirty}
- end, {Segments, 0, 0}, all_segment_nums(State1)),
+ CountAcc + UnackedCount,
+ BytesAcc + UnackedBytes, DirtyCount + Dirty}
+ end, {Segments, 0, 0, 0}, all_segment_nums(State1)),
State2 = maybe_flush_journal(State1 #qistate { segments = Segments1,
dirty_count = DirtyCount }),
- {Count, State2}.
+ {Count, Bytes, State2}.
terminate(State = #qistate { journal_handle = JournalHdl,
segments = Segments }) ->
@@ -464,12 +471,16 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack},
- SegmentAndDirtyCount) ->
- recover_message(ContainsCheckFun(MsgId), CleanShutdown,
- Del, RelSeq, SegmentAndDirtyCount)
+ fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack},
+ {SegmentAndDirtyCount, Bytes}) ->
+ {recover_message(ContainsCheckFun(MsgId), CleanShutdown,
+ Del, RelSeq, SegmentAndDirtyCount),
+ Bytes + case IsPersistent of
+ true -> MsgProps#message_properties.size;
+ false -> 0
+ end}
end,
- {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0},
+ {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0},
SegEntries1).
recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) ->
@@ -549,13 +560,15 @@ scan_segments(Fun, Acc, State) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) ->
- [MsgId, expiry_to_binary(Expiry)].
+create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
+ size = Size }) ->
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) ->
+parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Size:?SIZE_BITS>>) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
@@ -563,7 +576,8 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) ->
?NO_EXPIRY -> undefined;
X -> X
end,
- {MsgId, #message_properties { expiry = Exp }}.
+ {MsgId, #message_properties { expiry = Exp,
+ size = Size }}.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -1064,6 +1078,42 @@ avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
avoid_zeroes_segment(_) ->
stop.
+%% At upgrade time we just define every message's size as 0 - that
+%% will save us a load of faff with the message store, and means we
+%% can actually use the clean recovery terms in VQ. It does mean we
+%% don't count message bodies from before the migration, but we can
+%% live with that.
+store_msg_size() ->
+ foreach_queue_index({fun store_msg_size_journal/1,
+ fun store_msg_size_segment/1}).
+
+store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Rest/binary>>) ->
+ {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
+store_msg_size_journal(_) ->
+ stop.
+
+store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
+store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+store_msg_size_segment(_) ->
+ stop.
+
+
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index bbf38f58..f169e13d 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -23,11 +23,14 @@
-export([start/0, stop/0, store/2, read/1, clear/0]).
--export([upgrade_recovery_terms/0, start_link/0]).
+-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-export([upgrade_recovery_terms/0, persistent_bytes/0]).
+
-rabbit_upgrade({upgrade_recovery_terms, local, []}).
+-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}).
%%----------------------------------------------------------------------------
@@ -61,6 +64,8 @@ clear() ->
dets:delete_all_objects(?MODULE),
flush().
+start_link() -> gen_server:start_link(?MODULE, [], []).
+
%%----------------------------------------------------------------------------
upgrade_recovery_terms() ->
@@ -84,7 +89,20 @@ upgrade_recovery_terms() ->
close_table()
end.
-start_link() -> gen_server:start_link(?MODULE, [], []).
+persistent_bytes() -> dets_upgrade(fun persistent_bytes/1).
+persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}].
+
+dets_upgrade(Fun)->
+ open_table(),
+ try
+ ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) ->
+ store(DirBaseName, Fun(Terms)),
+ Acc
+ end, ok, ?MODULE),
+ ok
+ after
+ close_table()
+ end.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 21ac3bd4..a186fb7a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2208,13 +2208,13 @@ restart_test_queue(Qi) ->
empty_test_queue() ->
ok = rabbit_variable_queue:stop(),
{ok, _} = rabbit_variable_queue:start([]),
- {0, Qi} = init_test_queue(),
+ {0, 0, Qi} = init_test_queue(),
_ = rabbit_queue_index:delete_and_terminate(Qi),
ok.
with_empty_test_queue(Fun) ->
ok = empty_test_queue(),
- {0, Qi} = init_test_queue(),
+ {0, 0, Qi} = init_test_queue(),
rabbit_queue_index:delete_and_terminate(Fun(Qi)).
restart_app() ->
@@ -2233,7 +2233,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
MsgId = rabbit_guid:gen(),
QiM = rabbit_queue_index:publish(
- MsgId, SeqId, #message_properties{}, Persistent, QiN),
+ MsgId, SeqId, #message_properties{size = 10},
+ Persistent, QiN),
ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
{QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
end, {Qi, []}, SeqIds),
@@ -2255,7 +2256,7 @@ test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
MsgId = rabbit_guid:gen(),
- Props = #message_properties{expiry=12345},
+ Props = #message_properties{expiry=12345, size = 10},
Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
{[{MsgId, 1, Props, _, _}], Qi2} =
rabbit_queue_index:read(1, 2, Qi1),
@@ -2285,7 +2286,7 @@ test_queue_index() ->
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsMsgIdsA)),
%% should get length back as 0, as all the msgs were transient
- {0, Qi6} = restart_test_queue(Qi4),
+ {0, 0, Qi6} = restart_test_queue(Qi4),
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
@@ -2294,7 +2295,8 @@ test_queue_index() ->
lists:reverse(SeqIdsMsgIdsB)),
%% should get length back as MostOfASegment
LenB = length(SeqIdsB),
- {LenB, Qi12} = restart_test_queue(Qi10),
+ BytesB = LenB * 10,
+ {LenB, BytesB, Qi12} = restart_test_queue(Qi10),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
{ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
@@ -2306,7 +2308,7 @@ test_queue_index() ->
{0, 0, Qi18} = rabbit_queue_index:bounds(Qi17),
%% should get length back as 0 because all persistent
%% msgs have been acked
- {0, Qi19} = restart_test_queue(Qi18),
+ {0, 0, Qi19} = restart_test_queue(Qi18),
Qi19
end),
@@ -2378,11 +2380,11 @@ test_queue_index() ->
true, Qi0),
Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1),
Qi3 = rabbit_queue_index:ack([0], Qi2),
- {5, Qi4} = restart_test_queue(Qi3),
+ {5, 50, Qi4} = restart_test_queue(Qi3),
{Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4),
Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5),
Qi7 = rabbit_queue_index:ack([1,2,3], Qi6),
- {5, Qi8} = restart_test_queue(Qi7),
+ {5, 50, Qi8} = restart_test_queue(Qi7),
Qi8
end),
@@ -2417,7 +2419,8 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
false -> 1
end},
PayloadFun(N)),
- PropFun(N, #message_properties{}), false, self(), VQN)
+ PropFun(N, #message_properties{size = 10}),
+ false, self(), VQN)
end, VQ, lists:seq(Start, Start + Count - 1))).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 871f909b..c52862ab 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -254,13 +254,16 @@
durable,
transient_threshold,
- len,
- persistent_count,
+ len, %% w/o unacked
+ bytes, %% w unacked
+ persistent_count, %% w unacked
+ persistent_bytes, %% w unacked
target_ram_count,
- ram_msg_count,
+ ram_msg_count, %% w/o unacked
ram_msg_count_prev,
ram_ack_count_prev,
+ ram_bytes, %% w unacked
out_counter,
in_counter,
rates,
@@ -343,11 +346,15 @@
transient_threshold :: non_neg_integer(),
len :: non_neg_integer(),
+ bytes :: non_neg_integer(),
persistent_count :: non_neg_integer(),
+ persistent_bytes :: non_neg_integer(),
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
+ ram_ack_count_prev :: non_neg_integer(),
+ ram_bytes :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
@@ -425,7 +432,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [],
+ init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -440,7 +447,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms,
MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
- {DeltaCount, IndexState} =
+ {DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
@@ -448,7 +455,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, RecoveryTerms,
+ init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
process_recovery_terms(Terms=non_clean_shutdown) ->
@@ -461,6 +468,7 @@ process_recovery_terms(Terms) ->
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
+ persistent_bytes = PBytes,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack(true, State),
@@ -470,7 +478,9 @@ terminate(_Reason, State) ->
rabbit_msg_store:client_ref(MSCStateP)
end,
ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
- Terms = [{persistent_ref, PRef}, {persistent_count, PCount}],
+ Terms = [{persistent_ref, PRef},
+ {persistent_count, PCount},
+ {persistent_bytes, PBytes}],
a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
Terms, IndexState),
msg_store_clients = undefined }).
@@ -498,28 +508,36 @@ purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- persistent_count = PCount }) ->
+ bytes = Bytes,
+ ram_bytes = RamBytes,
+ persistent_count = PCount,
+ persistent_bytes = PBytes }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- {PCount1, IndexState1} =
- remove_queue_entries(Q4, PCount, IndexState, MSCState),
+ Stats = {Bytes, RamBytes, PCount, PBytes},
+ {Stats1, IndexState1} =
+ remove_queue_entries(Q4, Stats, IndexState, MSCState),
+
+ {Stats2, State1 = #vqstate { q1 = Q1,
+ index_state = IndexState2,
+ msg_store_clients = MSCState1 }} =
- {PCount2, State1 = #vqstate { q1 = Q1,
- index_state = IndexState2,
- msg_store_clients = MSCState1 }} =
purge_betas_and_deltas(
- PCount1, State #vqstate { q4 = ?QUEUE:new(),
- index_state = IndexState1 }),
+ Stats1, State #vqstate { q4 = ?QUEUE:new(),
+ index_state = IndexState1 }),
- {PCount3, IndexState3} =
- remove_queue_entries(Q1, PCount2, IndexState2, MSCState1),
+ {{Bytes3, RamBytes3, PCount3, PBytes3}, IndexState3} =
+ remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
+ bytes = Bytes3,
ram_msg_count = 0,
- persistent_count = PCount3 })}.
+ ram_bytes = RamBytes3,
+ persistent_count = PCount3,
+ persistent_bytes = PBytes3 })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -542,11 +560,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
InCount1 = InCount + 1,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount1,
- persistent_count = PCount1,
- unconfirmed = UC1 }),
+ State3 = upd_bytes(
+ 1, MsgStatus1,
+ inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 })),
a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
@@ -565,11 +585,12 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 },
+ State3 = upd_bytes(1, MsgStatus,
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }),
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
discard(_MsgId, _ChPid, State) -> State.
@@ -822,6 +843,12 @@ info(messages_ram, State) ->
info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
PersistentCount;
+info(message_bytes, #vqstate{bytes = Bytes}) ->
+ Bytes;
+info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
+ RamBytes;
+info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
+ PersistentBytes;
info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
@@ -858,8 +885,11 @@ is_duplicate(_Msg, State) -> {false, State}.
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
+ bytes = Bytes,
persistent_count = PersistentCount,
- ram_msg_count = RamMsgCount }) ->
+ persistent_bytes = PersistentBytes,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes}) ->
E1 = ?QUEUE:is_empty(Q1),
E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
@@ -873,9 +903,13 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = LZ == (E3 and E4),
true = Len >= 0,
+ true = Bytes >= 0,
true = PersistentCount >= 0,
+ true = PersistentBytes >= 0,
true = RamMsgCount >= 0,
true = RamMsgCount =< Len,
+ true = RamBytes >= 0,
+ true = RamBytes =< Bytes,
State.
@@ -1023,15 +1057,17 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms,
+init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
- DeltaCount1 =
+ {DeltaCount1, DeltaBytes1} =
case Terms of
- non_clean_shutdown -> DeltaCount;
- _ -> proplists:get_value(persistent_count,
- Terms, DeltaCount)
+ non_clean_shutdown -> {DeltaCount, DeltaBytes};
+ _ -> {proplists:get_value(persistent_count,
+ Terms, DeltaCount),
+ proplists:get_value(persistent_bytes,
+ Terms, DeltaBytes)}
end,
Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
true -> ?BLANK_DELTA;
@@ -1056,11 +1092,14 @@ init(IsDurable, IndexState, DeltaCount, Terms,
len = DeltaCount1,
persistent_count = DeltaCount1,
+ bytes = DeltaBytes1,
+ persistent_bytes = DeltaBytes1,
target_ram_count = infinity,
ram_msg_count = 0,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
+ ram_bytes = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rates(Now),
@@ -1085,9 +1124,11 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- inc_ram_msg_count(
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) })
+ upd_ram_bytes(
+ 1, MsgStatus,
+ inc_ram_msg_count(
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) }))
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1118,6 +1159,23 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
State#vqstate{ram_msg_count = RamMsgCount + 1}.
+upd_bytes(Sign, MsgStatus = #msg_status{msg = undefined}, State) ->
+ upd_bytes0(Sign, MsgStatus, State);
+upd_bytes(Sign, MsgStatus = #msg_status{msg = _}, State) ->
+ upd_ram_bytes(Sign, MsgStatus, upd_bytes0(Sign, MsgStatus, State)).
+
+upd_bytes0(Sign, MsgStatus = #msg_status{is_persistent = IsPersistent},
+ State = #vqstate{bytes = Bytes,
+ persistent_bytes = PBytes}) ->
+ Diff = Sign * msg_size(MsgStatus),
+ State#vqstate{bytes = Bytes + Diff,
+ persistent_bytes = PBytes + one_if(IsPersistent) * Diff}.
+
+upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
+ State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
+
+msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
+
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
@@ -1159,51 +1217,63 @@ remove(AckRequired, MsgStatus = #msg_status {
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
-
+ State2 = case AckRequired of
+ false -> upd_bytes(-1, MsgStatus, State1);
+ true -> State1
+ end,
{AckTag, maybe_update_rates(
- State1 #vqstate {ram_msg_count = RamMsgCount1,
+ State2 #vqstate {ram_msg_count = RamMsgCount1,
out_counter = OutCount + 1,
index_state = IndexState2,
len = Len - 1,
persistent_count = PCount1})}.
-purge_betas_and_deltas(PCount,
+purge_betas_and_deltas(Stats,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
case ?QUEUE:is_empty(Q3) of
- true -> {PCount, State};
- false -> {PCount1, IndexState1} = remove_queue_entries(
- Q3, PCount, IndexState, MSCState),
- purge_betas_and_deltas(PCount1,
+ true -> {Stats, State};
+ false -> {Stats1, IndexState1} = remove_queue_entries(
+ Q3, Stats, IndexState, MSCState),
+ purge_betas_and_deltas(Stats1,
maybe_deltas_to_betas(
State #vqstate {
q3 = ?QUEUE:new(),
index_state = IndexState1 }))
end.
-remove_queue_entries(Q, PCount, IndexState, MSCState) ->
- {MsgIdsByStore, Delivers, Acks} =
- ?QUEUE:foldl(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
+remove_queue_entries(Q, {Bytes, RamBytes, PCount, PBytes},
+ IndexState, MSCState) ->
+ {MsgIdsByStore, Bytes1, RamBytes1, PBytes1, Delivers, Acks} =
+ ?QUEUE:foldl(fun remove_queue_entries1/2,
+ {orddict:new(), Bytes, RamBytes, PBytes, [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {PCount - case orddict:find(true, MsgIdsByStore) of
- error -> 0;
- {ok, Ids} -> length(Ids)
- end,
+ {{Bytes1,
+ RamBytes1,
+ PCount - case orddict:find(true, MsgIdsByStore) of
+ error -> 0;
+ {ok, Ids} -> length(Ids)
+ end,
+ PBytes1},
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId,
+ #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {MsgIdsByStore, Delivers, Acks}) ->
+ index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
+ msg_props = #message_properties { size = Size } },
+ {MsgIdsByStore, Bytes, RamBytes, PBytes, Delivers, Acks}) ->
{case MsgOnDisk of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
+ Bytes - Size,
+ RamBytes - Size * one_if(Msg =/= undefined),
+ PBytes - Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
@@ -1283,7 +1353,8 @@ remove_pending_ack(true, SeqId, State) ->
{MsgStatus, State1 = #vqstate { persistent_count = PCount }} =
remove_pending_ack(false, SeqId, State),
PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
- {MsgStatus, State1 # vqstate{ persistent_count = PCount1 }};
+ {MsgStatus, upd_bytes(-1, MsgStatus,
+ State1 # vqstate{ persistent_count = PCount1 })};
remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA }) ->
case gb_trees:lookup(SeqId, RPA) of
@@ -1382,9 +1453,15 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
- {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)};
+ {MsgStatus#msg_status { msg = Msg },
+ upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1]
publish_alpha(MsgStatus, State) ->
{MsgStatus, inc_ram_msg_count(State)}.
+%% [1] We increase the ram_bytes here because we paged the message in
+%% to requeue it, not purely because we requeued it. Hence in the
+%% second head it's already accounted for as already in memory. OTOH
+%% ram_msg_count does not include unacked messages, so it needs
+%% incrementing in both heads.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
@@ -1582,8 +1659,10 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
maybe_write_to_disk(true, false, MsgStatus, State),
DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
limit_ram_acks(Quota - 1,
- State1 #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1 })
+ upd_ram_bytes(
+ -1, MsgStatus1,
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 }))
end.
permitted_beta_count(#vqstate { len = 0 }) ->
@@ -1719,9 +1798,12 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
State1 = #vqstate { ram_msg_count = RamMsgCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = Consumer(MsgStatus2, Qa,
- State1 #vqstate {
- ram_msg_count = RamMsgCount - 1 }),
+ State2 = Consumer(
+ MsgStatus2, Qa,
+ upd_ram_bytes(
+ -1, MsgStatus2,
+ State1 #vqstate {
+ ram_msg_count = RamMsgCount - 1})),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
Qa, State2)
end