summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-03-05 11:55:28 +0000
committerkjnilsson <knilsson@pivotal.io>2020-03-05 11:55:28 +0000
commit34f3a0066e050f3eaaebf67d73db462e4262b699 (patch)
treee3dca0d8d0092f972c9d24ef7e72ff156057f5ea
parent23745d6a880e10836145c429518d3e5faded7c3d (diff)
downloadrabbitmq-server-git-34f3a0066e050f3eaaebf67d73db462e4262b699.tar.gz
rabbit_fifo: change release cursor calculationrabbit-fifo-release-cursor-fix
Release cursors are taken less frequently the more messages there are on queue. This changes how this is calculated to simply use the message count rather than some multiple of the currently captured release cursors. This is more consistent and doesn't depend on non snapshottable state.
-rw-r--r--src/rabbit_fifo.erl34
-rw-r--r--src/rabbit_fifo.hrl1
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl21
3 files changed, 48 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index d0b56961ec..7c7eba257d 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -609,15 +609,28 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
overview(#?MODULE{consumers = Cons,
enqueuers = Enqs,
release_cursors = Cursors,
+ enqueue_count = EnqCount,
msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes} = State) ->
+ msg_bytes_checkout = CheckoutBytes,
+ cfg = Cfg} = State) ->
+ Conf = #{name => Cfg#cfg.name,
+ resource => Cfg#cfg.resource,
+ release_cursor_interval => Cfg#cfg.release_cursor_interval,
+ dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
+ max_length => Cfg#cfg.max_length,
+ max_bytes => Cfg#cfg.max_bytes,
+ consumer_strategy => Cfg#cfg.consumer_strategy,
+ max_in_memory_length => Cfg#cfg.max_in_memory_length,
+ max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes},
#{type => ?MODULE,
+ config => Conf,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
num_enqueuers => maps:size(Enqs),
num_ready_messages => messages_ready(State),
num_messages => messages_total(State),
num_release_cursors => lqueue:len(Cursors),
+ release_crusor_enqueue_counter => EnqCount,
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.
@@ -1022,15 +1035,20 @@ maybe_store_dehydrated_state(RaftIdx,
%% the incoming enqueue must already have been dropped
State0;
true ->
- State = convert_prefix_msgs(State0),
- {Time, Dehydrated} = timer:tc(fun () -> dehydrate_state(State) end),
- rabbit_log:info("dehydrating state took ~bms", [Time div 1000]),
+ Interval = case Base of
+ 0 -> 0;
+ _ ->
+ Total = messages_total(State0),
+ min(max(Total, Base),
+ ?RELEASE_CURSOR_EVERY_MAX)
+ end,
+ State = convert_prefix_msgs(
+ State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}}),
+ Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
Cursors = lqueue:in(Cursor, Cursors0),
- Interval = lqueue:len(Cursors) * Base,
- State#?MODULE{release_cursors = Cursors,
- cfg = Cfg#cfg{release_cursor_interval =
- {Base, Interval}}}
+ State#?MODULE{release_cursors = Cursors}
end;
maybe_store_dehydrated_state(RaftIdx,
#?MODULE{cfg =
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index b9e967cbb1..2a8899d593 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -68,6 +68,7 @@
% represents a partially applied module call
-define(RELEASE_CURSOR_EVERY, 64000).
+-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 2413e3391c..23522e71f9 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -45,6 +45,7 @@ all_tests() ->
scenario19,
scenario20,
scenario21,
+ scenario22,
single_active,
single_active_01,
single_active_02,
@@ -376,6 +377,24 @@ scenario21(_Config) ->
Commands),
ok.
+scenario22(_Config) ->
+ % C1Pid = c:pid(0,883,1),
+ % C1 = {<<>>, C1Pid},
+ E = c:pid(0,176,1),
+ Commands = [
+ make_enqueue(E,1,<<"1">>),
+ make_enqueue(E,2,<<"2">>),
+ make_enqueue(E,3,<<"3">>),
+ make_enqueue(E,4,<<"4">>),
+ make_enqueue(E,5,<<"5">>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ release_cursor_interval => 1,
+ max_length => 3,
+ dead_letter_handler => {?MODULE, banana, []}},
+ Commands),
+ ok.
+
single_active_01(_Config) ->
C1Pid = test_util:fake_pid(rabbit@fake_node1),
C1 = {<<0>>, C1Pid},
@@ -1093,6 +1112,8 @@ run_proper(Fun, Args, NumTests) ->
run_snapshot_test(Conf, Commands) ->
%% create every incremental permutation of the commands lists
%% and run the snapshot tests against that
+ ct:pal("running snapshot test with ~b commands using config ~p",
+ [length(Commands), Conf]),
[begin
% ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
run_snapshot_test0(Conf, C)