diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-01-13 12:05:33 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-13 12:05:33 +0000 |
commit | 0ef00f88edd1dd1a00d349c78d5cec75e0d6c8e7 (patch) | |
tree | 3be76ac2f3c1330dc33de337badd70eb80d850d3 | |
parent | 015ec8b47fabf2e72f5edd68ae5b1af74a0358d0 (diff) | |
parent | 61de203fc54b1138fab44fd324bc7dca434cfc88 (diff) | |
download | rabbitmq-server-git-0ef00f88edd1dd1a00d349c78d5cec75e0d6c8e7.tar.gz |
Merge pull request #2695 from rabbitmq/update-retention
Update retention when only stream retention policy has changed
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 110 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 31 | ||||
-rw-r--r-- | deps/rabbit/test/quorum_queue_utils.erl | 60 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 36 |
4 files changed, 198 insertions, 39 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 93053bc04e..6ae819171a 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -41,7 +41,8 @@ phase_start_new_leader/1, phase_stop_replicas/1, phase_start_replica/3, - phase_delete_replica/2]). + phase_delete_replica/2, + phase_update_retention/4]). -export([log_overview/1]). @@ -53,6 +54,7 @@ -record(?MODULE, {streams, monitors}). +-include("amqqueue.hrl"). start() -> Nodes = rabbit_mnesia:cluster_nodes(all), ServerId = {?MODULE, node()}, @@ -108,8 +110,11 @@ add_replica(StreamId, Node) -> process_command({start_replica, #{stream_id => StreamId, node => Node, retries => 1}}). -policy_changed(StreamId) -> - process_command({policy_changed, #{stream_id => StreamId}}). +policy_changed(Q) when ?is_amqqueue(Q) -> + StreamId = maps:get(name, amqqueue:get_type_state(Q)), + process_command({policy_changed, #{stream_id => StreamId, + queue => Q, + retries => 1}}). delete_replica(StreamId, Node) -> process_command({delete_replica, #{stream_id => StreamId, node => Node}}). @@ -174,19 +179,37 @@ init(_Conf) -> #?MODULE{streams = #{}, monitors = #{}}. -apply(#{from := From}, {policy_changed, #{stream_id := StreamId}} = Cmd, +apply(Meta, {policy_changed, #{stream_id := StreamId, + queue := Q, + retries := Retries}} = Cmd, #?MODULE{streams = Streams0} = State) -> + From = maps:get(from, Meta, undefined), case maps:get(StreamId, Streams0, undefined) of undefined -> {State, ok, []}; #{conf := Conf, - state := running} -> - case rabbit_stream_queue:update_stream_conf(Conf) of + state := running} = SState0 -> + case rabbit_stream_queue:update_stream_conf(Q, Conf) of Conf -> %% No changes, ensure we only trigger an election if it's a must {State, ok, []}; - _ -> - {State, ok, [{mod_call, osiris_writer, stop, [Conf]}]} + #{retention := Retention, + leader_pid := Pid, + replica_pids := Pids} = Conf0 -> + case maps:remove(retention, Conf) == maps:remove(retention, Conf0) of + true -> + %% Only retention policy has changed, it doesn't need a full restart + Phase = phase_update_retention, + %% TODO do it over replicas too + PhaseArgs = [[Pid | Pids], Retention, Conf0, Retries], + SState = update_stream_state(From, update_retention, Phase, PhaseArgs, SState0), + rabbit_log:debug("rabbit_stream_coordinator: ~p entering ~p on node ~p", + [StreamId, Phase, node()]), + {State#?MODULE{streams = Streams0#{StreamId => SState}}, '$ra_no_reply', + [{aux, {phase, StreamId, Phase, PhaseArgs}}]}; + false -> + {State, ok, [{mod_call, osiris_writer, stop, [Conf]}]} + end end; SState0 -> Streams = maps:put(StreamId, add_pending_cmd(From, Cmd, SState0), Streams0), @@ -253,6 +276,23 @@ apply(_Meta, {start_replica_failed, StreamId, Node, Retries, Reply}, ?RESTART_TIMEOUT * Retries}], State#?MODULE{streams = Streams}) end; +apply(_Meta, {update_retention_failed, StreamId, Q, Retries, Reply}, + #?MODULE{streams = Streams0} = State) -> + rabbit_log:debug("rabbit_stream_coordinator: ~w update retention failed", [StreamId]), + case maps:get(StreamId, Streams0, undefined) of + undefined -> + {State, {error, not_found}, []}; + #{reply_to := From} = SState -> + Streams = Streams0#{StreamId => clear_stream_state(SState)}, + reply_and_run_pending( + From, StreamId, ok, Reply, + [{timer, {pipeline, + [{policy_changed, #{stream_id => StreamId, + queue => Q, + retries => Retries + 1}}]}, + ?RESTART_TIMEOUT * Retries}], + State#?MODULE{streams = Streams}) + end; apply(_Meta, {phase_finished, StreamId, Reply}, #?MODULE{streams = Streams0} = State) -> rabbit_log:debug("rabbit_stream_coordinator: ~p phase finished", [StreamId]), case maps:get(StreamId, Streams0, undefined) of @@ -385,7 +425,8 @@ apply(_Meta, {delete_cluster_reply, StreamId}, #?MODULE{streams = Streams} = Sta State = State0#?MODULE{streams = maps:remove(StreamId, Streams)}, rabbit_log:debug("rabbit_stream_coordinator: ~p finished delete_cluster_reply", [StreamId]), - Actions = [{ra, pipeline_command, [{?MODULE, node()}, Cmd]} || Cmd <- Pending], + Actions = [{mod_call, ra, pipeline_command, [{?MODULE, node()}, Cmd]} + || Cmd <- Pending], {State, ok, Actions ++ wrap_reply(From, {ok, 0})}; apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams, monitors = Monitors0} = State) -> @@ -415,7 +456,7 @@ apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams, streams = Streams#{StreamId => SState}}, ok, Events ++ [{aux, {phase, StreamId, Phase, PhaseArgs}}]}; follower -> - case rabbit_misc:is_process_alive(maps:get(leader_pid, Conf0)) of + case maps:is_key(maps:get(leader_pid, Conf0), Monitors) of true -> Phase = phase_start_replica, PhaseArgs = [node(Pid), Conf0, 1], @@ -425,11 +466,13 @@ apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams, SState0), rabbit_log:debug("rabbit_stream_coordinator: ~p replica on node ~p is down, entering ~p", [StreamId, node(Pid), Phase]), {State#?MODULE{monitors = Monitors, - streams = Streams#{StreamId => SState}}, + streams = Streams#{StreamId => SState}}, ok, [{aux, {phase, StreamId, Phase, PhaseArgs}}]}; false -> SState = SState0#{pending_cmds => Pending0 ++ [Cmd]}, - reply_and_run_pending(undefined, StreamId, ok, ok, [], State#?MODULE{streams = Streams#{StreamId => SState}}) + reply_and_run_pending(undefined, StreamId, ok, ok, [], + State#?MODULE{monitors = Monitors, + streams = Streams#{StreamId => SState}}) end end; #{pending_cmds := Pending0} = SState0 -> @@ -439,7 +482,7 @@ apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams, undefined -> {State, ok, []} end; -apply(_Meta, {start_leader_election, StreamId, NewEpoch, Offsets}, +apply(_Meta, {start_leader_election, StreamId, Q, NewEpoch, Offsets}, #?MODULE{streams = Streams} = State) -> #{conf := Conf0} = SState0 = maps:get(StreamId, Streams), #{leader_node := Leader, @@ -450,10 +493,10 @@ apply(_Meta, {start_leader_election, StreamId, NewEpoch, Offsets}, [StreamId, NewLeader]), {ReplicaPids, _} = delete_replica_pid(NewLeader, ReplicaPids0), Conf = rabbit_stream_queue:update_stream_conf( - Conf0#{epoch => NewEpoch, - leader_node => NewLeader, - replica_nodes => lists:delete(NewLeader, Replicas ++ [Leader]), - replica_pids => ReplicaPids}), + Q, Conf0#{epoch => NewEpoch, + leader_node => NewLeader, + replica_nodes => lists:delete(NewLeader, Replicas ++ [Leader]), + replica_pids => ReplicaPids}), Phase = phase_start_new_leader, PhaseArgs = [Conf], SState = SState0#{conf => Conf, @@ -763,20 +806,51 @@ phase_start_new_leader(#{name := StreamId, leader_node := Node} = Conf) -> end). phase_check_quorum(#{name := StreamId, + reference := QName, epoch := Epoch, replica_nodes := Nodes} = Conf) -> spawn(fun() -> Offsets = find_replica_offsets(Conf), case is_quorum(length(Nodes) + 1, length(Offsets)) of true -> + Q = case rabbit_amqqueue:lookup(QName) of + {ok, A} -> A; + {error, not_found} -> + undefined + end, ra:pipeline_command({?MODULE, node()}, - {start_leader_election, StreamId, Epoch + 1, Offsets}); + {start_leader_election, StreamId, + Q, Epoch + 1, Offsets}); false -> %% Let's crash this process so the monitor will restart it exit({not_enough_quorum, StreamId}) end end). +phase_update_retention(Pids0, Retention, #{name := StreamId}, Retries) -> + spawn( + fun() -> + case update_retention(Pids0, Retention) of + ok -> + ra:pipeline_command({?MODULE, node()}, {phase_finished, StreamId, ok}); + {error, Reason} -> + ra:pipeline_command({?MODULE, node()}, + {update_retention_failed, StreamId, + Retention, Retries, + {error, Reason}}) + end + end). + +update_retention([], _) -> + ok; +update_retention([Pid | Pids], Retention) -> + case osiris:update_retention(Pid, Retention) of + ok -> + update_retention(Pids, Retention); + {error, Reason} -> + {error, Reason} + end. + find_replica_offsets(#{replica_nodes := Nodes, leader_node := Leader} = Conf) -> lists:foldl( diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 223e19c713..1f789f165b 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -43,7 +43,7 @@ -export([add_replica/3, delete_replica/3]). -export([format_osiris_event/2]). --export([update_stream_conf/1]). +-export([update_stream_conf/2]). -include("rabbit.hrl"). -include("amqqueue.hrl"). @@ -142,8 +142,7 @@ purge(_) -> -spec policy_changed(amqqueue:amqqueue()) -> 'ok'. policy_changed(Q) -> - Name = maps:get(name, amqqueue:get_type_state(Q)), - _ = rabbit_stream_coordinator:policy_changed(Name), + _ = rabbit_stream_coordinator:policy_changed(Q), ok. stat(_) -> @@ -571,20 +570,18 @@ select_stream_nodes(Size, Rest, Selected) -> S = lists:nth(rand:uniform(length(Rest)), Rest), select_stream_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]). -update_stream_conf(#{reference := QName} = Conf) -> - case rabbit_amqqueue:lookup(QName) of - {ok, Q} -> - MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), - MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)), - MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q), - Retention = lists:filter(fun({_, R}) -> - R =/= undefined - end, [{max_bytes, MaxBytes}, - {max_age, MaxAge}]), - add_if_defined(max_segment_size, MaxSegmentSize, Conf#{retention => Retention}); - _ -> - Conf - end. +update_stream_conf(undefined, #{} = Conf) -> + Conf; +update_stream_conf(Q, #{} = Conf) when ?is_amqqueue(Q) -> + MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), + MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)), + MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q), + Retention = lists:filter(fun({_, R}) -> + R =/= undefined + end, [{max_bytes, MaxBytes}, + {max_age, MaxAge}]), + add_if_defined(max_segment_size, MaxSegmentSize, + Conf#{retention => Retention}). add_if_defined(_, undefined, Map) -> Map; diff --git a/deps/rabbit/test/quorum_queue_utils.erl b/deps/rabbit/test/quorum_queue_utils.erl index 224abeeeeb..72d91d8a7d 100644 --- a/deps/rabbit/test/quorum_queue_utils.erl +++ b/deps/rabbit/test/quorum_queue_utils.erl @@ -7,6 +7,8 @@ wait_for_messages_pending_ack/3, wait_for_messages_total/3, wait_for_messages/2, + wait_for_min_messages/3, + wait_for_max_messages/3, dirty_query/3, ra_name/1, fifo_machines_use_same_version/1, @@ -75,6 +77,58 @@ wait_for_messages(Config, Stats, N) -> wait_for_messages(Config, Stats, N - 1) end. +wait_for_min_messages(Config, Queue, Msgs) -> + wait_for_min_messages(Config, Queue, Msgs, 60). + +wait_for_min_messages(Config, Queue, Msgs, 0) -> + [[_, Got]] = filter_queues([[Queue, Msgs]], + rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "messages"])), + ct:pal("Got ~p messages on queue ~p", [Got, Queue]), + ?assert(binary_to_integer(Got) >= Msgs); +wait_for_min_messages(Config, Queue, Msgs, N) -> + case filter_queues([[Queue, Msgs]], + rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "messages"])) of + [[_, Msgs0]] -> + case (binary_to_integer(Msgs0) >= Msgs) of + true -> + ok; + false -> + timer:sleep(500), + wait_for_min_messages(Config, Queue, Msgs, N - 1) + end; + _ -> + timer:sleep(500), + wait_for_min_messages(Config, Queue, Msgs, N - 1) + end. + +wait_for_max_messages(Config, Queue, Msgs) -> + wait_for_max_messages(Config, Queue, Msgs, 60). + +wait_for_max_messages(Config, Queue, Msgs, 0) -> + [[_, Got]] = filter_queues([[Queue, Msgs]], + rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "messages"])), + ct:pal("Got ~p messages on queue ~p", [Got, Queue]), + ?assert(binary_to_integer(Got) =< Msgs); +wait_for_max_messages(Config, Queue, Msgs, N) -> + case filter_queues([[Queue, Msgs]], + rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "messages"])) of + [[_, Msgs0]] -> + case (binary_to_integer(Msgs0) =< Msgs) of + true -> + ok; + false -> + timer:sleep(500), + wait_for_max_messages(Config, Queue, Msgs, N - 1) + end; + _ -> + timer:sleep(500), + wait_for_max_messages(Config, Queue, Msgs, N - 1) + end. + dirty_query(Servers, QName, Fun) -> lists:map( fun(N) -> @@ -90,9 +144,9 @@ ra_name(Q) -> binary_to_atom(<<"%2F_", Q/binary>>, utf8). filter_queues(Expected, Got) -> - Keys = [K || [K, _, _, _] <- Expected], - lists:filter(fun([K, _, _, _]) -> - lists:member(K, Keys) + Keys = [hd(E) || E <- Expected], + lists:filter(fun(G) -> + lists:member(hd(G), Keys) end, Got). fifo_machines_use_same_version(Config) -> diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 25f3c05c66..92d9f8c806 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -93,7 +93,8 @@ all_tests() -> invalid_policy, max_age_policy, max_segment_size_policy, - purge + purge, + update_retention_policy ]. %% ------------------------------------------------------------------- @@ -1458,6 +1459,7 @@ max_age_policy(Config) -> Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + ok = rabbit_ct_broker_helpers:set_policy( Config, 0, <<"age">>, <<"max_age_policy.*">>, <<"queues">>, [{<<"max-age">>, <<"1Y">>}]), @@ -1470,8 +1472,40 @@ max_age_policy(Config) -> ?assertEqual('', proplists:get_value(operator_policy, Info)), ?assertEqual([{<<"max-age">>, <<"1Y">>}], proplists:get_value(effective_policy_definition, Info)), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>). +update_retention_policy(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-max-segment-size">>, long, 200} + ])), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"0">>, <<"0">>, <<"0">>]]), + [publish(Ch, Q, <<"msg">>) || _ <- lists:seq(1, 10000)], + quorum_queue_utils:wait_for_min_messages(Config, Q, 10000), + + {ok, Q0} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, + [rabbit_misc:r(<<"/">>, queue, Q)]), + timer:sleep(1000), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"retention">>, <<"update_retention_policy.*">>, <<"queues">>, + [{<<"max-age">>, <<"1s">>}]), + timer:sleep(1000), + + quorum_queue_utils:wait_for_max_messages(Config, Q, 1000), + + {ok, Q1} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, + [rabbit_misc:r(<<"/">>, queue, Q)]), + + %% If there are changes only in the retention policy, processes should not be restarted + ?assertEqual(amqqueue:get_pid(Q0), amqqueue:get_pid(Q1)), + + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"retention">>). + max_segment_size_policy(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |