diff options
author | Michael Klishin <klishinm@vmware.com> | 2020-09-20 18:49:13 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-20 18:49:13 +0300 |
commit | 36078d7fd9f9d73cece7cda785745ef8774ef507 (patch) | |
tree | 1e8a8860007c829edc8c4e1ad21419970fe99edc | |
parent | 9b1dd6c0dde783570a70bf63f52200a5aa8ca53a (diff) | |
parent | 13a2d7b70ca4d456d3f674a842d68c6faa17077c (diff) | |
download | rabbitmq-server-git-36078d7fd9f9d73cece7cda785745ef8774ef507.tar.gz |
Merge pull request #2448 from rabbitmq/reclaim-qq-memory
Quorum queue: API to reclaim quorum memory
-rw-r--r-- | src/rabbit_fifo.erl | 36 | ||||
-rw-r--r-- | src/rabbit_fifo_v0.erl | 2 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 14 |
3 files changed, 48 insertions, 4 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 7745789593..d03a997c85 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -58,7 +58,8 @@ make_credit/4, make_purge/0, make_purge_nodes/1, - make_update_config/1 + make_update_config/1, + make_garbage_collection/0 ]). %% command records representing all the protocol actions that are supported @@ -82,6 +83,7 @@ -record(purge, {}). -record(purge_nodes, {nodes :: [node()]}). -record(update_config, {config :: config()}). +-record(garbage_collection, {}). -opaque protocol() :: #enqueue{} | @@ -93,7 +95,8 @@ #credit{} | #purge{} | #purge_nodes{} | - #update_config{}. + #update_config{} | + #garbage_collection{}. -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types supported by ra fifo @@ -365,6 +368,8 @@ apply(#{index := Index}, #purge{}, {State, _, Effects} = evaluate_limit(Index, false, State0, State1, Effects0), update_smallest_raft_index(Index, Reply, State, Effects); +apply(_Meta, #garbage_collection{}, State) -> + {State, ok, [{aux, garbage_collection}]}; apply(#{system_time := Ts} = Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, @@ -752,6 +757,12 @@ init_aux(Name) when is_atom(Name) -> #aux{name = Name, utilisation = {inactive, Now, 1, 1.0}}. +handle_aux(leader, _, garbage_collection, State, Log, _MacState) -> + ra_log_wal:force_roll_over(ra_log_wal), + {no_reply, State, Log}; +handle_aux(follower, _, garbage_collection, State, Log, MacState) -> + ra_log_wal:force_roll_over(ra_log_wal), + {no_reply, force_eval_gc(Log, MacState, State), Log}; handle_aux(_RaState, cast, Cmd, #aux{name = Name, utilisation = Use0} = State0, Log, MacState) -> @@ -777,13 +788,29 @@ eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState, garbage_collect(), {memory, MemAfter} = erlang:process_info(self(), memory), rabbit_log:debug("~s: full GC sweep complete. " - "Process memory reduced from ~.2fMB to ~.2fMB.", + "Process memory changed from ~.2fMB to ~.2fMB.", [rabbit_misc:rs(QR), Mem/?MB, MemAfter/?MB]), AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}}; _ -> AuxState end. +force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}}, + #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) -> + {Idx, _} = ra_log:last_index_term(Log), + {memory, Mem} = erlang:process_info(self(), memory), + case Idx > LastGcIdx of + true -> + garbage_collect(), + {memory, MemAfter} = erlang:process_info(self(), memory), + rabbit_log:debug("~s: full GC sweep complete. " + "Process memory changed from ~.2fMB to ~.2fMB.", + [rabbit_misc:rs(QR), Mem/?MB, MemAfter/?MB]), + AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}}; + false -> + AuxState + end. + %%% Queries query_messages_ready(State) -> @@ -1893,6 +1920,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> -spec make_purge() -> protocol(). make_purge() -> #purge{}. +-spec make_garbage_collection() -> protocol(). +make_garbage_collection() -> #garbage_collection{}. + -spec make_purge_nodes([node()]) -> protocol(). make_purge_nodes(Nodes) -> #purge_nodes{nodes = Nodes}. diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl index 51f6bd133e..a61f42616d 100644 --- a/src/rabbit_fifo_v0.erl +++ b/src/rabbit_fifo_v0.erl @@ -689,7 +689,7 @@ eval_gc(Log, #?STATE{cfg = #cfg{resource = QR}} = MacState, garbage_collect(), {memory, MemAfter} = erlang:process_info(self(), memory), rabbit_log:debug("~s: full GC sweep complete. " - "Process memory reduced from ~.2fMB to ~.2fMB.", + "Process memory changed from ~.2fMB to ~.2fMB.", [rabbit_misc:rs(QR), Mem/?MB, MemAfter/?MB]), AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}}; _ -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index c64ccae9a8..bb4c320b1f 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -40,6 +40,7 @@ -export([repair_amqqueue_nodes/1, repair_amqqueue_nodes/2 ]). +-export([reclaim_memory/2]). -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). @@ -1111,6 +1112,19 @@ file_handle_other_reservation() -> file_handle_release_reservation() -> file_handle_cache:release_reservation(). +-spec reclaim_memory(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> ok | {error, term()}. +reclaim_memory(Vhost, QueueName) -> + QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue}, + case rabbit_amqqueue:lookup(QName) of + {ok, Q} when ?amqqueue_is_classic(Q) -> + {error, classic_queue_not_supported}; + {ok, Q} when ?amqqueue_is_quorum(Q) -> + ok = ra:pipeline_command(amqqueue:get_pid(Q), + rabbit_fifo:make_garbage_collection()); + {error, not_found} = E -> + E + end. + %%---------------------------------------------------------------------------- dlx_mfa(Q) -> DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, |