diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-02-26 14:10:44 +0000 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-02-26 17:14:14 +0000 |
commit | 7ee76ef6c393c6f8522905b9f52d9670497030e8 (patch) | |
tree | 37a5675f95a2317dcb70473fc7fb7cb6ad21411e | |
parent | 5ce141c9f5e50aaa61880ee54aff7f07287d2707 (diff) | |
download | rabbitmq-server-git-7ee76ef6c393c6f8522905b9f52d9670497030e8.tar.gz |
Optimise messages_ready function by keeping counts of prefix messages
rather than taking length/1. There could be a lot of prefix messages
during during recovery which could make recovery unbearably slow.
-rw-r--r-- | src/rabbit_fifo.erl | 58 | ||||
-rw-r--r-- | src/rabbit_fifo.hrl | 7 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 2 |
3 files changed, 42 insertions, 25 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 251b370caa..72109cfbf4 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -327,7 +327,7 @@ apply(#{index := RaftIdx}, #purge{}, messages = #{}, returns = lqueue:new(), msg_bytes_enqueue = 0, - prefix_msgs = {[], []}, + prefix_msgs = {0, [], 0, []}, low_msg_num = undefined, msg_bytes_in_memory = 0, msgs_ready_in_memory = 0}, @@ -555,7 +555,7 @@ state_enter(leader, #?MODULE{consumers = Cons, cfg = #cfg{name = Name, resource = Resource, become_leader_handler = BLH}, - prefix_msgs = {[], []} + prefix_msgs = {0, [], 0, []} }) -> % return effects to monitor all current consumers and enqueuers Pids = lists:usort(maps:keys(Enqs) @@ -770,16 +770,16 @@ usage(Name) when is_atom(Name) -> %%% Internal messages_ready(#?MODULE{messages = M, - prefix_msgs = {PreR, PreM}, + prefix_msgs = {RCnt, _R, PCnt, _P}, returns = R}) -> %% prefix messages will rarely have anything in them during normal %% operations so length/1 is fine here - maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM). + maps:size(M) + lqueue:len(R) + RCnt + PCnt. messages_total(#?MODULE{ra_indexes = I, - prefix_msgs = {PreR, PreM}}) -> - rabbit_fifo_index:size(I) + length(PreR) + length(PreM). + prefix_msgs = {RCnt, _R, PCnt, _P}}) -> + rabbit_fifo_index:size(I) + RCnt + PCnt. update_use({inactive, _, _, _} = CUInfo, inactive) -> CUInfo; @@ -1016,13 +1016,15 @@ maybe_store_dehydrated_state(RaftIdx, = Cfg, ra_indexes = Indexes, enqueue_count = 0, - release_cursors = Cursors0} = State) -> + release_cursors = Cursors0} = State0) -> case rabbit_fifo_index:exists(RaftIdx, Indexes) of false -> %% the incoming enqueue must already have been dropped - State; + State0; true -> - Dehydrated = dehydrate_state(State), + State = convert_prefix_msgs(State0), + {Time, Dehydrated} = timer:tc(fun () -> dehydrate_state(State) end), + rabbit_log:info("dehydrating state took ~bms", [Time div 1000]), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), Interval = lqueue:len(Cursors) * Base, @@ -1336,7 +1338,8 @@ evaluate_limit(Result, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; -evaluate_limit(Result, State0, Effects0) -> +evaluate_limit(Result, State00, Effects0) -> + State0 = convert_prefix_msgs(State00), case is_over_limit(State0) of true -> {State, Effects} = drop_head(State0, Effects0), @@ -1380,17 +1383,21 @@ append_log_effects(Effects0, AccMap) -> %% %% When we return it is always done to the current return queue %% for both prefix messages and current messages -take_next_msg(#?MODULE{prefix_msgs = {[{'$empty_msg', _} = Msg | Rem], P}} = State) -> +take_next_msg(#?MODULE{prefix_msgs = {R, P}} = State) -> + %% conversion + take_next_msg(State#?MODULE{prefix_msgs = {length(R), R, length(P), P}}); +take_next_msg(#?MODULE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem], + NumP, P}} = State) -> %% there are prefix returns, these should be served first - {Msg, State#?MODULE{prefix_msgs = {Rem, P}}}; -take_next_msg(#?MODULE{prefix_msgs = {[Header | Rem], P}} = State) -> + {Msg, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; +take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -> %% there are prefix returns, these should be served first {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {Rem, P}}}; + State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; take_next_msg(#?MODULE{returns = Returns, low_msg_num = Low0, messages = Messages0, - prefix_msgs = {R, P}} = State) -> + prefix_msgs = {NumR, R, NumP, P}} = State) -> %% use peek rather than out there as the most likely case is an empty %% queue case lqueue:peek(Returns) of @@ -1420,10 +1427,10 @@ take_next_msg(#?MODULE{returns = Returns, {Header, 'empty'} -> %% There are prefix msgs {{'$empty_msg', Header}, - State#?MODULE{prefix_msgs = {R, Rem}}}; + State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}}; Header -> {{'$prefix_msg', Header}, - State#?MODULE{prefix_msgs = {R, Rem}}} + State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}} end end. @@ -1600,6 +1607,11 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, ServiceQueue0 end. +convert_prefix_msgs(#?MODULE{prefix_msgs = {R, P}} = State) -> + State#?MODULE{prefix_msgs = {length(R), R, length(P), P}}; +convert_prefix_msgs(State) -> + State. + %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point dehydrate_state(#?MODULE{messages = Messages, @@ -1607,10 +1619,11 @@ dehydrate_state(#?MODULE{messages = Messages, returns = Returns, low_msg_num = Low, next_msg_num = Next, - prefix_msgs = {PrefRet0, PrefMsg0}, + prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0}, waiting_consumers = Waiting0} = State) -> + RCnt = lqueue:len(Returns), %% TODO: optimise this function as far as possible - PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) -> + PrefRet1 = lists:foldr(fun ({'$prefix_msg', Header}, Acc) -> [Header | Acc]; ({'$empty_msg', _} = Msg, Acc) -> [Msg | Acc]; @@ -1619,8 +1632,9 @@ dehydrate_state(#?MODULE{messages = Messages, ({_, {_, {Header, _}}}, Acc) -> [Header | Acc] end, - lists:reverse(PrefRet0), + [], lqueue:to_list(Returns)), + PrefRet = PrefRet0 ++ PrefRet1, PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []), %% prefix messages are not populated in normal operation only after %% recovering from a snapshot @@ -1634,8 +1648,8 @@ dehydrate_state(#?MODULE{messages = Messages, dehydrate_consumer(C) end, Consumers), returns = lqueue:new(), - prefix_msgs = {lists:reverse(PrefRet), - PrefMsgs}, + prefix_msgs = {PRCnt + RCnt, PrefRet, + PPCnt + maps:size(Messages), PrefMsgs}, waiting_consumers = Waiting}. dehydrate_messages(Low, Next, _Msgs, Acc) diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 2fae8c10ca..b9e967cbb1 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -119,6 +119,10 @@ max_in_memory_bytes :: option(non_neg_integer()) }). +-type prefix_msgs() :: {list(), list()} | + {non_neg_integer(), list(), + non_neg_integer(), list()}. + -record(rabbit_fifo, {cfg :: #cfg{}, % unassigned messages @@ -161,8 +165,7 @@ %% overflow calculations). %% This is done so that consumers are still served in a deterministic %% order on recovery. - prefix_msgs = {[], []} :: {Return :: [msg_header() | {'$empty_msg', msg_header()}], - PrefixMsgs :: [msg_header() | {msg_header(), 'empty'}]}, + prefix_msgs = {0, [], 0, []} :: prefix_msgs(), msg_bytes_enqueue = 0 :: non_neg_integer(), msg_bytes_checkout = 0 :: non_neg_integer(), %% waiting consumers, one is picked active consumer is cancelled or dies diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 972716a396..ca7a2eadb1 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -249,7 +249,7 @@ all_replica_states() -> list_with_minimum_quorum() -> filter_quorum_critical(rabbit_amqqueue:list_local_quorum_queues()). --spec list_with_minimum_quorum_for_cli() -> [amqqueue:amqqueue()]. +-spec list_with_minimum_quorum_for_cli() -> [#{binary() => term()}]. list_with_minimum_quorum_for_cli() -> QQs = list_with_minimum_quorum(), [begin |