diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-12-22 15:38:32 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2021-01-12 12:18:13 +0000 |
commit | e5a2eaaa0dda6a27713346e28937771bac2b68a1 (patch) | |
tree | b6e873954d5828dd82529e3c31a30e505693d7af | |
parent | 015ec8b47fabf2e72f5edd68ae5b1af74a0358d0 (diff) | |
download | rabbitmq-server-git-e5a2eaaa0dda6a27713346e28937771bac2b68a1.tar.gz |
Update retention when only stream retention policy has changed
In any other case, the worker needs to be restarted
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 11 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 10 |
2 files changed, 19 insertions, 2 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 93053bc04e..5e97f67d3b 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -185,8 +185,15 @@ apply(#{from := From}, {policy_changed, #{stream_id := StreamId}} = Cmd, Conf -> %% No changes, ensure we only trigger an election if it's a must {State, ok, []}; - _ -> - {State, ok, [{mod_call, osiris_writer, stop, [Conf]}]} + #{retention := Retention, + leader_pid := Pid} = Conf0 -> + case maps:remove(retention, Conf) == maps:remove(retention, Conf0) of + true -> + %% Only retention policy has changed, it doesn't need a full restart + {State, ok, [{mod_call, osiris, update_retention, [Pid, Retention]}]}; + false -> + {State, ok, [{mod_call, osiris_writer, stop, [Conf]}]} + end end; SState0 -> Streams = maps:put(StreamId, add_pending_cmd(From, Cmd, SState0), Streams0), diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 25f3c05c66..9c2afd80f8 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -1458,6 +1458,10 @@ max_age_policy(Config) -> Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + {ok, Q0} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, + [rabbit_misc:r(<<"/">>, queue, Q)]), + ok = rabbit_ct_broker_helpers:set_policy( Config, 0, <<"age">>, <<"max_age_policy.*">>, <<"queues">>, [{<<"max-age">>, <<"1Y">>}]), @@ -1466,10 +1470,16 @@ max_age_policy(Config) -> info_all, [<<"/">>, [policy, operator_policy, effective_policy_definition]]), + {ok, Q1} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, + [rabbit_misc:r(<<"/">>, queue, Q)]), + ?assertEqual(<<"age">>, proplists:get_value(policy, Info)), ?assertEqual('', proplists:get_value(operator_policy, Info)), ?assertEqual([{<<"max-age">>, <<"1Y">>}], proplists:get_value(effective_policy_definition, Info)), + %% If there are changes only in the retention policy, processes should not be restarted + ?assertEqual(amqqueue:get_pid(Q0), amqqueue:get_pid(Q1)), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>). max_segment_size_policy(Config) -> |