summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-06-30 10:29:45 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-07 09:42:10 +0100
commitbd3827b0cf5f2199268a5b579f27483a9c581ab2 (patch)
tree069a21437f4af14f9bc2d6da39e8ba646ae8fec0
parent613ca58f81b99643c14b944f1ea73896c79d9cf1 (diff)
downloadrabbitmq-server-git-bd3827b0cf5f2199268a5b579f27483a9c581ab2.tar.gz
rabbit_fifo: change messages map to queue
This results in a lower memory use per message.
-rw-r--r--src/rabbit_fifo.erl119
-rw-r--r--src/rabbit_fifo.hrl6
-rw-r--r--src/rabbit_fifo_v0.erl20
-rw-r--r--test/rabbit_fifo_SUITE.erl37
4 files changed, 96 insertions, 86 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index fb1794cae4..13c532af47 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -137,16 +137,14 @@ update_config(Conf, State) ->
competing
end,
Cfg = State#?MODULE.cfg,
- SHICur = case State#?MODULE.cfg of
- #cfg{release_cursor_interval = {_, C}} ->
- C;
- #cfg{release_cursor_interval = undefined} ->
- SHI;
- #cfg{release_cursor_interval = C} ->
- C
- end,
+ RCI = case State#?MODULE.cfg of
+ #cfg{release_cursor_interval = undefined} ->
+ {SHI, SHI};
+ #cfg{release_cursor_interval = {_, C}} ->
+ {SHI, C}
+ end,
- State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur},
+ State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI,
dead_letter_handler = DLH,
become_leader_handler = BLH,
max_length = MaxLength,
@@ -312,17 +310,17 @@ apply(#{index := RaftIdx}, #purge{},
messages = Messages} = State0) ->
Total = messages_ready(State0),
Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
- [I || {I, _} <- lists:sort(maps:values(Messages))]),
+ [I || {I, _} <- lists:sort(lqueue:to_list(Messages))]),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
{State, _, Effects} =
update_smallest_raft_index(RaftIdx,
State0#?MODULE{ra_indexes = Indexes,
- messages = #{},
+ messages = lqueue:new(),
returns = lqueue:new(),
msg_bytes_enqueue = 0,
prefix_msgs = {0, [], 0, []},
- low_msg_num = undefined,
+ % low_msg_num = undefined,
msg_bytes_in_memory = 0,
msgs_ready_in_memory = 0},
[]),
@@ -467,9 +465,13 @@ apply(_, #purge_nodes{nodes = Nodes}, State0) ->
{State, ok, Effects};
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []);
-apply(_Meta, {machine_version, 0, 1}, State) ->
+apply(_Meta, {machine_version, 0, 1}, V0State0) ->
+ V0State = rabbit_fifo_v0:normalize_for_v1(V0State0),
%% quick hack to "convert" the state from version one
- {setelement(1, State, ?MODULE), ok, []}.
+ State = setelement(1, V0State, ?MODULE),
+ V0Msgs = rabbit_fifo_v0:messages_map(V0State),
+ V1Msgs = lqueue:from_list(lists:sort(maps:to_list(V0Msgs))),
+ {State#?MODULE{messages = V1Msgs}, ok, []}.
purge_node(Node, State, Effects) ->
lists:foldl(fun(Pid, {S0, E0}) ->
@@ -816,7 +818,7 @@ messages_ready(#?MODULE{messages = M,
%% prefix messages will rarely have anything in them during normal
%% operations so length/1 is fine here
- maps:size(M) + lqueue:len(R) + RCnt + PCnt.
+ lqueue:len(M) + lqueue:len(R) + RCnt + PCnt.
messages_total(#?MODULE{ra_indexes = I,
prefix_msgs = {RCnt, _R, PCnt, _P}}) ->
@@ -1005,7 +1007,6 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
end.
enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
- low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
%% the initial header is an integer only - it will get expanded to a map
%% when the next required key is added
@@ -1020,10 +1021,10 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
{RaftIdx, {Header, RawMsg}}} % indexed message with header map
end,
State = add_bytes_enqueue(Header, State1),
- State#?MODULE{messages = Messages#{NextMsgNum => Msg},
+ State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages),
%% this is probably only done to record it when low_msg_num
%% is undefined
- low_msg_num = min(LowMsgNum, NextMsgNum),
+ % low_msg_num = min(LowMsgNum, NextMsgNum),
next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
@@ -1042,12 +1043,6 @@ incr_enqueue_count(#?MODULE{enqueue_count = C,
%% A: Because it needs to be the very last thing we do and we
%% first needs to run the checkout logic.
State0#?MODULE{enqueue_count = 0};
-incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg}
- = State0)
- when is_integer(C) ->
- %% conversion to new release cursor interval format
- State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
- incr_enqueue_count(State);
incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
State#?MODULE{enqueue_count = C + 1}.
@@ -1070,22 +1065,13 @@ maybe_store_dehydrated_state(RaftIdx,
min(max(Total, Base),
?RELEASE_CURSOR_EVERY_MAX)
end,
- State = convert_prefix_msgs(
- State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
- {Base, Interval}}}),
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}},
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
Cursors = lqueue:in(Cursor, Cursors0),
State#?MODULE{release_cursors = Cursors}
end;
-maybe_store_dehydrated_state(RaftIdx,
- #?MODULE{cfg =
- #cfg{release_cursor_interval = C} = Cfg}
- = State0)
- when is_integer(C) ->
- %% convert to new format
- State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
- maybe_store_dehydrated_state(RaftIdx, State);
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
@@ -1406,8 +1392,7 @@ evaluate_limit(Result,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
-evaluate_limit(Result, State00, Effects0) ->
- State0 = convert_prefix_msgs(State00),
+evaluate_limit(Result, State0, Effects0) ->
case is_over_limit(State0) of
true ->
{State, Effects} = drop_head(State0, Effects0),
@@ -1463,7 +1448,6 @@ take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) -
{{'$prefix_msg', Header},
State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
take_next_msg(#?MODULE{returns = Returns,
- low_msg_num = Low0,
messages = Messages0,
prefix_msgs = {NumR, R, NumP, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
@@ -1473,21 +1457,11 @@ take_next_msg(#?MODULE{returns = Returns,
{NextMsg,
State#?MODULE{returns = lqueue:drop(Returns)}};
empty when P == [] ->
- case Low0 of
- undefined ->
+ case lqueue:out(Messages0) of
+ {empty, _} ->
empty;
- _ ->
- {Msg, Messages} = maps:take(Low0, Messages0),
- case maps:size(Messages) of
- 0 ->
- {{Low0, Msg},
- State#?MODULE{messages = Messages,
- low_msg_num = undefined}};
- _ ->
- {{Low0, Msg},
- State#?MODULE{messages = Messages,
- low_msg_num = Low0 + 1}}
- end
+ {{value, {_, _} = SeqMsg}, Messages} ->
+ {SeqMsg, State#?MODULE{messages = Messages }}
end;
empty ->
[Msg | Rem] = P,
@@ -1584,7 +1558,7 @@ checkout_one(#?MODULE{service_queue = SQ0,
{nochange, InitState}
end;
empty ->
- case maps:size(Messages0) of
+ case lqueue:len(Messages0) of
0 -> {nochange, InitState};
_ -> {inactive, InitState}
end
@@ -1675,18 +1649,13 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
ServiceQueue0
end.
-convert_prefix_msgs(#?MODULE{prefix_msgs = {R, P}} = State) ->
- State#?MODULE{prefix_msgs = {length(R), R, length(P), P}};
-convert_prefix_msgs(State) ->
- State.
-
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
dehydrate_state(#?MODULE{messages = Messages,
consumers = Consumers,
returns = Returns,
- low_msg_num = Low,
- next_msg_num = Next,
+ % low_msg_num = Low,
+ % next_msg_num = Next,
prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0},
waiting_consumers = Waiting0} = State) ->
RCnt = lqueue:len(Returns),
@@ -1703,34 +1672,34 @@ dehydrate_state(#?MODULE{messages = Messages,
[],
lqueue:to_list(Returns)),
PrefRet = PrefRet0 ++ PrefRet1,
- PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []),
+ PrefMsgsSuff = dehydrate_messages(Messages, []),
%% prefix messages are not populated in normal operation only after
%% recovering from a snapshot
PrefMsgs = PrefMsg0 ++ PrefMsgsSuff,
Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
- State#?MODULE{messages = #{},
+ State#?MODULE{messages = lqueue:new(),
ra_indexes = rabbit_fifo_index:empty(),
release_cursors = lqueue:new(),
- low_msg_num = undefined,
+ % low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
dehydrate_consumer(C)
end, Consumers),
returns = lqueue:new(),
prefix_msgs = {PRCnt + RCnt, PrefRet,
- PPCnt + maps:size(Messages), PrefMsgs},
+ PPCnt + lqueue:len(Messages), PrefMsgs},
waiting_consumers = Waiting}.
-dehydrate_messages(Low, Next, _Msgs, Acc)
- when Next < Low ->
- Acc;
-dehydrate_messages(Low, Next, Msgs, Acc0) ->
- Acc = case maps:get(Next, Msgs) of
- {_RaftIdx, {_, 'empty'} = Msg} ->
- [Msg | Acc0];
- {_RaftIdx, {Header, _}} ->
- [Header | Acc0]
- end,
- dehydrate_messages(Low, Next - 1, Msgs, Acc).
+%% TODO make body recursive to avoid lists:reverse
+dehydrate_messages(Msgs0, Acc0) ->
+ {OutRes, Msgs} = lqueue:out(Msgs0),
+ case OutRes of
+ {value, {_, 'empty'} = Msg} ->
+ dehydrate_messages(Msgs, [Msg | Acc0]);
+ {value, {Header, _}} ->
+ dehydrate_messages(Msgs, [Header | Acc0]);
+ empty ->
+ lists:reverse(Acc0)
+ end.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index ebbaa9e1eb..ec93f480dd 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -110,9 +110,7 @@
-record(cfg,
{name :: atom(),
resource :: rabbit_types:r('queue'),
- release_cursor_interval ::
- undefined | non_neg_integer() |
- {non_neg_integer(), non_neg_integer()},
+ release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}),
dead_letter_handler :: option(applied_mfa()),
become_leader_handler :: option(applied_mfa()),
max_length :: option(non_neg_integer()),
@@ -132,7 +130,7 @@
-record(rabbit_fifo,
{cfg :: #cfg{},
% unassigned messages
- messages = #{} :: #{msg_in_id() => indexed_msg()},
+ messages = lqueue:new() :: lqueue:queue(),
% defines the lowest message in id available in the messages map
% that isn't a return
low_msg_num :: option(msg_in_id()),
diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl
index 01330fe54f..52706aec1f 100644
--- a/src/rabbit_fifo_v0.erl
+++ b/src/rabbit_fifo_v0.erl
@@ -53,6 +53,9 @@
%% misc
dehydrate_state/1,
normalize/1,
+ normalize_for_v1/1,
+ %% getters for coversions
+ messages_map/1,
%% protocol helpers
make_enqueue/3,
@@ -124,6 +127,7 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
+ rabbit_log:info("rabbit_fifo: init v0 ~p", [Conf]),
update_config(Conf, #?MODULE{cfg = #cfg{name = Name,
resource = Resource}}).
@@ -1754,6 +1758,22 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+normalize_for_v1(#?MODULE{cfg = Cfg} = State) ->
+ %% run all v0 conversions so that v1 does not have to have this code
+ RCI = case Cfg of
+ #cfg{release_cursor_interval = {_, _} = R} ->
+ R;
+ #cfg{release_cursor_interval = undefined} ->
+ {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY};
+ #cfg{release_cursor_interval = C} ->
+ {?RELEASE_CURSOR_EVERY, C}
+ end,
+ convert_prefix_msgs(
+ State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI}}).
+
+messages_map(#?MODULE{messages = Messages}) ->
+ Messages.
+
-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 9f2daac762..e3dfb29e7d 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -380,7 +380,7 @@ cancelled_checkout_out_test(_) ->
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should not return pending messages to queue
{State2, _, _} = apply(meta(3), rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(1, maps:size(State2#rabbit_fifo.messages)),
+ ?assertEqual(1, lqueue:len(State2#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State2#rabbit_fifo.returns)),
{State3, {dequeue, empty}} =
@@ -436,13 +436,13 @@ down_with_noconnection_returns_unack_test(_) ->
Pid = spawn(fun() -> ok end),
Cid = {<<"down_with_noconnect">>, Pid},
{State0, _} = enq(1, 1, second, test_init(test)),
- ?assertEqual(1, maps:size(State0#rabbit_fifo.messages)),
+ ?assertEqual(1, lqueue:len(State0#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State0#rabbit_fifo.returns)),
{State1, {_, _}} = deq(2, Cid, unsettled, State0),
- ?assertEqual(0, maps:size(State1#rabbit_fifo.messages)),
+ ?assertEqual(0, lqueue:len(State1#rabbit_fifo.messages)),
?assertEqual(0, lqueue:len(State1#rabbit_fifo.returns)),
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)),
+ ?assertEqual(0, lqueue:len(State2a#rabbit_fifo.messages)),
?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)),
?assertMatch(#consumer{checked_out = Ch,
status = suspected_down}
@@ -539,7 +539,7 @@ duplicate_delivery_test(_) ->
{#rabbit_fifo{ra_indexes = RaIdxs,
messages = Messages}, _} = enq(2, 1, first, State0),
?assertEqual(1, rabbit_fifo_index:size(RaIdxs)),
- ?assertEqual(1, maps:size(Messages)),
+ ?assertEqual(1, lqueue:len(Messages)),
ok.
state_enter_file_handle_leader_reservation_test(_) ->
@@ -622,7 +622,7 @@ down_noproc_returns_checked_out_in_order_test(_) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, 100)),
- ?assertEqual(100, maps:size(S1#rabbit_fifo.messages)),
+ ?assertEqual(100, lqueue:len(S1#rabbit_fifo.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
#consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers),
@@ -643,7 +643,7 @@ down_noconnection_returns_checked_out_test(_) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, NumMsgs)),
- ?assertEqual(NumMsgs, maps:size(S1#rabbit_fifo.messages)),
+ ?assertEqual(NumMsgs, lqueue:len(S1#rabbit_fifo.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
#consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers),
@@ -1386,6 +1386,29 @@ aux_test(_) ->
?assert(X > 0.0),
ok.
+
+%% machine version conversion test
+
+machine_version_test(_) ->
+ V0 = rabbit_fifo_v0,
+ S0 = V0:init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ Idx = 1,
+ {#rabbit_fifo{}, ok, []} = apply(meta(Idx), {machine_version, 0, 1}, S0),
+
+ Cid = {atom_to_binary(?FUNCTION_NAME, utf8), self()},
+ Entries = [
+ {1, rabbit_fifo_v0:make_enqueue(self(), 1, banana)},
+ {2, rabbit_fifo_v0:make_enqueue(self(), 2, apple)},
+ {3, rabbit_fifo_v0:make_checkout(Cid, {auto, 1, unsettled}, #{})}
+ ],
+ {S1, _Effects} = rabbit_fifo_v0_SUITE:run_log(S0, Entries),
+ {#rabbit_fifo{messages = Msgs}, ok, []} = apply(meta(Idx),
+ {machine_version, 0, 1}, S1),
+ %% validate message conversion to lqueue
+ ?assertEqual(1, lqueue:len(Msgs)),
+ ok.
+
%% Utility
init(Conf) -> rabbit_fifo:init(Conf).