diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-06-30 10:29:45 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-09-07 09:42:10 +0100 |
commit | bd3827b0cf5f2199268a5b579f27483a9c581ab2 (patch) | |
tree | 069a21437f4af14f9bc2d6da39e8ba646ae8fec0 | |
parent | 613ca58f81b99643c14b944f1ea73896c79d9cf1 (diff) | |
download | rabbitmq-server-git-bd3827b0cf5f2199268a5b579f27483a9c581ab2.tar.gz |
rabbit_fifo: change messages map to queue
This results in a lower memory use per message.
-rw-r--r-- | src/rabbit_fifo.erl | 119 | ||||
-rw-r--r-- | src/rabbit_fifo.hrl | 6 | ||||
-rw-r--r-- | src/rabbit_fifo_v0.erl | 20 | ||||
-rw-r--r-- | test/rabbit_fifo_SUITE.erl | 37 |
4 files changed, 96 insertions, 86 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index fb1794cae4..13c532af47 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -137,16 +137,14 @@ update_config(Conf, State) -> competing end, Cfg = State#?MODULE.cfg, - SHICur = case State#?MODULE.cfg of - #cfg{release_cursor_interval = {_, C}} -> - C; - #cfg{release_cursor_interval = undefined} -> - SHI; - #cfg{release_cursor_interval = C} -> - C - end, + RCI = case State#?MODULE.cfg of + #cfg{release_cursor_interval = undefined} -> + {SHI, SHI}; + #cfg{release_cursor_interval = {_, C}} -> + {SHI, C} + end, - State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur}, + State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI, dead_letter_handler = DLH, become_leader_handler = BLH, max_length = MaxLength, @@ -312,17 +310,17 @@ apply(#{index := RaftIdx}, #purge{}, messages = Messages} = State0) -> Total = messages_ready(State0), Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, - [I || {I, _} <- lists:sort(maps:values(Messages))]), + [I || {I, _} <- lists:sort(lqueue:to_list(Messages))]), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1, [I || {_, {I, _}} <- lqueue:to_list(Returns)]), {State, _, Effects} = update_smallest_raft_index(RaftIdx, State0#?MODULE{ra_indexes = Indexes, - messages = #{}, + messages = lqueue:new(), returns = lqueue:new(), msg_bytes_enqueue = 0, prefix_msgs = {0, [], 0, []}, - low_msg_num = undefined, + % low_msg_num = undefined, msg_bytes_in_memory = 0, msgs_ready_in_memory = 0}, []), @@ -467,9 +465,13 @@ apply(_, #purge_nodes{nodes = Nodes}, State0) -> {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []); -apply(_Meta, {machine_version, 0, 1}, State) -> +apply(_Meta, {machine_version, 0, 1}, V0State0) -> + V0State = rabbit_fifo_v0:normalize_for_v1(V0State0), %% quick hack to "convert" the state from version one - {setelement(1, State, ?MODULE), ok, []}. + State = setelement(1, V0State, ?MODULE), + V0Msgs = rabbit_fifo_v0:messages_map(V0State), + V1Msgs = lqueue:from_list(lists:sort(maps:to_list(V0Msgs))), + {State#?MODULE{messages = V1Msgs}, ok, []}. purge_node(Node, State, Effects) -> lists:foldl(fun(Pid, {S0, E0}) -> @@ -816,7 +818,7 @@ messages_ready(#?MODULE{messages = M, %% prefix messages will rarely have anything in them during normal %% operations so length/1 is fine here - maps:size(M) + lqueue:len(R) + RCnt + PCnt. + lqueue:len(M) + lqueue:len(R) + RCnt + PCnt. messages_total(#?MODULE{ra_indexes = I, prefix_msgs = {RCnt, _R, PCnt, _P}}) -> @@ -1005,7 +1007,6 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> end. enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, - low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> %% the initial header is an integer only - it will get expanded to a map %% when the next required key is added @@ -1020,10 +1021,10 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, {RaftIdx, {Header, RawMsg}}} % indexed message with header map end, State = add_bytes_enqueue(Header, State1), - State#?MODULE{messages = Messages#{NextMsgNum => Msg}, + State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages), %% this is probably only done to record it when low_msg_num %% is undefined - low_msg_num = min(LowMsgNum, NextMsgNum), + % low_msg_num = min(LowMsgNum, NextMsgNum), next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, @@ -1042,12 +1043,6 @@ incr_enqueue_count(#?MODULE{enqueue_count = C, %% A: Because it needs to be the very last thing we do and we %% first needs to run the checkout logic. State0#?MODULE{enqueue_count = 0}; -incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg} - = State0) - when is_integer(C) -> - %% conversion to new release cursor interval format - State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, - incr_enqueue_count(State); incr_enqueue_count(#?MODULE{enqueue_count = C} = State) -> State#?MODULE{enqueue_count = C + 1}. @@ -1070,22 +1065,13 @@ maybe_store_dehydrated_state(RaftIdx, min(max(Total, Base), ?RELEASE_CURSOR_EVERY_MAX) end, - State = convert_prefix_msgs( - State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = - {Base, Interval}}}), + State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = + {Base, Interval}}}, Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), State#?MODULE{release_cursors = Cursors} end; -maybe_store_dehydrated_state(RaftIdx, - #?MODULE{cfg = - #cfg{release_cursor_interval = C} = Cfg} - = State0) - when is_integer(C) -> - %% convert to new format - State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, - maybe_store_dehydrated_state(RaftIdx, State); maybe_store_dehydrated_state(_RaftIdx, State) -> State. @@ -1406,8 +1392,7 @@ evaluate_limit(Result, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; -evaluate_limit(Result, State00, Effects0) -> - State0 = convert_prefix_msgs(State00), +evaluate_limit(Result, State0, Effects0) -> case is_over_limit(State0) of true -> {State, Effects} = drop_head(State0, Effects0), @@ -1463,7 +1448,6 @@ take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) - {{'$prefix_msg', Header}, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; take_next_msg(#?MODULE{returns = Returns, - low_msg_num = Low0, messages = Messages0, prefix_msgs = {NumR, R, NumP, P}} = State) -> %% use peek rather than out there as the most likely case is an empty @@ -1473,21 +1457,11 @@ take_next_msg(#?MODULE{returns = Returns, {NextMsg, State#?MODULE{returns = lqueue:drop(Returns)}}; empty when P == [] -> - case Low0 of - undefined -> + case lqueue:out(Messages0) of + {empty, _} -> empty; - _ -> - {Msg, Messages} = maps:take(Low0, Messages0), - case maps:size(Messages) of - 0 -> - {{Low0, Msg}, - State#?MODULE{messages = Messages, - low_msg_num = undefined}}; - _ -> - {{Low0, Msg}, - State#?MODULE{messages = Messages, - low_msg_num = Low0 + 1}} - end + {{value, {_, _} = SeqMsg}, Messages} -> + {SeqMsg, State#?MODULE{messages = Messages }} end; empty -> [Msg | Rem] = P, @@ -1584,7 +1558,7 @@ checkout_one(#?MODULE{service_queue = SQ0, {nochange, InitState} end; empty -> - case maps:size(Messages0) of + case lqueue:len(Messages0) of 0 -> {nochange, InitState}; _ -> {inactive, InitState} end @@ -1675,18 +1649,13 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, ServiceQueue0 end. -convert_prefix_msgs(#?MODULE{prefix_msgs = {R, P}} = State) -> - State#?MODULE{prefix_msgs = {length(R), R, length(P), P}}; -convert_prefix_msgs(State) -> - State. - %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point dehydrate_state(#?MODULE{messages = Messages, consumers = Consumers, returns = Returns, - low_msg_num = Low, - next_msg_num = Next, + % low_msg_num = Low, + % next_msg_num = Next, prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0}, waiting_consumers = Waiting0} = State) -> RCnt = lqueue:len(Returns), @@ -1703,34 +1672,34 @@ dehydrate_state(#?MODULE{messages = Messages, [], lqueue:to_list(Returns)), PrefRet = PrefRet0 ++ PrefRet1, - PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []), + PrefMsgsSuff = dehydrate_messages(Messages, []), %% prefix messages are not populated in normal operation only after %% recovering from a snapshot PrefMsgs = PrefMsg0 ++ PrefMsgsSuff, Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0], - State#?MODULE{messages = #{}, + State#?MODULE{messages = lqueue:new(), ra_indexes = rabbit_fifo_index:empty(), release_cursors = lqueue:new(), - low_msg_num = undefined, + % low_msg_num = undefined, consumers = maps:map(fun (_, C) -> dehydrate_consumer(C) end, Consumers), returns = lqueue:new(), prefix_msgs = {PRCnt + RCnt, PrefRet, - PPCnt + maps:size(Messages), PrefMsgs}, + PPCnt + lqueue:len(Messages), PrefMsgs}, waiting_consumers = Waiting}. -dehydrate_messages(Low, Next, _Msgs, Acc) - when Next < Low -> - Acc; -dehydrate_messages(Low, Next, Msgs, Acc0) -> - Acc = case maps:get(Next, Msgs) of - {_RaftIdx, {_, 'empty'} = Msg} -> - [Msg | Acc0]; - {_RaftIdx, {Header, _}} -> - [Header | Acc0] - end, - dehydrate_messages(Low, Next - 1, Msgs, Acc). +%% TODO make body recursive to avoid lists:reverse +dehydrate_messages(Msgs0, Acc0) -> + {OutRes, Msgs} = lqueue:out(Msgs0), + case OutRes of + {value, {_, 'empty'} = Msg} -> + dehydrate_messages(Msgs, [Msg | Acc0]); + {value, {Header, _}} -> + dehydrate_messages(Msgs, [Header | Acc0]); + empty -> + lists:reverse(Acc0) + end. dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index ebbaa9e1eb..ec93f480dd 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -110,9 +110,7 @@ -record(cfg, {name :: atom(), resource :: rabbit_types:r('queue'), - release_cursor_interval :: - undefined | non_neg_integer() | - {non_neg_integer(), non_neg_integer()}, + release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}), dead_letter_handler :: option(applied_mfa()), become_leader_handler :: option(applied_mfa()), max_length :: option(non_neg_integer()), @@ -132,7 +130,7 @@ -record(rabbit_fifo, {cfg :: #cfg{}, % unassigned messages - messages = #{} :: #{msg_in_id() => indexed_msg()}, + messages = lqueue:new() :: lqueue:queue(), % defines the lowest message in id available in the messages map % that isn't a return low_msg_num :: option(msg_in_id()), diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl index 01330fe54f..52706aec1f 100644 --- a/src/rabbit_fifo_v0.erl +++ b/src/rabbit_fifo_v0.erl @@ -53,6 +53,9 @@ %% misc dehydrate_state/1, normalize/1, + normalize_for_v1/1, + %% getters for coversions + messages_map/1, %% protocol helpers make_enqueue/3, @@ -124,6 +127,7 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> + rabbit_log:info("rabbit_fifo: init v0 ~p", [Conf]), update_config(Conf, #?MODULE{cfg = #cfg{name = Name, resource = Resource}}). @@ -1754,6 +1758,22 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). +normalize_for_v1(#?MODULE{cfg = Cfg} = State) -> + %% run all v0 conversions so that v1 does not have to have this code + RCI = case Cfg of + #cfg{release_cursor_interval = {_, _} = R} -> + R; + #cfg{release_cursor_interval = undefined} -> + {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY}; + #cfg{release_cursor_interval = C} -> + {?RELEASE_CURSOR_EVERY, C} + end, + convert_prefix_msgs( + State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI}}). + +messages_map(#?MODULE{messages = Messages}) -> + Messages. + -spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol(). make_enqueue(Pid, Seq, Msg) -> #enqueue{pid = Pid, seq = Seq, msg = Msg}. diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 9f2daac762..e3dfb29e7d 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -380,7 +380,7 @@ cancelled_checkout_out_test(_) -> {State1, _} = check_auto(Cid, 2, State0), % cancelled checkout should not return pending messages to queue {State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), - ?assertEqual(1, maps:size(State2#rabbit_fifo.messages)), + ?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)), ?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)), {State3, {dequeue, empty}} = @@ -436,13 +436,13 @@ down_with_noconnection_returns_unack_test(_) -> Pid = spawn(fun() -> ok end), Cid = {<<"down_with_noconnect">>, Pid}, {State0, _} = enq(1, 1, second, test_init(test)), - ?assertEqual(1, maps:size(State0#rabbit_fifo.messages)), + ?assertEqual(1, lqueue:len(State0#rabbit_fifo.messages)), ?assertEqual(0, lqueue:len(State0#rabbit_fifo.returns)), {State1, {_, _}} = deq(2, Cid, unsettled, State0), - ?assertEqual(0, maps:size(State1#rabbit_fifo.messages)), + ?assertEqual(0, lqueue:len(State1#rabbit_fifo.messages)), ?assertEqual(0, lqueue:len(State1#rabbit_fifo.returns)), {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), - ?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)), + ?assertEqual(0, lqueue:len(State2a#rabbit_fifo.messages)), ?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)), ?assertMatch(#consumer{checked_out = Ch, status = suspected_down} @@ -539,7 +539,7 @@ duplicate_delivery_test(_) -> {#rabbit_fifo{ra_indexes = RaIdxs, messages = Messages}, _} = enq(2, 1, first, State0), ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)), - ?assertEqual(1, maps:size(Messages)), + ?assertEqual(1, lqueue:len(Messages)), ok. state_enter_file_handle_leader_reservation_test(_) -> @@ -622,7 +622,7 @@ down_noproc_returns_checked_out_in_order_test(_) -> {FS, _} = enq(Num, Num, Num, FS0), FS end, S0, lists:seq(1, 100)), - ?assertEqual(100, maps:size(S1#rabbit_fifo.messages)), + ?assertEqual(100, lqueue:len(S1#rabbit_fifo.messages)), Cid = {<<"cid">>, self()}, {S2, _} = check(Cid, 101, 1000, S1), #consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers), @@ -643,7 +643,7 @@ down_noconnection_returns_checked_out_test(_) -> {FS, _} = enq(Num, Num, Num, FS0), FS end, S0, lists:seq(1, NumMsgs)), - ?assertEqual(NumMsgs, maps:size(S1#rabbit_fifo.messages)), + ?assertEqual(NumMsgs, lqueue:len(S1#rabbit_fifo.messages)), Cid = {<<"cid">>, self()}, {S2, _} = check(Cid, 101, 1000, S1), #consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers), @@ -1386,6 +1386,29 @@ aux_test(_) -> ?assert(X > 0.0), ok. + +%% machine version conversion test + +machine_version_test(_) -> + V0 = rabbit_fifo_v0, + S0 = V0:init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}), + Idx = 1, + {#rabbit_fifo{}, ok, []} = apply(meta(Idx), {machine_version, 0, 1}, S0), + + Cid = {atom_to_binary(?FUNCTION_NAME, utf8), self()}, + Entries = [ + {1, rabbit_fifo_v0:make_enqueue(self(), 1, banana)}, + {2, rabbit_fifo_v0:make_enqueue(self(), 2, apple)}, + {3, rabbit_fifo_v0:make_checkout(Cid, {auto, 1, unsettled}, #{})} + ], + {S1, _Effects} = rabbit_fifo_v0_SUITE:run_log(S0, Entries), + {#rabbit_fifo{messages = Msgs}, ok, []} = apply(meta(Idx), + {machine_version, 0, 1}, S1), + %% validate message conversion to lqueue + ?assertEqual(1, lqueue:len(Msgs)), + ok. + %% Utility init(Conf) -> rabbit_fifo:init(Conf). |