summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-02-26 14:10:44 +0000
committerkjnilsson <knilsson@pivotal.io>2020-02-26 17:14:14 +0000
commit7ee76ef6c393c6f8522905b9f52d9670497030e8 (patch)
tree37a5675f95a2317dcb70473fc7fb7cb6ad21411e
parent5ce141c9f5e50aaa61880ee54aff7f07287d2707 (diff)
downloadrabbitmq-server-git-7ee76ef6c393c6f8522905b9f52d9670497030e8.tar.gz
Optimise messages_ready function by keeping counts of prefix messages
rather than taking length/1. There could be a lot of prefix messages during recovery which could make recovery unbearably slow.
-rw-r--r--src/rabbit_fifo.erl58
-rw-r--r--src/rabbit_fifo.hrl7
-rw-r--r--src/rabbit_quorum_queue.erl2
3 files changed, 42 insertions, 25 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 251b370caa..72109cfbf4 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -327,7 +327,7 @@ apply(#{index := RaftIdx}, #purge{},
messages = #{},
returns = lqueue:new(),
msg_bytes_enqueue = 0,
- prefix_msgs = {[], []},
+ prefix_msgs = {0, [], 0, []},
low_msg_num = undefined,
msg_bytes_in_memory = 0,
msgs_ready_in_memory = 0},
@@ -555,7 +555,7 @@ state_enter(leader, #?MODULE{consumers = Cons,
cfg = #cfg{name = Name,
resource = Resource,
become_leader_handler = BLH},
- prefix_msgs = {[], []}
+ prefix_msgs = {0, [], 0, []}
}) ->
% return effects to monitor all current consumers and enqueuers
Pids = lists:usort(maps:keys(Enqs)
@@ -770,16 +770,16 @@ usage(Name) when is_atom(Name) ->
%%% Internal
messages_ready(#?MODULE{messages = M,
- prefix_msgs = {PreR, PreM},
+ prefix_msgs = {RCnt, _R, PCnt, _P},
returns = R}) ->
%% counts of prefix messages are maintained incrementally so we no
%% longer need to take length/1 of the prefix lists here
- maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
+ maps:size(M) + lqueue:len(R) + RCnt + PCnt.
messages_total(#?MODULE{ra_indexes = I,
- prefix_msgs = {PreR, PreM}}) ->
- rabbit_fifo_index:size(I) + length(PreR) + length(PreM).
+ prefix_msgs = {RCnt, _R, PCnt, _P}}) ->
+ rabbit_fifo_index:size(I) + RCnt + PCnt.
update_use({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;
@@ -1016,13 +1016,15 @@ maybe_store_dehydrated_state(RaftIdx,
= Cfg,
ra_indexes = Indexes,
enqueue_count = 0,
- release_cursors = Cursors0} = State) ->
+ release_cursors = Cursors0} = State0) ->
case rabbit_fifo_index:exists(RaftIdx, Indexes) of
false ->
%% the incoming enqueue must already have been dropped
- State;
+ State0;
true ->
- Dehydrated = dehydrate_state(State),
+ State = convert_prefix_msgs(State0),
+ {Time, Dehydrated} = timer:tc(fun () -> dehydrate_state(State) end),
+ rabbit_log:info("dehydrating state took ~bms", [Time div 1000]),
Cursor = {release_cursor, RaftIdx, Dehydrated},
Cursors = lqueue:in(Cursor, Cursors0),
Interval = lqueue:len(Cursors) * Base,
@@ -1336,7 +1338,8 @@ evaluate_limit(Result,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
-evaluate_limit(Result, State0, Effects0) ->
+evaluate_limit(Result, State00, Effects0) ->
+ State0 = convert_prefix_msgs(State00),
case is_over_limit(State0) of
true ->
{State, Effects} = drop_head(State0, Effects0),
@@ -1380,17 +1383,21 @@ append_log_effects(Effects0, AccMap) ->
%%
%% When we return it is always done to the current return queue
%% for both prefix messages and current messages
-take_next_msg(#?MODULE{prefix_msgs = {[{'$empty_msg', _} = Msg | Rem], P}} = State) ->
+take_next_msg(#?MODULE{prefix_msgs = {R, P}} = State) ->
+ %% conversion
+ take_next_msg(State#?MODULE{prefix_msgs = {length(R), R, length(P), P}});
+take_next_msg(#?MODULE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem],
+ NumP, P}} = State) ->
%% there are prefix returns, these should be served first
- {Msg, State#?MODULE{prefix_msgs = {Rem, P}}};
-take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) ->
+ {Msg, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) ->
%% there are prefix returns, these should be served first
{{'$prefix_msg', Header},
- State#?MODULE{prefix_msgs = {Rem, P}}};
+ State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
take_next_msg(#?MODULE{returns = Returns,
low_msg_num = Low0,
messages = Messages0,
- prefix_msgs = {R, P}} = State) ->
+ prefix_msgs = {NumR, R, NumP, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
%% queue
case lqueue:peek(Returns) of
@@ -1420,10 +1427,10 @@ take_next_msg(#?MODULE{returns = Returns,
{Header, 'empty'} ->
%% There are prefix msgs
{{'$empty_msg', Header},
- State#?MODULE{prefix_msgs = {R, Rem}}};
+ State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}};
Header ->
{{'$prefix_msg', Header},
- State#?MODULE{prefix_msgs = {R, Rem}}}
+ State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}}
end
end.
@@ -1600,6 +1607,11 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
ServiceQueue0
end.
+convert_prefix_msgs(#?MODULE{prefix_msgs = {R, P}} = State) ->
+ State#?MODULE{prefix_msgs = {length(R), R, length(P), P}};
+convert_prefix_msgs(State) ->
+ State.
+
%% creates a dehydrated version of the current state to be cached and
%% potentially used for a snapshot at a later point
dehydrate_state(#?MODULE{messages = Messages,
@@ -1607,10 +1619,11 @@ dehydrate_state(#?MODULE{messages = Messages,
returns = Returns,
low_msg_num = Low,
next_msg_num = Next,
- prefix_msgs = {PrefRet0, PrefMsg0},
+ prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0},
waiting_consumers = Waiting0} = State) ->
+ RCnt = lqueue:len(Returns),
%% TODO: optimise this function as far as possible
- PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) ->
+ PrefRet1 = lists:foldr(fun ({'$prefix_msg', Header}, Acc) ->
[Header | Acc];
({'$empty_msg', _} = Msg, Acc) ->
[Msg | Acc];
@@ -1619,8 +1632,9 @@ dehydrate_state(#?MODULE{messages = Messages,
({_, {_, {Header, _}}}, Acc) ->
[Header | Acc]
end,
- lists:reverse(PrefRet0),
+ [],
lqueue:to_list(Returns)),
+ PrefRet = PrefRet0 ++ PrefRet1,
PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []),
%% prefix messages are not populated in normal operation only after
%% recovering from a snapshot
@@ -1634,8 +1648,8 @@ dehydrate_state(#?MODULE{messages = Messages,
dehydrate_consumer(C)
end, Consumers),
returns = lqueue:new(),
- prefix_msgs = {lists:reverse(PrefRet),
- PrefMsgs},
+ prefix_msgs = {PRCnt + RCnt, PrefRet,
+ PPCnt + maps:size(Messages), PrefMsgs},
waiting_consumers = Waiting}.
dehydrate_messages(Low, Next, _Msgs, Acc)
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 2fae8c10ca..b9e967cbb1 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -119,6 +119,10 @@
max_in_memory_bytes :: option(non_neg_integer())
}).
+-type prefix_msgs() :: {list(), list()} |
+ {non_neg_integer(), list(),
+ non_neg_integer(), list()}.
+
-record(rabbit_fifo,
{cfg :: #cfg{},
% unassigned messages
@@ -161,8 +165,7 @@
%% overflow calculations).
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msgs = {[], []} :: {Return :: [msg_header() | {'$empty_msg', msg_header()}],
- PrefixMsgs :: [msg_header() | {msg_header(), 'empty'}]},
+ prefix_msgs = {0, [], 0, []} :: prefix_msgs(),
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
%% waiting consumers, one is picked active consumer is cancelled or dies
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 972716a396..ca7a2eadb1 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -249,7 +249,7 @@ all_replica_states() ->
list_with_minimum_quorum() ->
filter_quorum_critical(rabbit_amqqueue:list_local_quorum_queues()).
--spec list_with_minimum_quorum_for_cli() -> [amqqueue:amqqueue()].
+-spec list_with_minimum_quorum_for_cli() -> [#{binary() => term()}].
list_with_minimum_quorum_for_cli() ->
QQs = list_with_minimum_quorum(),
[begin