diff options
author | kjnilsson <knilsson@pivotal.io> | 2021-01-11 16:30:14 +0000 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2021-01-13 12:09:44 +0000 |
commit | 2f0dba45d82b4e1f1ab7a0cbf75b3c9b99ff7a2b (patch) | |
tree | 017666fcc5e7bfaca72722009f482bfccaa87a12 | |
parent | 0ef00f88edd1dd1a00d349c78d5cec75e0d6c8e7 (diff) | |
download | rabbitmq-server-git-2f0dba45d82b4e1f1ab7a0cbf75b3c9b99ff7a2b.tar.gz |
Stream: Channel resend on leader change
Detect when a new stream leader is elected and make stream_queues
re-send any unconfirmed, pending messages so that they are not
lost during the leader change. This uses the osiris
deduplication feature to ensure the resend does not create duplicate
messages in the stream.
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 12 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_queue_type.erl | 25 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 148 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 71 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 104 |
5 files changed, 315 insertions, 45 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index cd5f894680..435f5ff089 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -462,6 +462,8 @@ filter_per_type(all, _) -> true; filter_per_type(quorum, Q) -> ?amqqueue_is_quorum(Q); +filter_per_type(stream, Q) -> + ?amqqueue_is_stream(Q); filter_per_type(classic, Q) -> ?amqqueue_is_classic(Q). @@ -1714,10 +1716,11 @@ cancel_sync_mirrors(QPid) -> -spec is_replicated(amqqueue:amqqueue()) -> boolean(). -is_replicated(Q) when ?amqqueue_is_quorum(Q) -> - true; -is_replicated(Q) -> - rabbit_mirror_queue_misc:is_mirrored(Q). +is_replicated(Q) when ?amqqueue_is_classic(Q) -> + rabbit_mirror_queue_misc:is_mirrored(Q); +is_replicated(_Q) -> + %% streams and quorum queues are all replicated + true. is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) -> false; @@ -1792,6 +1795,7 @@ on_node_down(Node) -> ok. delete_queues_on_node_down(Node) -> + rabbit_log:info("delete_queues_on_node_down GAHHH", []), lists:unzip(lists:flatten([ rabbit_misc:execute_mnesia_transaction( fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 4e59b6a7c0..6ecaf40c7d 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -36,7 +36,6 @@ is_server_named_allowed/1 ]). -%% gah what is a good identity of a classic queue including all replicas -type queue_name() :: rabbit_types:r(queue). -type queue_ref() :: queue_name() | atom(). -type queue_state() :: term(). @@ -440,6 +439,7 @@ deliver(Qs, Delivery, stateless) -> end, Qs), {ok, stateless, []}; deliver(Qs, Delivery, #?STATE{} = State0) -> + %% TODO: optimise single queue case? 
%% sort by queue type - then dispatch each group ByType = lists:foldl( fun (Q, Acc) -> @@ -457,7 +457,7 @@ deliver(Qs, Delivery, #?STATE{} = State0) -> end, {[], []}, ByType), State = lists:foldl( fun({Q, S}, Acc) -> - Ctx = get_ctx(Q, Acc), + Ctx = get_ctx_with(Q, Acc, S), set_ctx(qref(Q), Ctx#ctx{state = S}, Acc) end, State0, Xs), return_ok(State, Actions). @@ -511,21 +511,32 @@ dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) -> Err end. -get_ctx(Q, #?STATE{ctxs = Contexts}) when ?is_amqqueue(Q) -> +get_ctx(QOrQref, State) -> + get_ctx_with(QOrQref, State, undefined). + +get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState) + when ?is_amqqueue(Q) -> Ref = qref(Q), case Contexts of #{Ref := #ctx{module = Mod, state = State} = Ctx} -> Ctx#ctx{state = Mod:update(Q, State)}; - _ -> - %% not found - initialize + _ when InitState == undefined -> + %% not found and no initial state passed - initialize new state + Mod = amqqueue:get_type(Q), + Name = amqqueue:get_name(Q), + #ctx{module = Mod, + name = Name, + state = Mod:init(Q)}; + _ -> + %% not found - initialize with supplied initial state Mod = amqqueue:get_type(Q), Name = amqqueue:get_name(Q), #ctx{module = Mod, name = Name, - state = Mod:init(Q)} + state = InitState} end; -get_ctx(QRef, Contexts) when ?QREF(QRef) -> +get_ctx_with(QRef, Contexts, undefined) when ?QREF(QRef) -> case get_ctx(QRef, Contexts, undefined) of undefined -> exit({queue_context_not_found, QRef}); diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 6ae819171a..731a16e94d 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -30,7 +30,8 @@ start_cluster/1, delete_cluster/2, add_replica/2, - delete_replica/2]). + delete_replica/2, + register_listener/1]). -export([policy_changed/1]). @@ -46,15 +47,62 @@ -export([log_overview/1]). +-include("amqqueue.hrl"). + -define(STREAM_COORDINATOR_STARTUP, {stream_coordinator_startup, self()}). 
-define(TICK_TIMEOUT, 60000). -define(RESTART_TIMEOUT, 1000). -define(PHASE_RETRY_TIMEOUT, 10000). -define(CMD_TIMEOUT, 30000). --record(?MODULE, {streams, monitors}). +-type stream_id() :: binary(). +-type stream() :: #{conf := osiris:config(), + atom() => term()}. +-type stream_role() :: leader | follower | listener. +-type queue_ref() :: rabbit_types:r(queue). + +-record(?MODULE, {streams = #{} :: #{stream_id() => stream()}, + monitors = #{} :: #{pid() => {stream_id(), stream_role()}}, + listeners = #{} :: #{stream_id() => + #{pid() := queue_ref()}}, + %% future extensibility + reserved_1, + reserved_2}). + +-type state() :: #?MODULE{}. +-type command() :: {policy_changed, #{stream_id := stream_id()}} | + {start_cluster, #{queue := amqqueue:amqqueue()}} | + {start_cluster_reply, amqqueue:amqqueue()} | + {start_replica, #{stream_id := stream_id(), + node := node(), + retries := non_neg_integer()}} | + {start_replica_failed, #{stream_id := stream_id(), + node := node(), + retries := non_neg_integer()}, + Reply :: term()} | + {start_replica_reply, stream_id(), pid()} | + + {delete_replica, #{stream_id := stream_id(), + node := node()}} | + {delete_cluster, #{stream_id := stream_id(), + %% TODO: refine type + acting_user := term()}} | + {delete_cluster_reply, stream_id()} | + + {start_leader_election, stream_id(), osiris:epoch(), + Offsets :: term()} | + {leader_elected, stream_id(), NewLeaderPid :: pid()} | + {replicas_stopped, stream_id()} | + {phase_finished, stream_id(), Reply :: term()} | + {stream_updated, stream()} | + {register_listener, #{pid := pid(), + stream_id := stream_id(), + queue_ref := queue_ref()}} | + ra_machine:effect(). + + +-export_type([command/0]). --include("amqqueue.hrl"). start() -> Nodes = rabbit_mnesia:cluster_nodes(all), ServerId = {?MODULE, node()}, @@ -119,6 +167,16 @@ policy_changed(Q) when ?is_amqqueue(Q) -> delete_replica(StreamId, Node) -> process_command({delete_replica, #{stream_id => StreamId, node => Node}}). 
+-spec register_listener(amqeueue:amqqueue()) -> + {error, term()} | {ok, ok, atom() | {atom(), atom()}}. +register_listener(Q) when ?is_amqqueue(Q)-> + #{name := StreamId} = amqqueue:get_type_state(Q), + QRef= amqqueue:get_name(Q), + process_command({register_listener, + #{pid => self(), + stream_id => StreamId, + queue_ref => QRef}}). + process_command(Cmd) -> global:set_lock(?STREAM_COORDINATOR_STARTUP), Servers = ensure_coordinator_started(), @@ -176,9 +234,10 @@ all_nodes() -> [{?MODULE, Node} || Node <- [node() | Nodes]]. init(_Conf) -> - #?MODULE{streams = #{}, - monitors = #{}}. + #?MODULE{}. +-spec apply(map(), command(), state()) -> + {state(), term(), ra_machine:effects()}. apply(Meta, {policy_changed, #{stream_id := StreamId, queue := Q, retries := Retries}} = Cmd, @@ -233,7 +292,8 @@ apply(#{from := From}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Stream pending_cmds => [], pending_replicas => []}, rabbit_log:debug("rabbit_stream_coordinator: ~p entering phase_start_cluster", [StreamId]), - {State#?MODULE{streams = maps:put(StreamId, SState, Streams)}, '$ra_no_reply', + {State#?MODULE{streams = maps:put(StreamId, SState, Streams)}, + '$ra_no_reply', [{aux, {phase, StreamId, Phase, PhaseArgs}}]} end; apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams, @@ -241,6 +301,9 @@ apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams, #{name := StreamId, leader_pid := LeaderPid, replica_pids := ReplicaPids} = Conf = amqqueue:get_type_state(Q), + %% TODO: this doesn't guarantee that all replicas were started successfully + %% we need to do something to check if any were not started and start a + %% retry phase to get them running SState0 = maps:get(StreamId, Streams), Phase = phase_repair_mnesia, PhaseArgs = [new, Q], @@ -256,7 +319,9 @@ apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams, {State#?MODULE{streams = maps:put(StreamId, SState, Streams), monitors = Monitors}, ok, MonitorActions ++ [{aux, {phase, 
StreamId, Phase, PhaseArgs}}]}; -apply(_Meta, {start_replica_failed, StreamId, Node, Retries, Reply}, +apply(_Meta, {start_replica_failed, #{stream_id := StreamId, + node := Node, + retries := Retries}, Reply}, #?MODULE{streams = Streams0} = State) -> rabbit_log:debug("rabbit_stream_coordinator: ~p start replica failed", [StreamId]), case maps:get(StreamId, Streams0, undefined) of @@ -271,7 +336,6 @@ apply(_Meta, {start_replica_failed, StreamId, Node, Retries, Reply}, [{timer, {pipeline, [{start_replica, #{stream_id => StreamId, node => Node, - from => undefined, retries => Retries + 1}}]}, ?RESTART_TIMEOUT * Retries}], State#?MODULE{streams = Streams}) @@ -428,9 +492,25 @@ apply(_Meta, {delete_cluster_reply, StreamId}, #?MODULE{streams = Streams} = Sta Actions = [{mod_call, ra, pipeline_command, [{?MODULE, node()}, Cmd]} || Cmd <- Pending], {State, ok, Actions ++ wrap_reply(From, {ok, 0})}; -apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams, - monitors = Monitors0} = State) -> +apply(_Meta, {down, Pid, _Reason} = Cmd, + #?MODULE{streams = Streams, + listeners = Listeners0, + monitors = Monitors0} = State) -> case maps:get(Pid, Monitors0, undefined) of + {StreamId, listener} -> + Listeners = case maps:take(StreamId, Listeners0) of + error -> + Listeners0; + {Pids0, Listeners1} -> + case maps:remove(Pid, Pids0) of + Pids when map_size(Pids) == 0 -> + Listeners1; + Pids -> + Listeners1#{StreamId => Pids} + end + end, + {State#?MODULE{listeners = Listeners, + monitors = maps:remove(Pid, Monitors0)}, ok, []}; {StreamId, Role} -> Monitors = maps:remove(Pid, Monitors0), case maps:get(StreamId, Streams, undefined) of @@ -507,7 +587,9 @@ apply(_Meta, {start_leader_election, StreamId, Q, NewEpoch, Offsets}, {State#?MODULE{streams = Streams#{StreamId => SState}}, ok, [{aux, {phase, StreamId, Phase, PhaseArgs}}]}; apply(_Meta, {leader_elected, StreamId, NewLeaderPid}, - #?MODULE{streams = Streams, monitors = Monitors0} = State) -> + #?MODULE{streams = 
Streams, + listeners = Listeners, + monitors = Monitors0} = State) -> rabbit_log:info("rabbit_stream_coordinator: ~p leader elected", [StreamId]), #{conf := Conf0, pending_cmds := Pending0} = SState0 = maps:get(StreamId, Streams), @@ -527,10 +609,22 @@ apply(_Meta, {leader_elected, StreamId, NewLeaderPid}, Monitors = maps:put(NewLeaderPid, {StreamId, leader}, maps:remove(LeaderPid, Monitors0)), rabbit_log:debug("rabbit_stream_coordinator: ~p entering ~p after " "leader election", [StreamId, Phase]), + Notifications = case maps:get(StreamId, Listeners, undefined) of + undefined -> + []; + Pids -> + maps:fold( + fun (Pid, QRef, Acc) -> + [{send_msg, Pid, + {queue_event, QRef, + {stream_leader_change, NewLeaderPid}}, + cast} | Acc] + end, [], Pids) + end, {State#?MODULE{streams = Streams#{StreamId => SState}, monitors = Monitors}, ok, [{monitor, process, NewLeaderPid}, - {aux, {phase, StreamId, Phase, PhaseArgs}}]}; + {aux, {phase, StreamId, Phase, PhaseArgs}} | Notifications]}; apply(_Meta, {replicas_stopped, StreamId}, #?MODULE{streams = Streams} = State) -> case maps:get(StreamId, Streams, undefined) of undefined -> @@ -564,7 +658,25 @@ apply(_, {timeout, {pipeline, Cmds}}, State) -> apply(_, {timeout, {aux, Cmd}}, State) -> {State, ok, [{aux, Cmd}]}; apply(Meta, {_, #{from := From}} = Cmd, State) -> - ?MODULE:apply(Meta#{from => From}, Cmd, State). 
+ ?MODULE:apply(Meta#{from => From}, Cmd, State); +apply(_Meta, {register_listener, #{pid := Pid, + stream_id := StreamId, + queue_ref := QRef}}, + #?MODULE{listeners = Listeners0, + monitors = Monitors0} = State0) -> + Listeners = maps:update_with(StreamId, + fun (Pids) -> + maps:put(Pid, QRef, Pids) + end, #{Pid => QRef}, Listeners0), + Monitors = maps:put(Pid, {StreamId, listener}, Monitors0), + + {State0#?MODULE{listeners = Listeners, + monitors = Monitors}, ok, + [{monitor, process, Pid}]}; +apply(_Meta, UnkCmd, State) -> + rabbit_log:debug("rabbit_stream_coordinator: unknown command ~W", + [UnkCmd, 10]), + {State, {error, unknown_command}, []}. state_enter(leader, #?MODULE{streams = Streams, monitors = Monitors}) -> maps:fold(fun(_, #{conf := #{name := StreamId}, @@ -760,13 +872,19 @@ phase_start_replica(Node, #{name := StreamId} = Conf0, rabbit_log:warning("Error while starting replica for ~p : ~p", [maps:get(name, Conf0), Reason]), ra:pipeline_command({?MODULE, node()}, - {start_replica_failed, StreamId, Node, Retries, Error}) + {start_replica_failed, + #{stream_id => StreamId, + node => Node, + retries => Retries}, Error}) end catch _:E-> rabbit_log:warning("Error while starting replica for ~p : ~p", [maps:get(name, Conf0), E]), ra:pipeline_command({?MODULE, node()}, - {start_replica_failed, StreamId, Node, Retries, {error, E}}) + {start_replica_failed, + #{stream_id => StreamId, + node => Node, + retries => Retries}, {error, E}}) end end). diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 1f789f165b..695ab35241 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -45,7 +45,7 @@ -export([format_osiris_event/2]). -export([update_stream_conf/2]). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -include("amqqueue.hrl"). 
-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state, @@ -54,6 +54,9 @@ -type appender_seq() :: non_neg_integer(). +-type msg_id() :: non_neg_integer(). +-type msg() :: term(). %% TODO: refine + -record(stream, {name :: rabbit_types:r('queue'), credit :: integer(), max :: non_neg_integer(), @@ -64,10 +67,11 @@ -record(stream_client, {name :: term(), leader :: pid(), next_seq = 1 :: non_neg_integer(), - correlation = #{} :: #{appender_seq() => term()}, + correlation = #{} :: #{appender_seq() => {msg_id(), msg()}}, soft_limit :: non_neg_integer(), slow = false :: boolean(), - readers = #{} :: #{term() => #stream{}} + readers = #{} :: #{term() => #stream{}}, + writer_id :: binary() }). -import(rabbit_queue_type_util, [args_policy_lookup/3]). @@ -284,16 +288,17 @@ deliver(QSs, #delivery{confirm = Confirm} = Delivery) -> deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId}, #stream_client{name = Name, leader = LeaderPid, + writer_id = WriterId, next_seq = Seq, correlation = Correlation0, soft_limit = SftLmt, slow = Slow0} = State) -> - ok = osiris:write(LeaderPid, undefined, Seq, msg_to_iodata(Msg)), + ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg)), Correlation = case MsgId of undefined -> Correlation0; _ when is_number(MsgId) -> - Correlation0#{Seq => MsgId} + Correlation0#{Seq => {MsgId, Msg}} end, Slow = case maps:size(Correlation) >= SftLmt of true when not Slow0 -> @@ -305,16 +310,21 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId}, State#stream_client{next_seq = Seq + 1, correlation = Correlation, slow = Slow}. + -spec dequeue(_, _, _, client()) -> no_return(). dequeue(_, _, _, #stream_client{name = Name}) -> {protocol_error, not_implemented, "basic.get not supported by stream queues ~s", [rabbit_misc:rs(Name)]}. 
-handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{correlation = Correlation0, - soft_limit = SftLmt, - slow = Slow0, - name = Name}) -> - MsgIds = maps:values(maps:with(Corrs, Correlation0)), +handle_event({osiris_written, From, _WriterId, Corrs}, + State = #stream_client{correlation = Correlation0, + soft_limit = SftLmt, + slow = Slow0, + name = Name}) -> + MsgIds = lists:sort(maps:fold( + fun (_Seq, {I, _M}, Acc) -> + [I | Acc] + end, [], maps:with(Corrs, Correlation0))), Correlation = maps:without(Corrs, Correlation0), Slow = case maps:size(Correlation) < SftLmt of true when Slow0 -> @@ -325,9 +335,10 @@ handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{co end, {ok, State#stream_client{correlation = Correlation, slow = Slow}, [{settled, From, MsgIds}]}; -handle_event({osiris_offset, _From, _Offs}, State = #stream_client{leader = Leader, - readers = Readers0, - name = Name}) -> +handle_event({osiris_offset, _From, _Offs}, + State = #stream_client{leader = Leader, + readers = Readers0, + name = Name}) -> %% offset isn't actually needed as we use the atomic to read the %% current committed {Readers, TagMsgs} = maps:fold( @@ -342,7 +353,9 @@ handle_event({osiris_offset, _From, _Offs}, State = #stream_client{leader = Lead Ack = true, Deliveries = [{deliver, Tag, Ack, OffsetMsg} || {Tag, _LeaderPid, OffsetMsg} <- TagMsgs], - {ok, State#stream_client{readers = Readers}, Deliveries}. + {ok, State#stream_client{readers = Readers}, Deliveries}; +handle_event({stream_leader_change, Pid}, State) -> + {ok, update_leader_pid(Pid, State), []}. is_recoverable(Q) -> Node = node(), @@ -358,8 +371,8 @@ recover(_VHost, Queues) -> end, {[], []}, Queues). 
settle(complete, CTag, MsgIds, #stream_client{readers = Readers0, - name = Name, - leader = Leader} = State) -> + name = Name, + leader = Leader} = State) -> Credit = length(MsgIds), {Readers, Msgs} = case Readers0 of #{CTag := #stream{credit = Credit0} = Str0} -> @@ -450,9 +463,13 @@ i(_, _) -> init(Q) when ?is_amqqueue(Q) -> Leader = amqqueue:get_pid(Q), + {ok, ok, _} = rabbit_stream_coordinator:register_listener(Q), + Prefix = erlang:pid_to_list(self()) ++ "_", + WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix), {ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit), #stream_client{name = amqqueue:get_name(Q), leader = Leader, + writer_id = WriterId, soft_limit = SoftLimit}. close(#stream_client{readers = Readers}) -> @@ -461,8 +478,15 @@ close(#stream_client{readers = Readers}) -> end, Readers), ok. -update(_, State) -> - State. +update(Q, State) + when ?is_amqqueue(Q) -> + Pid = amqqueue:get_pid(Q), + update_leader_pid(Pid, State). + +update_leader_pid(Pid, #stream_client{leader = Pid} = State) -> + State; +update_leader_pid(Pid, #stream_client{} = State) -> + resend_all(State#stream_client{leader = Pid}). state_info(_) -> #{}. @@ -611,7 +635,7 @@ initial_cluster_size(Val) -> res_arg(PolVal, undefined) -> PolVal; res_arg(_, ArgVal) -> ArgVal. - + queue_name(#resource{virtual_host = VHost, name = Name}) -> Timestamp = erlang:integer_to_binary(erlang:system_time()), osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_", @@ -729,3 +753,12 @@ capabilities() -> <<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>], consumer_arguments => [<<"x-stream-offset">>], server_named => false}. + +resend_all(#stream_client{leader = LeaderPid, + writer_id = WriterId, + correlation = Corrs} = State) -> + Msgs = lists:sort(maps:values(Corrs)), + [begin + ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg)) + end || {Seq, Msg} <- Msgs], + State. 
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 92d9f8c806..a010869043 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -47,6 +47,7 @@ groups() -> delete_quorum_replica, consume_from_replica, leader_failover, + leader_failover_dedupe, initial_cluster_size_one, initial_cluster_size_two, initial_cluster_size_one_policy, @@ -1194,6 +1195,76 @@ leader_failover(Config) -> ?assert(NewLeader =/= Server1), ok = rabbit_ct_broker_helpers:start_node(Config, Server1). +leader_failover_dedupe(Config) -> + %% tests that in-flight messages are automatically handled in the case where + %% a leader change happens during publishing + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]), + + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), + #'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}), + + Self= self(), + F = fun F(N) -> + receive + go -> + [publish(Ch2, Q, integer_to_binary(N + I)) + || I <- lists:seq(1, 100)], + true = amqp_channel:wait_for_confirms(Ch2, 25), + F(N + 100); + stop -> + Self ! {last_msg, N}, + ct:pal("stop"), + ok + after 2 -> + self() ! go, + F(N) + end + end, + Pid = spawn(fun () -> + amqp_channel:register_confirm_handler(Ch2, self()), + F(0) + end), + erlang:monitor(process, Pid), + Pid ! 
go, + timer:sleep(10), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + %% this should cause a new leader to be elected and the channel on node 2 + %% to have to resend any pending messages to ensure none is lost + timer:sleep(30000), + [Info] = lists:filter( + fun(Props) -> + QName = rabbit_misc:r(<<"/">>, queue, Q), + lists:member({name, QName}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader, members]])), + NewLeader = proplists:get_value(leader, Info), + ?assert(NewLeader =/= Server1), + flush(), + ?assert(erlang:is_process_alive(Pid)), + Pid ! stop, + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + + N = receive + {last_msg, X} -> X + after 2000 -> + exit(last_msg_timeout) + end, + %% validate that no duplicates were written even though an internal + %% resend might have taken place + qos(Ch2, 100, false), + subscribe(Ch2, Q, false, 0), + validate_dedupe(Ch2, 1, N), + + ok. + initial_cluster_size_one(Config) -> [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1598,6 +1669,30 @@ qos(Ch, Prefetch, Global) -> amqp_channel:call(Ch, #'basic.qos'{global = Global, prefetch_count = Prefetch})). +validate_dedupe(Ch, N, N) -> + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = B}} -> + I = binary_to_integer(B), + ?assertEqual(N, I), + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + after 60000 -> + exit({missing_record, N}) + end; +validate_dedupe(Ch, N, M) -> + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = B}} -> + I = binary_to_integer(B), + ?assertEqual(N, I), + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + validate_dedupe(Ch, N + 1, M) + after 60000 -> + exit({missing_record, N}) + end. 
+ receive_batch(Ch, N, N) -> receive {#'basic.deliver'{delivery_tag = DeliveryTag}, @@ -1642,3 +1737,12 @@ run_proper(Fun, Args, NumTests) -> {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])). + +flush() -> + receive + Any -> + ct:pal("flush ~p", [Any]), + flush() + after 0 -> + ok + end. |