summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-09-03 13:13:39 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-07 09:42:11 +0100
commit3ee9af8e8bf5f8fb7e49a0c1b2a5bc4e416f0411 (patch)
treef3c8ccdb35d057c2ca76cc78a4abf5da92b2d349
parente50cb1d04dac1aa1b251f45bcf622cab1f1858ec (diff)
downloadrabbitmq-server-git-3ee9af8e8bf5f8fb7e49a0c1b2a5bc4e416f0411.tar.gz
rabbit_fifo: lower min snapshot interval
As the current default is way to large for queues that never empty but only keep a small backlog.
-rw-r--r--src/rabbit_fifo.erl34
-rw-r--r--src/rabbit_fifo.hrl2
2 files changed, 17 insertions, 19 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index b59f4b04f8..fef1104a8b 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -142,12 +142,7 @@ update_config(Conf, State) ->
competing
end,
Cfg = State#?MODULE.cfg,
- RCISpec = case State#?MODULE.cfg of
- #cfg{release_cursor_interval = undefined} ->
- {RCI, RCI};
- #cfg{release_cursor_interval = {_, C}} ->
- {RCI, C}
- end,
+ RCISpec = {RCI, RCI},
LastActive = maps:get(created, Conf, undefined),
State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
@@ -367,12 +362,9 @@ apply(#{index := Index}, #purge{},
msgs_ready_in_memory = 0},
Effects0 = [garbage_collection],
Reply = {purge, Total},
- case evaluate_limit(Index, false, State0, State1, Effects0) of
- {State, true, Effects} ->
- update_smallest_raft_index(Index, Reply, State, Effects);
- {State, false, Effects} ->
- {State, Reply, Effects}
- end;
+ {State, _, Effects} = evaluate_limit(Index, false, State0,
+ State1, Effects0),
+ update_smallest_raft_index(Index, Reply, State, Effects);
apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
@@ -720,6 +712,7 @@ overview(#?MODULE{consumers = Cons,
num_ready_messages => messages_ready(State),
num_messages => messages_total(State),
num_release_cursors => lqueue:len(Cursors),
+ release_cursors => [I || {_, I, _} <- lqueue:to_list(Cursors)],
release_cursor_enqueue_counter => EnqCount,
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.
@@ -1118,9 +1111,9 @@ append_to_master_index(RaftIdx,
State#?MODULE{ra_indexes = Indexes}.
-incr_enqueue_count(#?MODULE{enqueue_count = C,
+incr_enqueue_count(#?MODULE{enqueue_count = EC,
cfg = #cfg{release_cursor_interval = {_Base, C}}
- } = State0) ->
+ } = State0) when EC >= C->
%% this will trigger a dehydrated version of the state to be stored
%% at this raft index for potential future snapshot generation
%% Q: Why don't we just stash the release cursor here?
@@ -1146,8 +1139,7 @@ maybe_store_dehydrated_state(RaftIdx,
0 -> 0;
_ ->
Total = messages_total(State0),
- min(max(Total, Base),
- ?RELEASE_CURSOR_EVERY_MAX)
+ min(max(Total, Base), ?RELEASE_CURSOR_EVERY_MAX)
end,
State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
{Base, Interval}}},
@@ -1307,7 +1299,8 @@ update_smallest_raft_index(Idx, State, Effects) ->
update_smallest_raft_index(Idx, ok, State, Effects).
update_smallest_raft_index(IncomingRaftIdx, Reply,
- #?MODULE{ra_indexes = Indexes,
+ #?MODULE{cfg = Cfg,
+ ra_indexes = Indexes,
release_cursors = Cursors0} = State0,
Effects) ->
case rabbit_fifo_index:size(Indexes) of
@@ -1315,7 +1308,12 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
% there are no messages on queue anymore and no pending enqueues
% we can forward release_cursor all the way until
% the last received command, hooray
- State = State0#?MODULE{release_cursors = lqueue:new()},
+ %% reset the release cursor interval
+ #cfg{release_cursor_interval = {Base, _}} = Cfg,
+ RCI = {Base, Base},
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI},
+ release_cursors = lqueue:new(),
+ enqueue_count = 0},
{State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
_ ->
Smallest = rabbit_fifo_index:smallest(Indexes),
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 138b19d390..4c87167ea1 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -67,7 +67,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(RELEASE_CURSOR_EVERY, 64000).
+-define(RELEASE_CURSOR_EVERY, 2048).
-define(RELEASE_CURSOR_EVERY_MAX, 3200000).
-define(USE_AVG_HALF_LIFE, 10000.0).
%% an average QQ without any message uses about 100KB so setting this limit