summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorD Corbacho <diana@rabbitmq.com>2020-03-05 13:49:37 +0100
committerGitHub <noreply@github.com>2020-03-05 13:49:37 +0100
commit377792984be10411f6e712c9e1045d13d802c27c (patch)
tree1c884be5717bd6698154fc67b5b88ec465a307c3
parentf1cc93a0bb951c2f10e16fe6e0a068b4c172e0f6 (diff)
parent34f3a0066e050f3eaaebf67d73db462e4262b699 (diff)
downloadrabbitmq-server-git-377792984be10411f6e712c9e1045d13d802c27c.tar.gz
Merge pull request #2267 from rabbitmq/rabbit-fifo-release-cursor-fix
rabbit_fifo: change release cursor calculation
-rw-r--r--src/rabbit_fifo.erl34
-rw-r--r--src/rabbit_fifo.hrl1
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl21
3 files changed, 48 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index d0b56961ec..7c7eba257d 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -609,15 +609,28 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
overview(#?MODULE{consumers = Cons,
enqueuers = Enqs,
release_cursors = Cursors,
+ enqueue_count = EnqCount,
msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes} = State) ->
+ msg_bytes_checkout = CheckoutBytes,
+ cfg = Cfg} = State) ->
+ Conf = #{name => Cfg#cfg.name,
+ resource => Cfg#cfg.resource,
+ release_cursor_interval => Cfg#cfg.release_cursor_interval,
+ dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
+ max_length => Cfg#cfg.max_length,
+ max_bytes => Cfg#cfg.max_bytes,
+ consumer_strategy => Cfg#cfg.consumer_strategy,
+ max_in_memory_length => Cfg#cfg.max_in_memory_length,
+ max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes},
#{type => ?MODULE,
+ config => Conf,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
num_enqueuers => maps:size(Enqs),
num_ready_messages => messages_ready(State),
num_messages => messages_total(State),
num_release_cursors => lqueue:len(Cursors),
+ release_crusor_enqueue_counter => EnqCount,
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.
@@ -1022,15 +1035,20 @@ maybe_store_dehydrated_state(RaftIdx,
%% the incoming enqueue must already have been dropped
State0;
true ->
- State = convert_prefix_msgs(State0),
- {Time, Dehydrated} = timer:tc(fun () -> dehydrate_state(State) end),
- rabbit_log:info("dehydrating state took ~bms", [Time div 1000]),
+ Interval = case Base of
+ 0 -> 0;
+ _ ->
+ Total = messages_total(State0),
+ min(max(Total, Base),
+ ?RELEASE_CURSOR_EVERY_MAX)
+ end,
+ State = convert_prefix_msgs(
+ State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}}),
+ Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
Cursors = lqueue:in(Cursor, Cursors0),
- Interval = lqueue:len(Cursors) * Base,
- State#?MODULE{release_cursors = Cursors,
- cfg = Cfg#cfg{release_cursor_interval =
- {Base, Interval}}}
+ State#?MODULE{release_cursors = Cursors}
end;
maybe_store_dehydrated_state(RaftIdx,
#?MODULE{cfg =
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index b9e967cbb1..2a8899d593 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -68,6 +68,7 @@
% represents a partially applied module call
-define(RELEASE_CURSOR_EVERY, 64000).
+-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 2413e3391c..23522e71f9 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -45,6 +45,7 @@ all_tests() ->
scenario19,
scenario20,
scenario21,
+ scenario22,
single_active,
single_active_01,
single_active_02,
@@ -376,6 +377,24 @@ scenario21(_Config) ->
Commands),
ok.
+scenario22(_Config) ->
+ % C1Pid = c:pid(0,883,1),
+ % C1 = {<<>>, C1Pid},
+ E = c:pid(0,176,1),
+ Commands = [
+ make_enqueue(E,1,<<"1">>),
+ make_enqueue(E,2,<<"2">>),
+ make_enqueue(E,3,<<"3">>),
+ make_enqueue(E,4,<<"4">>),
+ make_enqueue(E,5,<<"5">>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ release_cursor_interval => 1,
+ max_length => 3,
+ dead_letter_handler => {?MODULE, banana, []}},
+ Commands),
+ ok.
+
single_active_01(_Config) ->
C1Pid = test_util:fake_pid(rabbit@fake_node1),
C1 = {<<0>>, C1Pid},
@@ -1093,6 +1112,8 @@ run_proper(Fun, Args, NumTests) ->
run_snapshot_test(Conf, Commands) ->
%% create every incremental permutation of the commands lists
%% and run the snapshot tests against that
+ ct:pal("running snapshot test with ~b commands using config ~p",
+ [length(Commands), Conf]),
[begin
% ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
run_snapshot_test0(Conf, C)