author    Simon MacMullen <simon@rabbitmq.com>  2014-07-23 16:20:17 +0100
committer Simon MacMullen <simon@rabbitmq.com>  2014-07-23 16:20:17 +0100
commit    003ef115e3aeecc9465992e27c1f0b1544791eea (patch)
tree      dcbdee99159a10c785fb60dac39a19d25a55173e
parent    9fb2f35c1afa4c911850edcb2adf6f8e55e7f666 (diff)
download  rabbitmq-server-003ef115e3aeecc9465992e27c1f0b1544791eea.tar.gz
First pass at queue-size-in-bytes. Only lightly tested, and with no upgrade step for QI. But I wanted to commit something...
-rw-r--r--  include/rabbit.hrl               |   2
-rw-r--r--  src/rabbit_amqqueue_process.erl  |   7
-rw-r--r--  src/rabbit_queue_index.erl       |  53
-rw-r--r--  src/rabbit_variable_queue.erl    | 242
4 files changed, 188 insertions(+), 116 deletions(-)
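
The heart of the change: #message_properties{} gains a size field, which rabbit_amqqueue_process fills in with iolist_size/1 over the message's payload fragments, and rabbit_variable_queue then keeps running byte totals (bytes, ram_msg_bytes, persistent_bytes) next to the existing counts. A minimal, self-contained sketch of that first step follows; the records are trimmed stand-ins for the real ones in rabbit.hrl / rabbit_framing, and the module and function names are illustrative only.

-module(msg_size_sketch).
-export([demo/0]).

%% Trimmed stand-ins: only the fields this commit relies on are kept.
-record(content, {payload_fragments_rev = []}).
-record(message_properties, {expiry, needs_confirming = false, size}).

%% The queue process measures the payload once, when it builds the
%% message properties, using iolist_size/1 on the reversed fragments.
properties(#content{payload_fragments_rev = PFR}, Expiry, Confirm) ->
    #message_properties{expiry           = Expiry,
                        needs_confirming = Confirm == eventually,
                        size             = iolist_size(PFR)}.

demo() ->
    Content = #content{payload_fragments_rev = [<<"rld">>, <<"hello wo">>]},
    #message_properties{size = 11} = properties(Content, undefined, never),
    ok.
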
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 7a40f9eb..5e41ea93 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -87,7 +87,7 @@
-record(event, {type, props, reference = undefined, timestamp}).
--record(message_properties, {expiry, needs_confirming = false}).
+-record(message_properties, {expiry, needs_confirming = false, size}).
-record(plugin, {name, %% atom()
version, %% string()
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4082c53d..04a38b78 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -665,9 +665,12 @@ subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) ->
run_message_queue(true, Fun(State1))
end.
-message_properties(Message, Confirm, #q{ttl = TTL}) ->
+message_properties(Message = #basic_message{content = Content},
+ Confirm, #q{ttl = TTL}) ->
+ #content{payload_fragments_rev = PFR} = Content,
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually}.
+ needs_confirming = Confirm == eventually,
+ size = iolist_size(PFR)}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 56c19d3f..322c0f8a 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -140,8 +140,11 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
-%% 16 bytes for md5sum + 8 for expiry
--define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)).
+-define(SIZE_BYTES, 4).
+-define(SIZE_BITS, (?SIZE_BYTES * 8)).
+
+%% 16 bytes for md5sum + 8 for expiry + 4 for size
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
%% + 2 for seq, bits and prefix
-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
@@ -199,7 +202,8 @@
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
contains_predicate(), on_sync_fun()) ->
- {'undefined' | non_neg_integer(), qistate()}).
+ {'undefined' | non_neg_integer(),
+ 'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/5 :: (rabbit_types:msg_id(), seq_id(),
@@ -410,12 +414,13 @@ init_clean(RecoveredCounts, State) ->
lists:foldl(
fun ({Seg, UnackedCount}, SegmentsN) ->
Segment = segment_find_or_new(Seg, Dir, SegmentsN),
- segment_store(Segment #segment { unacked = UnackedCount },
- SegmentsN)
+ segment_store(
+ Segment #segment { unacked = UnackedCount },
+ SegmentsN)
end, Segments, RecoveredCounts),
%% the counts above include transient messages, which would be the
%% wrong thing to return
- {undefined, State1 # qistate { segments = Segments1 }}.
+ {undefined, undefined, State1 # qistate { segments = Segments1 }}.
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Recover the journal completely. This will also load segments
@@ -424,7 +429,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% and the journal.
State1 = #qistate { dir = Dir, segments = Segments } =
recover_journal(State),
- {Segments1, Count, DirtyCount} =
+ {Segments1, Count, Bytes, DirtyCount} =
%% Load each segment in turn and filter out messages that are
%% not in the msg_store, by adding acks to the journal. These
%% acks only go to the RAM journal as it doesn't matter if we
@@ -433,16 +438,18 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% dirty count here, so we can call maybe_flush_journal below
%% and avoid unnecessary file system operations.
lists:foldl(
- fun (Seg, {Segments2, CountAcc, DirtyCount}) ->
- {Segment = #segment { unacked = UnackedCount }, Dirty} =
+ fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) ->
+ {{Segment = #segment { unacked = UnackedCount }, Dirty},
+ UnackedBytes} =
recover_segment(ContainsCheckFun, CleanShutdown,
segment_find_or_new(Seg, Dir, Segments2)),
{segment_store(Segment, Segments2),
- CountAcc + UnackedCount, DirtyCount + Dirty}
- end, {Segments, 0, 0}, all_segment_nums(State1)),
+ CountAcc + UnackedCount,
+ BytesAcc + UnackedBytes, DirtyCount + Dirty}
+ end, {Segments, 0, 0, 0}, all_segment_nums(State1)),
State2 = maybe_flush_journal(State1 #qistate { segments = Segments1,
dirty_count = DirtyCount }),
- {Count, State2}.
+ {Count, Bytes, State2}.
terminate(State = #qistate { journal_handle = JournalHdl,
segments = Segments }) ->
@@ -464,12 +471,13 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack},
- SegmentAndDirtyCount) ->
- recover_message(ContainsCheckFun(MsgId), CleanShutdown,
- Del, RelSeq, SegmentAndDirtyCount)
+ fun (RelSeq, {{MsgId, MsgProps, _IsPersistent}, Del, no_ack},
+ {SegmentAndDirtyCount, Bytes}) ->
+ {recover_message(ContainsCheckFun(MsgId), CleanShutdown,
+ Del, RelSeq, SegmentAndDirtyCount),
+ Bytes + MsgProps#message_properties.size}
end,
- {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0},
+ {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0},
SegEntries1).
recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) ->
@@ -549,13 +557,15 @@ scan_segments(Fun, Acc, State) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) ->
- [MsgId, expiry_to_binary(Expiry)].
+create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
+ size = Size}) ->
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) ->
+parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Size:?SIZE_BITS>>) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
@@ -563,7 +573,8 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) ->
?NO_EXPIRY -> undefined;
X -> X
end,
- {MsgId, #message_properties { expiry = Exp }}.
+ {MsgId, #message_properties { expiry = Exp,
+ size = Size }}.
%%----------------------------------------------------------------------------
%% journal manipulation
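
On-disk layout note: the pub record body written by the queue index grows from 24 to 28 bytes (16-byte msg id + 8-byte expiry + 4-byte size), so a recorded message size is capped at 2^32 - 1 bytes. Below is a self-contained sketch of the round trip with the constants re-declared locally; the real parse_pub_record_body/1 also works around binary fragmentation, which this sketch skips, and the NO_EXPIRY sentinel of 0 is assumed here.

-module(pub_record_sketch).
-export([demo/0]).

-define(MSG_ID_BYTES, 16).
-define(MSG_ID_BITS,  (?MSG_ID_BYTES * 8)).
-define(EXPIRY_BYTES, 8).
-define(EXPIRY_BITS,  (?EXPIRY_BYTES * 8)).
-define(SIZE_BYTES,   4).
-define(SIZE_BITS,    (?SIZE_BYTES * 8)).
-define(NO_EXPIRY,    0).  %% assumed sentinel value

%% Serialise msg id + expiry + size into the 28-byte record body.
create(MsgId, Expiry, Size) when byte_size(MsgId) =:= ?MSG_ID_BYTES ->
    ExpiryBin = case Expiry of
                    undefined -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
                    _         -> <<Expiry:?EXPIRY_BITS>>
                end,
    iolist_to_binary([MsgId, ExpiryBin, <<Size:?SIZE_BITS>>]).

%% Parse it back into {MsgId, Expiry | undefined, Size}.
parse(<<MsgId:?MSG_ID_BYTES/binary, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS>>) ->
    {MsgId,
     case Expiry of ?NO_EXPIRY -> undefined; X -> X end,
     Size}.

demo() ->
    MsgId = <<0:?MSG_ID_BITS>>,
    Body  = create(MsgId, undefined, 4096),
    28    = byte_size(Body),                 %% new ?PUB_RECORD_BODY_BYTES
    {MsgId, undefined, 4096} = parse(Body),
    ok.
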
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ede69748..1465e884 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -255,12 +255,15 @@
transient_threshold,
len,
+ bytes,
persistent_count,
+ persistent_bytes,
target_ram_count,
ram_msg_count,
ram_msg_count_prev,
ram_ack_count_prev,
+ ram_msg_bytes,
out_counter,
in_counter,
rates,
@@ -343,7 +346,9 @@
transient_threshold :: non_neg_integer(),
len :: non_neg_integer(),
+ bytes :: non_neg_integer(),
persistent_count :: non_neg_integer(),
+ persistent_bytes :: non_neg_integer(),
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
@@ -425,7 +430,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [],
+ init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -440,7 +445,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms,
MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
- {DeltaCount, IndexState} =
+ {DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
@@ -448,7 +453,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, RecoveryTerms,
+ init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
process_recovery_terms(Terms=non_clean_shutdown) ->
@@ -461,6 +466,7 @@ process_recovery_terms(Terms) ->
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
+ persistent_bytes = PBytes,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack(true, State),
@@ -470,7 +476,9 @@ terminate(_Reason, State) ->
rabbit_msg_store:client_ref(MSCStateP)
end,
ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
- Terms = [{persistent_ref, PRef}, {persistent_count, PCount}],
+ Terms = [{persistent_ref, PRef},
+ {persistent_count, PCount},
+ {persistent_bytes, PBytes}],
a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
Terms, IndexState),
msg_store_clients = undefined }).
@@ -498,38 +506,45 @@ purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- persistent_count = PCount }) ->
+ persistent_count = PCount,
+ persistent_bytes = PBytes }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- {LensByStore, IndexState1} = remove_queue_entries(
- fun ?QUEUE:foldl/3, Q4,
- orddict:new(), IndexState, MSCState),
- {LensByStore1, State1 = #vqstate { q1 = Q1,
- index_state = IndexState2,
- msg_store_clients = MSCState1 }} =
- purge_betas_and_deltas(LensByStore,
- State #vqstate { q4 = ?QUEUE:new(),
- index_state = IndexState1 }),
- {LensByStore2, IndexState3} = remove_queue_entries(
- fun ?QUEUE:foldl/3, Q1,
- LensByStore1, IndexState2, MSCState1),
- PCount1 = PCount - find_persistent_count(LensByStore2),
+ {PCount1, PBytes1, IndexState1} =
+ remove_queue_entries(Q4, PCount, PBytes, IndexState, MSCState),
+
+ {PCount2, PBytes2, State1 = #vqstate { q1 = Q1,
+ index_state = IndexState2,
+ msg_store_clients = MSCState1 }} =
+ purge_betas_and_deltas(
+ PCount1, PBytes1,State #vqstate { q4 = ?QUEUE:new(),
+ index_state = IndexState1 }),
+
+ {PCount3, PBytes3, IndexState3} =
+ remove_queue_entries(Q1, PCount2, PBytes2, IndexState2, MSCState1),
+
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
+ bytes = 0,
ram_msg_count = 0,
- persistent_count = PCount1 })}.
+ ram_msg_bytes = 0,
+ persistent_count = PCount3,
+ persistent_bytes = PBytes3 })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming,
+ size = Size},
IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
+ bytes = Bytes,
in_counter = InCount,
persistent_count = PCount,
+ persistent_bytes = PBytes,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
@@ -541,22 +556,28 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
end,
InCount1 = InCount + 1,
PCount1 = PCount + one_if(IsPersistent1),
+ PBytes1 = PBytes + Size * one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount1,
- persistent_count = PCount1,
- unconfirmed = UC1 }),
+ State3 = upd_ram_counts(1, MsgStatus,
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ bytes = Bytes + Size,
+ in_counter = InCount1,
+ persistent_count = PCount1,
+ persistent_bytes = PBytes1,
+ unconfirmed = UC1 }),
a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
MsgProps = #message_properties {
- needs_confirming = NeedsConfirming },
+ needs_confirming = NeedsConfirming,
+ size = Size },
_ChPid, State = #vqstate { next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
persistent_count = PCount,
+ persistent_bytes = PBytes,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
@@ -564,11 +585,13 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
+ PBytes1 = PBytes + Size * one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
State3 = State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
+ persistent_bytes = PBytes1,
unconfirmed = UC1 },
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
@@ -638,7 +661,6 @@ ack([SeqId], State) ->
index_on_disk = IndexOnDisk },
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount,
ack_out_counter = AckOutCount }} =
remove_pending_ack(SeqId, State),
IndexState1 = case IndexOnDisk of
@@ -649,16 +671,13 @@ ack([SeqId], State) ->
true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
false -> ok
end,
- PCount1 = PCount - one_if(IsPersistent),
{[MsgId],
a(State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
ack_out_counter = AckOutCount + 1 })};
ack(AckTags, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount,
ack_out_counter = AckOutCount }} =
lists:foldl(
fun (SeqId, {Acc, State2}) ->
@@ -668,11 +687,8 @@ ack(AckTags, State) ->
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
- PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
- orddict:new(), MsgIdsByStore)),
{lists:reverse(AllMsgIds),
a(State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
requeue(AckTags, #vqstate { delta = Delta,
@@ -824,12 +840,15 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
status(#vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
+ bytes = Bytes,
ram_pending_ack = RPA,
disk_pending_ack = DPA,
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
+ ram_msg_bytes = RamMsgBytes,
next_seq_id = NextSeqId,
persistent_count = PersistentCount,
+ persistent_bytes = PersistentBytes,
rates = #rates { in = AvgIngressRate,
out = AvgEgressRate,
ack_in = AvgAckIngressRate,
@@ -841,12 +860,15 @@ status(#vqstate {
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
{len , Len},
+ {bytes , Bytes},
{pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
+ {ram_msg_bytes , RamMsgBytes},
{ram_ack_count , gb_trees:size(RPA)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
+ {persistent_bytes , PersistentBytes},
{avg_ingress_rate , AvgIngressRate},
{avg_egress_rate , AvgEgressRate},
{avg_ack_ingress_rate, AvgAckIngressRate},
@@ -863,8 +885,10 @@ is_duplicate(_Msg, State) -> {false, State}.
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
+ bytes = Bytes,
persistent_count = PersistentCount,
- ram_msg_count = RamMsgCount }) ->
+ ram_msg_count = RamMsgCount,
+ ram_msg_bytes = RamMsgBytes}) ->
E1 = ?QUEUE:is_empty(Q1),
E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
@@ -878,9 +902,12 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = LZ == (E3 and E4),
true = Len >= 0,
+ true = Bytes >= 0,
true = PersistentCount >= 0,
true = RamMsgCount >= 0,
true = RamMsgCount =< Len,
+ true = RamMsgBytes >= 0,
+ true = RamMsgBytes =< Bytes,
State.
@@ -1028,15 +1055,17 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms,
+init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
- DeltaCount1 =
+ {DeltaCount1, DeltaBytes1} =
case Terms of
- non_clean_shutdown -> DeltaCount;
- _ -> proplists:get_value(persistent_count,
- Terms, DeltaCount)
+ non_clean_shutdown -> {DeltaCount, DeltaBytes};
+ _ -> {proplists:get_value(persistent_count,
+ Terms, DeltaCount),
+ proplists:get_value(persistent_bytes,
+ Terms, DeltaBytes)}
end,
Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
true -> ?BLANK_DELTA;
@@ -1061,11 +1090,14 @@ init(IsDurable, IndexState, DeltaCount, Terms,
len = DeltaCount1,
persistent_count = DeltaCount1,
+ bytes = DeltaBytes1,
+ persistent_bytes = DeltaBytes1,
target_ram_count = infinity,
ram_msg_count = 0,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
+ ram_msg_bytes = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rates(Now),
@@ -1090,7 +1122,8 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- inc_ram_msg_count(
+ upd_ram_counts(
+ 1, MsgStatus,
State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
msg = Msg }, Q4a) })
end;
@@ -1120,8 +1153,15 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
msg_store_read(MSCState, IsPersistent, MsgId),
{Msg, State #vqstate {msg_store_clients = MSCState1}}.
-inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
- State#vqstate{ram_msg_count = RamMsgCount + 1}.
+upd_ram_counts(Sign, #msg_status{msg_props = #message_properties{size = Size}},
+ State = #vqstate{ram_msg_count = RamMsgCount,
+ ram_msg_bytes = RamMsgBytes}) ->
+ State#vqstate{ram_msg_count = RamMsgCount + Sign,
+ ram_msg_bytes = RamMsgBytes + Sign * Size}.
+
+inc_bytes(#msg_status{msg_props = #message_properties{size = Size}},
+ State = #vqstate{bytes = Bytes}) ->
+ State#vqstate{bytes = Bytes + Size}.
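
For illustration, the two counter helpers again as a self-contained sketch with trimmed record stand-ins, plus a demo of the Sign convention: upd_ram_counts/3 adds Sign to ram_msg_count and Sign * Size to ram_msg_bytes, while inc_bytes/2 grows the queue-wide bytes total. The Sign = -1 direction is exercised only in this demo; in the commit itself, remove/2 below adjusts the RAM counters directly.

-module(counter_sketch).
-export([demo/0]).

%% Trimmed stand-ins for #msg_status{} and #vqstate{}.
-record(message_properties, {expiry, needs_confirming = false, size}).
-record(msg_status, {msg_props}).
-record(vqstate, {bytes = 0, ram_msg_count = 0, ram_msg_bytes = 0}).

upd_ram_counts(Sign, #msg_status{msg_props = #message_properties{size = Size}},
               State = #vqstate{ram_msg_count = Count, ram_msg_bytes = Bytes}) ->
    State#vqstate{ram_msg_count = Count + Sign,
                  ram_msg_bytes = Bytes + Sign * Size}.

inc_bytes(#msg_status{msg_props = #message_properties{size = Size}},
          State = #vqstate{bytes = Bytes}) ->
    State#vqstate{bytes = Bytes + Size}.

demo() ->
    MS = #msg_status{msg_props = #message_properties{size = 100}},
    S1 = inc_bytes(MS, upd_ram_counts(1, MS, #vqstate{})),   %% enters RAM
    #vqstate{bytes = 100, ram_msg_count = 1, ram_msg_bytes = 100} = S1,
    #vqstate{ram_msg_count = 0, ram_msg_bytes = 0} =
        upd_ram_counts(-1, MS, S1),                          %% leaves RAM
    ok.
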
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
@@ -1130,13 +1170,17 @@ remove(AckRequired, MsgStatus = #msg_status {
is_persistent = IsPersistent,
is_delivered = IsDelivered,
msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
+ index_on_disk = IndexOnDisk,
+ msg_props = #message_properties{ size = Size }},
State = #vqstate {ram_msg_count = RamMsgCount,
+ ram_msg_bytes = RamMsgBytes,
out_counter = OutCount,
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- persistent_count = PCount}) ->
+ bytes = Bytes,
+ persistent_count = PCount,
+ persistent_bytes = PBytes}) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1162,60 +1206,67 @@ remove(AckRequired, MsgStatus = #msg_status {
false -> {undefined, State}
end,
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
+ MaybeOne = one_if(IsPersistent andalso not AckRequired),
+ PCount1 = PCount - MaybeOne,
+ PBytes1 = PBytes - Size * MaybeOne,
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
+ RamMsgBytes1 = RamMsgBytes - Size * one_if(Msg =/= undefined),
{AckTag, maybe_update_rates(
State1 #vqstate {ram_msg_count = RamMsgCount1,
+ ram_msg_bytes = RamMsgBytes1,
out_counter = OutCount + 1,
index_state = IndexState2,
len = Len - 1,
- persistent_count = PCount1})}.
+ bytes = Bytes - Size,
+ persistent_count = PCount1,
+ persistent_bytes = PBytes1})}.
-purge_betas_and_deltas(LensByStore,
+purge_betas_and_deltas(PCount, PBytes,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
case ?QUEUE:is_empty(Q3) of
- true -> {LensByStore, State};
- false -> {LensByStore1, IndexState1} =
- remove_queue_entries(fun ?QUEUE:foldl/3, Q3,
- LensByStore, IndexState, MSCState),
- purge_betas_and_deltas(LensByStore1,
+ true -> {PCount, PBytes, State};
+ false -> {PCount1, PBytes1, IndexState1} =
+ remove_queue_entries(
+ Q3, PCount, PBytes, IndexState, MSCState),
+ purge_betas_and_deltas(PCount1, PBytes1,
maybe_deltas_to_betas(
State #vqstate {
q3 = ?QUEUE:new(),
index_state = IndexState1 }))
end.
-remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
- {MsgIdsByStore, Delivers, Acks} =
- Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
+remove_queue_entries(Q, PCount, PBytes, IndexState, MSCState) ->
+ {MsgIdsByStore, PBytes1, Delivers, Acks} =
+ ?QUEUE:foldl(fun remove_queue_entries1/2,
+ {orddict:new(), PBytes, [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore),
+ {PCount - case orddict:find(true, MsgIdsByStore) of
+ error -> 0;
+ {ok, Ids} -> length(Ids)
+ end,
+ PBytes1,
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
#msg_status { msg_id = MsgId, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {MsgIdsByStore, Delivers, Acks}) ->
+ index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
+ msg_props = #message_properties { size = Size } },
+ {MsgIdsByStore, PBytes, Delivers, Acks}) ->
{case MsgOnDisk of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
+ PBytes - Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
-sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
- orddict:fold(
- fun (IsPersistent, MsgIds, LensByStore1) ->
- orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1)
- end, LensByStore, MsgIdsByStore).
-
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1287,8 +1338,20 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
none -> gb_trees:get(SeqId, DPA)
end.
-remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+remove_pending_ack(SeqId, State) ->
+ {MsgStatus, State1 = #vqstate { persistent_count = PCount,
+ persistent_bytes = PBytes }} =
+ remove_pending_ack0(SeqId, State),
+ #msg_status { msg_props = #message_properties{ size = Size },
+ is_persistent = IsPersistent } = MsgStatus,
+ case IsPersistent of
+ false -> {MsgStatus, State1};
+ true -> {MsgStatus, State1#vqstate{ persistent_count = PCount - 1,
+ persistent_bytes = PBytes - Size }}
+ end.
+
+remove_pending_ack0(SeqId, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
{V, State #vqstate { ram_pending_ack = RPA1 }};
@@ -1338,12 +1401,6 @@ accumulate_ack(#msg_status { seq_id = SeqId,
end,
[MsgId | AllMsgIds]}.
-find_persistent_count(LensByStore) ->
- case orddict:find(true, LensByStore) of
- error -> 0;
- {ok, Len} -> Len
- end.
-
%%----------------------------------------------------------------------------
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
@@ -1391,9 +1448,9 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
- {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)};
+ {MsgStatus#msg_status { msg = Msg }, upd_ram_counts(1, MsgStatus, State1)};
publish_alpha(MsgStatus, State) ->
- {MsgStatus, inc_ram_msg_count(State)}.
+ {MsgStatus, upd_ram_counts(1, MsgStatus, State)}.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
@@ -1419,7 +1476,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, State2)
+ Limit, PubFun, inc_bytes(MsgStatus1, State2))
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -1433,13 +1490,14 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
+ inc_bytes(MsgStatus, State2)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
msg_from_pending_ack(SeqId, State) ->
{#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
- remove_pending_ack(SeqId, State),
+ remove_pending_ack0(SeqId, State),
{MsgStatus #msg_status {
msg_props = MsgProps #message_properties { needs_confirming = false } },
State1}.
@@ -1719,21 +1777,21 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
{Quota, State};
push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case credit_flow:blocked() of
- true -> {Quota, State};
- false -> case Generator(Q) of
- {empty, _Q} ->
- {Quota, State};
- {{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true },
- State1 = #vqstate { ram_msg_count = RamMsgCount }} =
- maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = Consumer(MsgStatus2, Qa,
- State1 #vqstate {
- ram_msg_count = RamMsgCount - 1 }),
- push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State2)
- end
+ true ->
+ {Quota, State};
+ false ->
+ case Generator(Q) of
+ {empty, _Q} ->
+ {Quota, State};
+ {{value, MsgStatus}, Qa} ->
+ {MsgStatus1 = #msg_status { msg_on_disk = true }, State1} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ State2 = Consumer(MsgStatus2, Qa,
+ upd_ram_counts(1, MsgStatus2, State1)),
+ push_alphas_to_betas(Generator, Consumer, Quota - 1,
+ Qa, State2)
+ end
end.
push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,