summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-12-22 15:38:32 +0100
committerkjnilsson <knilsson@pivotal.io>2021-01-12 12:18:13 +0000
commite5a2eaaa0dda6a27713346e28937771bac2b68a1 (patch)
treeb6e873954d5828dd82529e3c31a30e505693d7af
parent015ec8b47fabf2e72f5edd68ae5b1af74a0358d0 (diff)
downloadrabbitmq-server-git-e5a2eaaa0dda6a27713346e28937771bac2b68a1.tar.gz
Update retention when only stream retention policy has changed
In any other case, the worker needs to be restarted
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl11
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl10
2 files changed, 19 insertions, 2 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 93053bc04e..5e97f67d3b 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -185,8 +185,15 @@ apply(#{from := From}, {policy_changed, #{stream_id := StreamId}} = Cmd,
Conf ->
%% No changes, ensure we only trigger an election if it's a must
{State, ok, []};
- _ ->
- {State, ok, [{mod_call, osiris_writer, stop, [Conf]}]}
+ #{retention := Retention,
+ leader_pid := Pid} = Conf0 ->
+ case maps:remove(retention, Conf) == maps:remove(retention, Conf0) of
+ true ->
+ %% Only retention policy has changed, it doesn't need a full restart
+ {State, ok, [{mod_call, osiris, update_retention, [Pid, Retention]}]};
+ false ->
+ {State, ok, [{mod_call, osiris_writer, stop, [Conf]}]}
+ end
end;
SState0 ->
Streams = maps:put(StreamId, add_pending_cmd(From, Cmd, SState0), Streams0),
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 25f3c05c66..9c2afd80f8 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -1458,6 +1458,10 @@ max_age_policy(Config) ->
Q = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ {ok, Q0} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup,
+ [rabbit_misc:r(<<"/">>, queue, Q)]),
+
ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"age">>, <<"max_age_policy.*">>, <<"queues">>,
[{<<"max-age">>, <<"1Y">>}]),
@@ -1466,10 +1470,16 @@ max_age_policy(Config) ->
info_all, [<<"/">>, [policy, operator_policy,
effective_policy_definition]]),
+ {ok, Q1} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup,
+ [rabbit_misc:r(<<"/">>, queue, Q)]),
+
?assertEqual(<<"age">>, proplists:get_value(policy, Info)),
?assertEqual('', proplists:get_value(operator_policy, Info)),
?assertEqual([{<<"max-age">>, <<"1Y">>}],
proplists:get_value(effective_policy_definition, Info)),
+ %% If there are changes only in the retention policy, processes should not be restarted
+ ?assertEqual(amqqueue:get_pid(Q0), amqqueue:get_pid(Q1)),
+
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>).
max_segment_size_policy(Config) ->