summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-01-13 12:05:33 +0000
committerGitHub <noreply@github.com>2021-01-13 12:05:33 +0000
commit0ef00f88edd1dd1a00d349c78d5cec75e0d6c8e7 (patch)
tree3be76ac2f3c1330dc33de337badd70eb80d850d3
parent015ec8b47fabf2e72f5edd68ae5b1af74a0358d0 (diff)
parent61de203fc54b1138fab44fd324bc7dca434cfc88 (diff)
downloadrabbitmq-server-git-0ef00f88edd1dd1a00d349c78d5cec75e0d6c8e7.tar.gz
Merge pull request #2695 from rabbitmq/update-retention
Update retention when only stream retention policy has changed
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl110
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl31
-rw-r--r--deps/rabbit/test/quorum_queue_utils.erl60
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl36
4 files changed, 198 insertions, 39 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 93053bc04e..6ae819171a 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -41,7 +41,8 @@
phase_start_new_leader/1,
phase_stop_replicas/1,
phase_start_replica/3,
- phase_delete_replica/2]).
+ phase_delete_replica/2,
+ phase_update_retention/4]).
-export([log_overview/1]).
@@ -53,6 +54,7 @@
-record(?MODULE, {streams, monitors}).
+-include("amqqueue.hrl").
start() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
ServerId = {?MODULE, node()},
@@ -108,8 +110,11 @@ add_replica(StreamId, Node) ->
process_command({start_replica, #{stream_id => StreamId, node => Node,
retries => 1}}).
-policy_changed(StreamId) ->
- process_command({policy_changed, #{stream_id => StreamId}}).
+policy_changed(Q) when ?is_amqqueue(Q) ->
+ StreamId = maps:get(name, amqqueue:get_type_state(Q)),
+ process_command({policy_changed, #{stream_id => StreamId,
+ queue => Q,
+ retries => 1}}).
delete_replica(StreamId, Node) ->
process_command({delete_replica, #{stream_id => StreamId, node => Node}}).
@@ -174,19 +179,37 @@ init(_Conf) ->
#?MODULE{streams = #{},
monitors = #{}}.
-apply(#{from := From}, {policy_changed, #{stream_id := StreamId}} = Cmd,
+apply(Meta, {policy_changed, #{stream_id := StreamId,
+ queue := Q,
+ retries := Retries}} = Cmd,
#?MODULE{streams = Streams0} = State) ->
+ From = maps:get(from, Meta, undefined),
case maps:get(StreamId, Streams0, undefined) of
undefined ->
{State, ok, []};
#{conf := Conf,
- state := running} ->
- case rabbit_stream_queue:update_stream_conf(Conf) of
+ state := running} = SState0 ->
+ case rabbit_stream_queue:update_stream_conf(Q, Conf) of
Conf ->
%% No changes, ensure we only trigger an election if it's a must
{State, ok, []};
- _ ->
- {State, ok, [{mod_call, osiris_writer, stop, [Conf]}]}
+ #{retention := Retention,
+ leader_pid := Pid,
+ replica_pids := Pids} = Conf0 ->
+ case maps:remove(retention, Conf) == maps:remove(retention, Conf0) of
+ true ->
+ %% Only retention policy has changed, it doesn't need a full restart
+ Phase = phase_update_retention,
+ %% TODO do it over replicas too
+ PhaseArgs = [[Pid | Pids], Retention, Conf0, Retries],
+ SState = update_stream_state(From, update_retention, Phase, PhaseArgs, SState0),
+ rabbit_log:debug("rabbit_stream_coordinator: ~p entering ~p on node ~p",
+ [StreamId, Phase, node()]),
+ {State#?MODULE{streams = Streams0#{StreamId => SState}}, '$ra_no_reply',
+ [{aux, {phase, StreamId, Phase, PhaseArgs}}]};
+ false ->
+ {State, ok, [{mod_call, osiris_writer, stop, [Conf]}]}
+ end
end;
SState0 ->
Streams = maps:put(StreamId, add_pending_cmd(From, Cmd, SState0), Streams0),
@@ -253,6 +276,23 @@ apply(_Meta, {start_replica_failed, StreamId, Node, Retries, Reply},
?RESTART_TIMEOUT * Retries}],
State#?MODULE{streams = Streams})
end;
+apply(_Meta, {update_retention_failed, StreamId, Q, Retries, Reply},
+ #?MODULE{streams = Streams0} = State) ->
+ rabbit_log:debug("rabbit_stream_coordinator: ~w update retention failed", [StreamId]),
+ case maps:get(StreamId, Streams0, undefined) of
+ undefined ->
+ {State, {error, not_found}, []};
+ #{reply_to := From} = SState ->
+ Streams = Streams0#{StreamId => clear_stream_state(SState)},
+ reply_and_run_pending(
+ From, StreamId, ok, Reply,
+ [{timer, {pipeline,
+ [{policy_changed, #{stream_id => StreamId,
+ queue => Q,
+ retries => Retries + 1}}]},
+ ?RESTART_TIMEOUT * Retries}],
+ State#?MODULE{streams = Streams})
+ end;
apply(_Meta, {phase_finished, StreamId, Reply}, #?MODULE{streams = Streams0} = State) ->
rabbit_log:debug("rabbit_stream_coordinator: ~p phase finished", [StreamId]),
case maps:get(StreamId, Streams0, undefined) of
@@ -385,7 +425,8 @@ apply(_Meta, {delete_cluster_reply, StreamId}, #?MODULE{streams = Streams} = Sta
State = State0#?MODULE{streams = maps:remove(StreamId, Streams)},
rabbit_log:debug("rabbit_stream_coordinator: ~p finished delete_cluster_reply",
[StreamId]),
- Actions = [{ra, pipeline_command, [{?MODULE, node()}, Cmd]} || Cmd <- Pending],
+ Actions = [{mod_call, ra, pipeline_command, [{?MODULE, node()}, Cmd]}
+ || Cmd <- Pending],
{State, ok, Actions ++ wrap_reply(From, {ok, 0})};
apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams,
monitors = Monitors0} = State) ->
@@ -415,7 +456,7 @@ apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams,
streams = Streams#{StreamId => SState}},
ok, Events ++ [{aux, {phase, StreamId, Phase, PhaseArgs}}]};
follower ->
- case rabbit_misc:is_process_alive(maps:get(leader_pid, Conf0)) of
+ case maps:is_key(maps:get(leader_pid, Conf0), Monitors) of
true ->
Phase = phase_start_replica,
PhaseArgs = [node(Pid), Conf0, 1],
@@ -425,11 +466,13 @@ apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams,
SState0),
rabbit_log:debug("rabbit_stream_coordinator: ~p replica on node ~p is down, entering ~p", [StreamId, node(Pid), Phase]),
{State#?MODULE{monitors = Monitors,
- streams = Streams#{StreamId => SState}},
+ streams = Streams#{StreamId => SState}},
ok, [{aux, {phase, StreamId, Phase, PhaseArgs}}]};
false ->
SState = SState0#{pending_cmds => Pending0 ++ [Cmd]},
- reply_and_run_pending(undefined, StreamId, ok, ok, [], State#?MODULE{streams = Streams#{StreamId => SState}})
+ reply_and_run_pending(undefined, StreamId, ok, ok, [],
+ State#?MODULE{monitors = Monitors,
+ streams = Streams#{StreamId => SState}})
end
end;
#{pending_cmds := Pending0} = SState0 ->
@@ -439,7 +482,7 @@ apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams,
undefined ->
{State, ok, []}
end;
-apply(_Meta, {start_leader_election, StreamId, NewEpoch, Offsets},
+apply(_Meta, {start_leader_election, StreamId, Q, NewEpoch, Offsets},
#?MODULE{streams = Streams} = State) ->
#{conf := Conf0} = SState0 = maps:get(StreamId, Streams),
#{leader_node := Leader,
@@ -450,10 +493,10 @@ apply(_Meta, {start_leader_election, StreamId, NewEpoch, Offsets},
[StreamId, NewLeader]),
{ReplicaPids, _} = delete_replica_pid(NewLeader, ReplicaPids0),
Conf = rabbit_stream_queue:update_stream_conf(
- Conf0#{epoch => NewEpoch,
- leader_node => NewLeader,
- replica_nodes => lists:delete(NewLeader, Replicas ++ [Leader]),
- replica_pids => ReplicaPids}),
+ Q, Conf0#{epoch => NewEpoch,
+ leader_node => NewLeader,
+ replica_nodes => lists:delete(NewLeader, Replicas ++ [Leader]),
+ replica_pids => ReplicaPids}),
Phase = phase_start_new_leader,
PhaseArgs = [Conf],
SState = SState0#{conf => Conf,
@@ -763,20 +806,51 @@ phase_start_new_leader(#{name := StreamId, leader_node := Node} = Conf) ->
end).
phase_check_quorum(#{name := StreamId,
+ reference := QName,
epoch := Epoch,
replica_nodes := Nodes} = Conf) ->
spawn(fun() ->
Offsets = find_replica_offsets(Conf),
case is_quorum(length(Nodes) + 1, length(Offsets)) of
true ->
+ Q = case rabbit_amqqueue:lookup(QName) of
+ {ok, A} -> A;
+ {error, not_found} ->
+ undefined
+ end,
ra:pipeline_command({?MODULE, node()},
- {start_leader_election, StreamId, Epoch + 1, Offsets});
+ {start_leader_election, StreamId,
+ Q, Epoch + 1, Offsets});
false ->
%% Let's crash this process so the monitor will restart it
exit({not_enough_quorum, StreamId})
end
end).
+phase_update_retention(Pids0, Retention, #{name := StreamId}, Retries) ->
+ spawn(
+ fun() ->
+ case update_retention(Pids0, Retention) of
+ ok ->
+ ra:pipeline_command({?MODULE, node()}, {phase_finished, StreamId, ok});
+ {error, Reason} ->
+ ra:pipeline_command({?MODULE, node()},
+ {update_retention_failed, StreamId,
+ Retention, Retries,
+ {error, Reason}})
+ end
+ end).
+
+update_retention([], _) ->
+ ok;
+update_retention([Pid | Pids], Retention) ->
+ case osiris:update_retention(Pid, Retention) of
+ ok ->
+ update_retention(Pids, Retention);
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
find_replica_offsets(#{replica_nodes := Nodes,
leader_node := Leader} = Conf) ->
lists:foldl(
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 223e19c713..1f789f165b 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -43,7 +43,7 @@
-export([add_replica/3,
delete_replica/3]).
-export([format_osiris_event/2]).
--export([update_stream_conf/1]).
+-export([update_stream_conf/2]).
-include("rabbit.hrl").
-include("amqqueue.hrl").
@@ -142,8 +142,7 @@ purge(_) ->
-spec policy_changed(amqqueue:amqqueue()) -> 'ok'.
policy_changed(Q) ->
- Name = maps:get(name, amqqueue:get_type_state(Q)),
- _ = rabbit_stream_coordinator:policy_changed(Name),
+ _ = rabbit_stream_coordinator:policy_changed(Q),
ok.
stat(_) ->
@@ -571,20 +570,18 @@ select_stream_nodes(Size, Rest, Selected) ->
S = lists:nth(rand:uniform(length(Rest)), Rest),
select_stream_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]).
-update_stream_conf(#{reference := QName} = Conf) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, Q} ->
- MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
- MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)),
- MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q),
- Retention = lists:filter(fun({_, R}) ->
- R =/= undefined
- end, [{max_bytes, MaxBytes},
- {max_age, MaxAge}]),
- add_if_defined(max_segment_size, MaxSegmentSize, Conf#{retention => Retention});
- _ ->
- Conf
- end.
+update_stream_conf(undefined, #{} = Conf) ->
+ Conf;
+update_stream_conf(Q, #{} = Conf) when ?is_amqqueue(Q) ->
+ MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
+ MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)),
+ MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q),
+ Retention = lists:filter(fun({_, R}) ->
+ R =/= undefined
+ end, [{max_bytes, MaxBytes},
+ {max_age, MaxAge}]),
+ add_if_defined(max_segment_size, MaxSegmentSize,
+ Conf#{retention => Retention}).
add_if_defined(_, undefined, Map) ->
Map;
diff --git a/deps/rabbit/test/quorum_queue_utils.erl b/deps/rabbit/test/quorum_queue_utils.erl
index 224abeeeeb..72d91d8a7d 100644
--- a/deps/rabbit/test/quorum_queue_utils.erl
+++ b/deps/rabbit/test/quorum_queue_utils.erl
@@ -7,6 +7,8 @@
wait_for_messages_pending_ack/3,
wait_for_messages_total/3,
wait_for_messages/2,
+ wait_for_min_messages/3,
+ wait_for_max_messages/3,
dirty_query/3,
ra_name/1,
fifo_machines_use_same_version/1,
@@ -75,6 +77,58 @@ wait_for_messages(Config, Stats, N) ->
wait_for_messages(Config, Stats, N - 1)
end.
+wait_for_min_messages(Config, Queue, Msgs) ->
+ wait_for_min_messages(Config, Queue, Msgs, 60).
+
+wait_for_min_messages(Config, Queue, Msgs, 0) ->
+ [[_, Got]] = filter_queues([[Queue, Msgs]],
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages"])),
+ ct:pal("Got ~p messages on queue ~p", [Got, Queue]),
+ ?assert(binary_to_integer(Got) >= Msgs);
+wait_for_min_messages(Config, Queue, Msgs, N) ->
+ case filter_queues([[Queue, Msgs]],
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages"])) of
+ [[_, Msgs0]] ->
+ case (binary_to_integer(Msgs0) >= Msgs) of
+ true ->
+ ok;
+ false ->
+ timer:sleep(500),
+ wait_for_min_messages(Config, Queue, Msgs, N - 1)
+ end;
+ _ ->
+ timer:sleep(500),
+ wait_for_min_messages(Config, Queue, Msgs, N - 1)
+ end.
+
+wait_for_max_messages(Config, Queue, Msgs) ->
+ wait_for_max_messages(Config, Queue, Msgs, 60).
+
+wait_for_max_messages(Config, Queue, Msgs, 0) ->
+ [[_, Got]] = filter_queues([[Queue, Msgs]],
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages"])),
+ ct:pal("Got ~p messages on queue ~p", [Got, Queue]),
+ ?assert(binary_to_integer(Got) =< Msgs);
+wait_for_max_messages(Config, Queue, Msgs, N) ->
+ case filter_queues([[Queue, Msgs]],
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages"])) of
+ [[_, Msgs0]] ->
+ case (binary_to_integer(Msgs0) =< Msgs) of
+ true ->
+ ok;
+ false ->
+ timer:sleep(500),
+ wait_for_max_messages(Config, Queue, Msgs, N - 1)
+ end;
+ _ ->
+ timer:sleep(500),
+ wait_for_max_messages(Config, Queue, Msgs, N - 1)
+ end.
+
dirty_query(Servers, QName, Fun) ->
lists:map(
fun(N) ->
@@ -90,9 +144,9 @@ ra_name(Q) ->
binary_to_atom(<<"%2F_", Q/binary>>, utf8).
filter_queues(Expected, Got) ->
- Keys = [K || [K, _, _, _] <- Expected],
- lists:filter(fun([K, _, _, _]) ->
- lists:member(K, Keys)
+ Keys = [hd(E) || E <- Expected],
+ lists:filter(fun(G) ->
+ lists:member(hd(G), Keys)
end, Got).
fifo_machines_use_same_version(Config) ->
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 25f3c05c66..92d9f8c806 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -93,7 +93,8 @@ all_tests() ->
invalid_policy,
max_age_policy,
max_segment_size_policy,
- purge
+ purge,
+ update_retention_policy
].
%% -------------------------------------------------------------------
@@ -1458,6 +1459,7 @@ max_age_policy(Config) ->
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"age">>, <<"max_age_policy.*">>, <<"queues">>,
[{<<"max-age">>, <<"1Y">>}]),
@@ -1470,8 +1472,40 @@ max_age_policy(Config) ->
?assertEqual('', proplists:get_value(operator_policy, Info)),
?assertEqual([{<<"max-age">>, <<"1Y">>}],
proplists:get_value(effective_policy_definition, Info)),
+
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>).
+update_retention_policy(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Q = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-max-segment-size">>, long, 200}
+ ])),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"0">>, <<"0">>, <<"0">>]]),
+ [publish(Ch, Q, <<"msg">>) || _ <- lists:seq(1, 10000)],
+ quorum_queue_utils:wait_for_min_messages(Config, Q, 10000),
+
+ {ok, Q0} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup,
+ [rabbit_misc:r(<<"/">>, queue, Q)]),
+ timer:sleep(1000),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"retention">>, <<"update_retention_policy.*">>, <<"queues">>,
+ [{<<"max-age">>, <<"1s">>}]),
+ timer:sleep(1000),
+
+ quorum_queue_utils:wait_for_max_messages(Config, Q, 1000),
+
+ {ok, Q1} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup,
+ [rabbit_misc:r(<<"/">>, queue, Q)]),
+
+ %% If there are changes only in the retention policy, processes should not be restarted
+ ?assertEqual(amqqueue:get_pid(Q0), amqqueue:get_pid(Q1)),
+
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"retention">>).
+
max_segment_size_policy(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),