diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-01-13 17:11:06 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-13 17:11:06 +0000 |
commit | 54226a162667e7c3fed33a0d240f1183e8e33d71 (patch) | |
tree | 7116e082d1a26f7bed0156d564c58839367913c7 | |
parent | e8fccbaf48fd8e0d448ea94409e05bb5038cbe9d (diff) | |
parent | ca53234ce336763218779a848b783d6f3a272030 (diff) | |
download | rabbitmq-server-git-54226a162667e7c3fed33a0d240f1183e8e33d71.tar.gz |
Merge pull request #2713 from rabbitmq/stream-queue-failover
Stream: Channel resend on leader change
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 11 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_type.erl | 25 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 151 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 71 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 104 |
5 files changed, 316 insertions, 46 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index cd5f894680..933e3a6402 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -462,6 +462,8 @@ filter_per_type(all, _) -> true; filter_per_type(quorum, Q) -> ?amqqueue_is_quorum(Q); +filter_per_type(stream, Q) -> + ?amqqueue_is_stream(Q); filter_per_type(classic, Q) -> ?amqqueue_is_classic(Q). @@ -1714,10 +1716,11 @@ cancel_sync_mirrors(QPid) -> -spec is_replicated(amqqueue:amqqueue()) -> boolean(). -is_replicated(Q) when ?amqqueue_is_quorum(Q) -> - true; -is_replicated(Q) -> - rabbit_mirror_queue_misc:is_mirrored(Q). +is_replicated(Q) when ?amqqueue_is_classic(Q) -> + rabbit_mirror_queue_misc:is_mirrored(Q); +is_replicated(_Q) -> + %% streams and quorum queues are all replicated + true. is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> false; diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 4e59b6a7c0..6ecaf40c7d 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -36,7 +36,6 @@ is_server_named_allowed/1 ]). -%% gah what is a good identity of a classic queue including all replicas -type queue_name() :: rabbit_types:r(queue). -type queue_ref() :: queue_name() | atom(). -type queue_state() :: term(). @@ -440,6 +439,7 @@ deliver(Qs, Delivery, stateless) -> end, Qs), {ok, stateless, []}; deliver(Qs, Delivery, #?STATE{} = State0) -> + %% TODO: optimise single queue case? %% sort by queue type - then dispatch each group ByType = lists:foldl( fun (Q, Acc) -> @@ -457,7 +457,7 @@ deliver(Qs, Delivery, #?STATE{} = State0) -> end, {[], []}, ByType), State = lists:foldl( fun({Q, S}, Acc) -> - Ctx = get_ctx(Q, Acc), + Ctx = get_ctx_with(Q, Acc, S), set_ctx(qref(Q), Ctx#ctx{state = S}, Acc) end, State0, Xs), return_ok(State, Actions). @@ -511,21 +511,32 @@ dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) -> Err end. -get_ctx(Q, #?STATE{ctxs = Contexts}) when ?is_amqqueue(Q) -> +get_ctx(QOrQref, State) -> + get_ctx_with(QOrQref, State, undefined). + +get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) + when ?is_amqqueue(Q) -> Ref = qref(Q), case Contexts of #{Ref := #ctx{module = Mod, state = State} = Ctx} -> Ctx#ctx{state = Mod:update(Q, State)}; - _ -> - %% not found - initialize + _ when InitState == undefined -> + %% not found and no initial state passed - initialize new state + Mod = amqqueue:get_type(Q), + Name = amqqueue:get_name(Q), + #ctx{module = Mod, + name = Name, + state = Mod:init(Q)}; + _ -> + %% not found - initialize with supplied initial state Mod = amqqueue:get_type(Q), Name = amqqueue:get_name(Q), #ctx{module = Mod, name = Name, - state = Mod:init(Q)} + state = InitState} end; -get_ctx(QRef, Contexts) when ?QREF(QRef) -> +get_ctx_with(QRef, Contexts, undefined) when ?QREF(QRef) -> case get_ctx(QRef, Contexts, undefined) of undefined -> exit({queue_context_not_found, QRef}); diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 6ae819171a..89e64c3139 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -30,7 +30,8 @@ start_cluster/1, delete_cluster/2, add_replica/2, - delete_replica/2]). + delete_replica/2, + register_listener/1]). -export([policy_changed/1]). @@ -46,15 +47,62 @@ -export([log_overview/1]). +-include("amqqueue.hrl"). + -define(STREAM_COORDINATOR_STARTUP, {stream_coordinator_startup, self()}). -define(TICK_TIMEOUT, 60000). -define(RESTART_TIMEOUT, 1000). -define(PHASE_RETRY_TIMEOUT, 10000). -define(CMD_TIMEOUT, 30000). --record(?MODULE, {streams, monitors}). +-type stream_id() :: binary(). +-type stream() :: #{conf := osiris:config(), + atom() => term()}. +-type stream_role() :: leader | follower | listener. +-type queue_ref() :: rabbit_types:r(queue). + +-record(?MODULE, {streams = #{} :: #{stream_id() => stream()}, + monitors = #{} :: #{pid() => {stream_id(), stream_role()}}, + listeners = #{} :: #{stream_id() => + #{pid() := queue_ref()}}, + %% future extensibility + reserved_1, + reserved_2}). + +-type state() :: #?MODULE{}. +-type command() :: {policy_changed, #{stream_id := stream_id()}} | + {start_cluster, #{queue := amqqueue:amqqueue()}} | + {start_cluster_reply, amqqueue:amqqueue()} | + {start_replica, #{stream_id := stream_id(), + node := node(), + retries := non_neg_integer()}} | + {start_replica_failed, #{stream_id := stream_id(), + node := node(), + retries := non_neg_integer()}, + Reply :: term()} | + {start_replica_reply, stream_id(), pid()} | + + {delete_replica, #{stream_id := stream_id(), + node := node()}} | + {delete_cluster, #{stream_id := stream_id(), + %% TODO: refine type + acting_user := term()}} | + {delete_cluster_reply, stream_id()} | + + {start_leader_election, stream_id(), osiris:epoch(), + Offsets :: term()} | + {leader_elected, stream_id(), NewLeaderPid :: pid()} | + {replicas_stopped, stream_id()} | + {phase_finished, stream_id(), Reply :: term()} | + {stream_updated, stream()} | + {register_listener, #{pid := pid(), + stream_id := stream_id(), + queue_ref := queue_ref()}} | + ra_machine:effect(). + + +-export_type([command/0]). --include("amqqueue.hrl"). start() -> Nodes = rabbit_mnesia:cluster_nodes(all), ServerId = {?MODULE, node()}, @@ -119,6 +167,16 @@ policy_changed(Q) when ?is_amqqueue(Q) -> delete_replica(StreamId, Node) -> process_command({delete_replica, #{stream_id => StreamId, node => Node}}). +-spec register_listener(amqeueue:amqqueue()) -> + {error, term()} | {ok, ok, atom() | {atom(), atom()}}. +register_listener(Q) when ?is_amqqueue(Q)-> + #{name := StreamId} = amqqueue:get_type_state(Q), + QRef= amqqueue:get_name(Q), + process_command({register_listener, + #{pid => self(), + stream_id => StreamId, + queue_ref => QRef}}). + process_command(Cmd) -> global:set_lock(?STREAM_COORDINATOR_STARTUP), Servers = ensure_coordinator_started(), @@ -176,9 +234,10 @@ all_nodes() -> [{?MODULE, Node} || Node <- [node() | Nodes]]. init(_Conf) -> - #?MODULE{streams = #{}, - monitors = #{}}. + #?MODULE{}. +-spec apply(map(), command(), state()) -> + {state(), term(), ra_machine:effects()}. apply(Meta, {policy_changed, #{stream_id := StreamId, queue := Q, retries := Retries}} = Cmd, @@ -233,7 +292,8 @@ apply(#{from := From}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Stream pending_cmds => [], pending_replicas => []}, rabbit_log:debug("rabbit_stream_coordinator: ~p entering phase_start_cluster", [StreamId]), - {State#?MODULE{streams = maps:put(StreamId, SState, Streams)}, '$ra_no_reply', + {State#?MODULE{streams = maps:put(StreamId, SState, Streams)}, + '$ra_no_reply', [{aux, {phase, StreamId, Phase, PhaseArgs}}]} end; apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams, @@ -241,6 +301,9 @@ apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams, #{name := StreamId, leader_pid := LeaderPid, replica_pids := ReplicaPids} = Conf = amqqueue:get_type_state(Q), + %% TODO: this doesn't guarantee that all replicas were started successfully + %% we need to do something to check if any were not started and start a + %% retry phase to get them running SState0 = maps:get(StreamId, Streams), Phase = phase_repair_mnesia, PhaseArgs = [new, Q], @@ -256,7 +319,9 @@ apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams, {State#?MODULE{streams = maps:put(StreamId, SState, Streams), monitors = Monitors}, ok, MonitorActions ++ [{aux, {phase, StreamId, Phase, PhaseArgs}}]}; -apply(_Meta, {start_replica_failed, StreamId, Node, Retries, Reply}, +apply(_Meta, {start_replica_failed, #{stream_id := StreamId, + node := Node, + retries := Retries}, Reply}, #?MODULE{streams = Streams0} = State) -> rabbit_log:debug("rabbit_stream_coordinator: ~p start replica failed", [StreamId]), case maps:get(StreamId, Streams0, undefined) of @@ -271,7 +336,6 @@ apply(_Meta, {start_replica_failed, StreamId, Node, Retries, Reply}, [{timer, {pipeline, [{start_replica, #{stream_id => StreamId, node => Node, - from => undefined, retries => Retries + 1}}]}, ?RESTART_TIMEOUT * Retries}], State#?MODULE{streams = Streams}) @@ -302,9 +366,10 @@ apply(_Meta, {phase_finished, StreamId, Reply}, #?MODULE{streams = Streams0} = S Streams = Streams0#{StreamId => clear_stream_state(SState)}, reply_and_run_pending(From, StreamId, ok, Reply, [], State#?MODULE{streams = Streams}) end; -apply(#{from := From}, {start_replica, #{stream_id := StreamId, node := Node, +apply(Meta, {start_replica, #{stream_id := StreamId, node := Node, retries := Retries}} = Cmd, #?MODULE{streams = Streams0} = State) -> + From = maps:get(from, Meta, undefined), case maps:get(StreamId, Streams0, undefined) of undefined -> case From of @@ -428,9 +493,25 @@ apply(_Meta, {delete_cluster_reply, StreamId}, #?MODULE{streams = Streams} = Sta Actions = [{mod_call, ra, pipeline_command, [{?MODULE, node()}, Cmd]} || Cmd <- Pending], {State, ok, Actions ++ wrap_reply(From, {ok, 0})}; -apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams, - monitors = Monitors0} = State) -> +apply(_Meta, {down, Pid, _Reason} = Cmd, + #?MODULE{streams = Streams, + listeners = Listeners0, + monitors = Monitors0} = State) -> case maps:get(Pid, Monitors0, undefined) of + {StreamId, listener} -> + Listeners = case maps:take(StreamId, Listeners0) of + error -> + Listeners0; + {Pids0, Listeners1} -> + case maps:remove(Pid, Pids0) of + Pids when map_size(Pids) == 0 -> + Listeners1; + Pids -> + Listeners1#{StreamId => Pids} + end + end, + {State#?MODULE{listeners = Listeners, + monitors = maps:remove(Pid, Monitors0)}, ok, []}; {StreamId, Role} -> Monitors = maps:remove(Pid, Monitors0), case maps:get(StreamId, Streams, undefined) of @@ -507,7 +588,9 @@ apply(_Meta, {start_leader_election, StreamId, Q, NewEpoch, Offsets}, {State#?MODULE{streams = Streams#{StreamId => SState}}, ok, [{aux, {phase, StreamId, Phase, PhaseArgs}}]}; apply(_Meta, {leader_elected, StreamId, NewLeaderPid}, - #?MODULE{streams = Streams, monitors = Monitors0} = State) -> + #?MODULE{streams = Streams, + listeners = Listeners, + monitors = Monitors0} = State) -> rabbit_log:info("rabbit_stream_coordinator: ~p leader elected", [StreamId]), #{conf := Conf0, pending_cmds := Pending0} = SState0 = maps:get(StreamId, Streams), @@ -527,10 +610,22 @@ apply(_Meta, {leader_elected, StreamId, NewLeaderPid}, Monitors = maps:put(NewLeaderPid, {StreamId, leader}, maps:remove(LeaderPid, Monitors0)), rabbit_log:debug("rabbit_stream_coordinator: ~p entering ~p after " "leader election", [StreamId, Phase]), + Notifications = case maps:get(StreamId, Listeners, undefined) of + undefined -> + []; + Pids -> + maps:fold( + fun (Pid, QRef, Acc) -> + [{send_msg, Pid, + {queue_event, QRef, + {stream_leader_change, NewLeaderPid}}, + cast} | Acc] + end, [], Pids) + end, {State#?MODULE{streams = Streams#{StreamId => SState}, monitors = Monitors}, ok, [{monitor, process, NewLeaderPid}, - {aux, {phase, StreamId, Phase, PhaseArgs}}]}; + {aux, {phase, StreamId, Phase, PhaseArgs}} | Notifications]}; apply(_Meta, {replicas_stopped, StreamId}, #?MODULE{streams = Streams} = State) -> case maps:get(StreamId, Streams, undefined) of undefined -> @@ -564,7 +659,25 @@ apply(_, {timeout, {pipeline, Cmds}}, State) -> apply(_, {timeout, {aux, Cmd}}, State) -> {State, ok, [{aux, Cmd}]}; apply(Meta, {_, #{from := From}} = Cmd, State) -> - ?MODULE:apply(Meta#{from => From}, Cmd, State). + ?MODULE:apply(Meta#{from => From}, Cmd, State); +apply(_Meta, {register_listener, #{pid := Pid, + stream_id := StreamId, + queue_ref := QRef}}, + #?MODULE{listeners = Listeners0, + monitors = Monitors0} = State0) -> + Listeners = maps:update_with(StreamId, + fun (Pids) -> + maps:put(Pid, QRef, Pids) + end, #{Pid => QRef}, Listeners0), + Monitors = maps:put(Pid, {StreamId, listener}, Monitors0), + + {State0#?MODULE{listeners = Listeners, + monitors = Monitors}, ok, + [{monitor, process, Pid}]}; +apply(_Meta, UnkCmd, State) -> + rabbit_log:debug("rabbit_stream_coordinator: unknown command ~W", + [UnkCmd, 10]), + {State, {error, unknown_command}, []}. state_enter(leader, #?MODULE{streams = Streams, monitors = Monitors}) -> maps:fold(fun(_, #{conf := #{name := StreamId}, @@ -760,13 +873,19 @@ phase_start_replica(Node, #{name := StreamId} = Conf0, rabbit_log:warning("Error while starting replica for ~p : ~p", [maps:get(name, Conf0), Reason]), ra:pipeline_command({?MODULE, node()}, - {start_replica_failed, StreamId, Node, Retries, Error}) + {start_replica_failed, + #{stream_id => StreamId, + node => Node, + retries => Retries}, Error}) end catch _:E-> rabbit_log:warning("Error while starting replica for ~p : ~p", [maps:get(name, Conf0), E]), ra:pipeline_command({?MODULE, node()}, - {start_replica_failed, StreamId, Node, Retries, {error, E}}) + {start_replica_failed, + #{stream_id => StreamId, + node => Node, + retries => Retries}, {error, E}}) end end). diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 1f789f165b..695ab35241 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -45,7 +45,7 @@ -export([format_osiris_event/2]). -export([update_stream_conf/2]). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -include("amqqueue.hrl"). -define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state, @@ -54,6 +54,9 @@ -type appender_seq() :: non_neg_integer(). +-type msg_id() :: non_neg_integer(). +-type msg() :: term(). %% TODO: refine + -record(stream, {name :: rabbit_types:r('queue'), credit :: integer(), max :: non_neg_integer(), @@ -64,10 +67,11 @@ -record(stream_client, {name :: term(), leader :: pid(), next_seq = 1 :: non_neg_integer(), - correlation = #{} :: #{appender_seq() => term()}, + correlation = #{} :: #{appender_seq() => {msg_id(), msg()}}, soft_limit :: non_neg_integer(), slow = false :: boolean(), - readers = #{} :: #{term() => #stream{}} + readers = #{} :: #{term() => #stream{}}, + writer_id :: binary() }). -import(rabbit_queue_type_util, [args_policy_lookup/3]). @@ -284,16 +288,17 @@ deliver(QSs, #delivery{confirm = Confirm} = Delivery) -> deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId}, #stream_client{name = Name, leader = LeaderPid, + writer_id = WriterId, next_seq = Seq, correlation = Correlation0, soft_limit = SftLmt, slow = Slow0} = State) -> - ok = osiris:write(LeaderPid, undefined, Seq, msg_to_iodata(Msg)), + ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg)), Correlation = case MsgId of undefined -> Correlation0; _ when is_number(MsgId) -> - Correlation0#{Seq => MsgId} + Correlation0#{Seq => {MsgId, Msg}} end, Slow = case maps:size(Correlation) >= SftLmt of true when not Slow0 -> @@ -305,16 +310,21 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId}, State#stream_client{next_seq = Seq + 1, correlation = Correlation, slow = Slow}. + -spec dequeue(_, _, _, client()) -> no_return(). dequeue(_, _, _, #stream_client{name = Name}) -> {protocol_error, not_implemented, "basic.get not supported by stream queues ~s", [rabbit_misc:rs(Name)]}. -handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{correlation = Correlation0, - soft_limit = SftLmt, - slow = Slow0, - name = Name}) -> - MsgIds = maps:values(maps:with(Corrs, Correlation0)), +handle_event({osiris_written, From, _WriterId, Corrs}, + State = #stream_client{correlation = Correlation0, + soft_limit = SftLmt, + slow = Slow0, + name = Name}) -> + MsgIds = lists:sort(maps:fold( + fun (_Seq, {I, _M}, Acc) -> + [I | Acc] + end, [], maps:with(Corrs, Correlation0))), Correlation = maps:without(Corrs, Correlation0), Slow = case maps:size(Correlation) < SftLmt of true when Slow0 -> @@ -325,9 +335,10 @@ handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{co end, {ok, State#stream_client{correlation = Correlation, slow = Slow}, [{settled, From, MsgIds}]}; -handle_event({osiris_offset, _From, _Offs}, State = #stream_client{leader = Leader, - readers = Readers0, - name = Name}) -> +handle_event({osiris_offset, _From, _Offs}, + State = #stream_client{leader = Leader, + readers = Readers0, + name = Name}) -> %% offset isn't actually needed as we use the atomic to read the %% current committed {Readers, TagMsgs} = maps:fold( @@ -342,7 +353,9 @@ handle_event({osiris_offset, _From, _Offs}, State = #stream_client{leader = Lead Ack = true, Deliveries = [{deliver, Tag, Ack, OffsetMsg} || {Tag, _LeaderPid, OffsetMsg} <- TagMsgs], - {ok, State#stream_client{readers = Readers}, Deliveries}. + {ok, State#stream_client{readers = Readers}, Deliveries}; +handle_event({stream_leader_change, Pid}, State) -> + {ok, update_leader_pid(Pid, State), []}. is_recoverable(Q) -> Node = node(), @@ -358,8 +371,8 @@ recover(_VHost, Queues) -> end, {[], []}, Queues). settle(complete, CTag, MsgIds, #stream_client{readers = Readers0, - name = Name, - leader = Leader} = State) -> + name = Name, + leader = Leader} = State) -> Credit = length(MsgIds), {Readers, Msgs} = case Readers0 of #{CTag := #stream{credit = Credit0} = Str0} -> @@ -450,9 +463,13 @@ i(_, _) -> init(Q) when ?is_amqqueue(Q) -> Leader = amqqueue:get_pid(Q), + {ok, ok, _} = rabbit_stream_coordinator:register_listener(Q), + Prefix = erlang:pid_to_list(self()) ++ "_", + WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix), {ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit), #stream_client{name = amqqueue:get_name(Q), leader = Leader, + writer_id = WriterId, soft_limit = SoftLimit}. close(#stream_client{readers = Readers}) -> @@ -461,8 +478,15 @@ close(#stream_client{readers = Readers}) -> end, Readers), ok. -update(_, State) -> - State. +update(Q, State) + when ?is_amqqueue(Q) -> + Pid = amqqueue:get_pid(Q), + update_leader_pid(Pid, State). + +update_leader_pid(Pid, #stream_client{leader = Pid} = State) -> + State; +update_leader_pid(Pid, #stream_client{} = State) -> + resend_all(State#stream_client{leader = Pid}). state_info(_) -> #{}. @@ -611,7 +635,7 @@ initial_cluster_size(Val) -> res_arg(PolVal, undefined) -> PolVal; res_arg(_, ArgVal) -> ArgVal. - + queue_name(#resource{virtual_host = VHost, name = Name}) -> Timestamp = erlang:integer_to_binary(erlang:system_time()), osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_", @@ -729,3 +753,12 @@ capabilities() -> <<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>], consumer_arguments => [<<"x-stream-offset">>], server_named => false}. + +resend_all(#stream_client{leader = LeaderPid, + writer_id = WriterId, + correlation = Corrs} = State) -> + Msgs = lists:sort(maps:values(Corrs)), + [begin + ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg)) + end || {Seq, Msg} <- Msgs], + State. diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 92d9f8c806..a010869043 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -47,6 +47,7 @@ groups() -> delete_quorum_replica, consume_from_replica, leader_failover, + leader_failover_dedupe, initial_cluster_size_one, initial_cluster_size_two, initial_cluster_size_one_policy, @@ -1194,6 +1195,76 @@ leader_failover(Config) -> ?assert(NewLeader =/= Server1), ok = rabbit_ct_broker_helpers:start_node(Config, Server1). +leader_failover_dedupe(Config) -> + %% tests that in-flight messages are automatically handled in the case where + %% a leader change happens during publishing + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]), + + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), + #'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}), + + Self= self(), + F = fun F(N) -> + receive + go -> + [publish(Ch2, Q, integer_to_binary(N + I)) + || I <- lists:seq(1, 100)], + true = amqp_channel:wait_for_confirms(Ch2, 25), + F(N + 100); + stop -> + Self ! {last_msg, N}, + ct:pal("stop"), + ok + after 2 -> + self() ! go, + F(N) + end + end, + Pid = spawn(fun () -> + amqp_channel:register_confirm_handler(Ch2, self()), + F(0) + end), + erlang:monitor(process, Pid), + Pid ! go, + timer:sleep(10), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + %% this should cause a new leader to be elected and the channel on node 2 + %% to have to resend any pending messages to ensure none is lost + timer:sleep(30000), + [Info] = lists:filter( + fun(Props) -> + QName = rabbit_misc:r(<<"/">>, queue, Q), + lists:member({name, QName}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader, members]])), + NewLeader = proplists:get_value(leader, Info), + ?assert(NewLeader =/= Server1), + flush(), + ?assert(erlang:is_process_alive(Pid)), + Pid ! stop, + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + + N = receive + {last_msg, X} -> X + after 2000 -> + exit(last_msg_timeout) + end, + %% validate that no duplicates were written even though an internal + %% resend might have taken place + qos(Ch2, 100, false), + subscribe(Ch2, Q, false, 0), + validate_dedupe(Ch2, 1, N), + + ok. + initial_cluster_size_one(Config) -> [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1598,6 +1669,30 @@ qos(Ch, Prefetch, Global) -> amqp_channel:call(Ch, #'basic.qos'{global = Global, prefetch_count = Prefetch})). +validate_dedupe(Ch, N, N) -> + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = B}} -> + I = binary_to_integer(B), + ?assertEqual(N, I), + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + after 60000 -> + exit({missing_record, N}) + end; +validate_dedupe(Ch, N, M) -> + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = B}} -> + I = binary_to_integer(B), + ?assertEqual(N, I), + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + validate_dedupe(Ch, N + 1, M) + after 60000 -> + exit({missing_record, N}) + end. + receive_batch(Ch, N, N) -> receive {#'basic.deliver'{delivery_tag = DeliveryTag}, @@ -1642,3 +1737,12 @@ run_proper(Fun, Args, NumTests) -> {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])). + +flush() -> + receive + Any -> + ct:pal("flush ~p", [Any]), + flush() + after 0 -> + ok + end. |