summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-09-16 12:46:45 +0100
committerdcorbacho <dparracorbacho@piotal.io>2020-09-16 12:55:34 +0100
commit15efebd8dc54de87fc58b03cb0eadffe08d4ea42 (patch)
tree18f10a4adf6923ddd1555d65dd4097c79c213969
parent48f38874d701e34727b4f482ea42bb0e3e82c460 (diff)
downloadrabbitmq-server-git-15efebd8dc54de87fc58b03cb0eadffe08d4ea42.tar.gz
Quorum queue: API to reclaim quorum memory
Operator can choose to force a garbage collection and flush of the WAL on a specific quorum queue
-rw-r--r--src/rabbit_fifo.erl34
-rw-r--r--src/rabbit_quorum_queue.erl14
2 files changed, 46 insertions, 2 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 7745789593..edc1b62e4a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -58,7 +58,8 @@
make_credit/4,
make_purge/0,
make_purge_nodes/1,
- make_update_config/1
+ make_update_config/1,
+ make_garbage_collection/0
]).
%% command records representing all the protocol actions that are supported
@@ -82,6 +83,7 @@
-record(purge, {}).
-record(purge_nodes, {nodes :: [node()]}).
-record(update_config, {config :: config()}).
+-record(garbage_collection, {}).
-opaque protocol() ::
#enqueue{} |
@@ -93,7 +95,8 @@
#credit{} |
#purge{} |
#purge_nodes{} |
- #update_config{}.
+ #update_config{} |
+ #garbage_collection{}.
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types supported by ra fifo
@@ -365,6 +368,8 @@ apply(#{index := Index}, #purge{},
{State, _, Effects} = evaluate_limit(Index, false, State0,
State1, Effects0),
update_smallest_raft_index(Index, Reply, State, Effects);
+apply(_Meta, #garbage_collection{}, State) ->
+ {State, ok, [{aux, garbage_collection}]};
apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
@@ -752,6 +757,12 @@ init_aux(Name) when is_atom(Name) ->
#aux{name = Name,
utilisation = {inactive, Now, 1, 1.0}}.
+handle_aux(leader, _, garbage_collection, State, Log, _MacState) ->
+ ra_log_wal:force_roll_over(ra_log_wal),
+ {no_reply, State, Log};
+handle_aux(follower, _, garbage_collection, State, Log, MacState) ->
+ ra_log_wal:force_roll_over(ra_log_wal),
+ {no_reply, force_eval_gc(Log, MacState, State), Log};
handle_aux(_RaState, cast, Cmd, #aux{name = Name,
utilisation = Use0} = State0,
Log, MacState) ->
@@ -784,6 +795,22 @@ eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState,
AuxState
end.
+force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}},
+ #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) ->
+ {Idx, _} = ra_log:last_index_term(Log),
+ {memory, Mem} = erlang:process_info(self(), memory),
+ case Idx > LastGcIdx of
+ true ->
+ garbage_collect(),
+ {memory, MemAfter} = erlang:process_info(self(), memory),
+ rabbit_log:debug("~s: full GC sweep complete. "
+ "Process memory reduced from ~.2fMB to ~.2fMB.",
+ [rabbit_misc:rs(QR), Mem/?MB, MemAfter/?MB]),
+ AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}};
+ false ->
+ AuxState
+ end.
+
%%% Queries
query_messages_ready(State) ->
@@ -1893,6 +1920,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
-spec make_purge() -> protocol().
make_purge() -> #purge{}.
+-spec make_garbage_collection() -> protocol().
+make_garbage_collection() -> #garbage_collection{}.
+
-spec make_purge_nodes([node()]) -> protocol().
make_purge_nodes(Nodes) ->
#purge_nodes{nodes = Nodes}.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 327fc11a5e..88de7b50e0 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -40,6 +40,7 @@
-export([repair_amqqueue_nodes/1,
repair_amqqueue_nodes/2
]).
+-export([reclaim_memory/2]).
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
@@ -1112,6 +1113,19 @@ file_handle_other_reservation() ->
file_handle_release_reservation() ->
file_handle_cache:release_reservation().
+-spec reclaim_memory(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> ok | {error, term()}.
+reclaim_memory(Vhost, QueueName) ->
+ QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q} when ?amqqueue_is_classic(Q) ->
+ {error, classic_queue_not_supported};
+ {ok, Q} when ?amqqueue_is_quorum(Q) ->
+ ok = ra:pipeline_command(amqqueue:get_pid(Q),
+ rabbit_fifo:make_garbage_collection());
+ {error, not_found} = E ->
+ E
+ end.
+
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,