summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-01-13 17:11:06 +0000
committerGitHub <noreply@github.com>2021-01-13 17:11:06 +0000
commit54226a162667e7c3fed33a0d240f1183e8e33d71 (patch)
tree7116e082d1a26f7bed0156d564c58839367913c7
parente8fccbaf48fd8e0d448ea94409e05bb5038cbe9d (diff)
parentca53234ce336763218779a848b783d6f3a272030 (diff)
downloadrabbitmq-server-git-54226a162667e7c3fed33a0d240f1183e8e33d71.tar.gz
Merge pull request #2713 from rabbitmq/stream-queue-failover
Stream: Channel resend on leader change
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl11
-rw-r--r--deps/rabbit/src/rabbit_queue_type.erl25
-rw-r--r--deps/rabbit/src/rabbit_stream_coordinator.erl151
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl71
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl104
5 files changed, 316 insertions, 46 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index cd5f894680..933e3a6402 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -462,6 +462,8 @@ filter_per_type(all, _) ->
true;
filter_per_type(quorum, Q) ->
?amqqueue_is_quorum(Q);
+filter_per_type(stream, Q) ->
+ ?amqqueue_is_stream(Q);
filter_per_type(classic, Q) ->
?amqqueue_is_classic(Q).
@@ -1714,10 +1716,11 @@ cancel_sync_mirrors(QPid) ->
-spec is_replicated(amqqueue:amqqueue()) -> boolean().
-is_replicated(Q) when ?amqqueue_is_quorum(Q) ->
- true;
-is_replicated(Q) ->
- rabbit_mirror_queue_misc:is_mirrored(Q).
+is_replicated(Q) when ?amqqueue_is_classic(Q) ->
+ rabbit_mirror_queue_misc:is_mirrored(Q);
+is_replicated(_Q) ->
+ %% streams and quorum queues are all replicated
+ true.
is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
false;
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
index 4e59b6a7c0..6ecaf40c7d 100644
--- a/deps/rabbit/src/rabbit_queue_type.erl
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -36,7 +36,6 @@
is_server_named_allowed/1
]).
-%% gah what is a good identity of a classic queue including all replicas
-type queue_name() :: rabbit_types:r(queue).
-type queue_ref() :: queue_name() | atom().
-type queue_state() :: term().
@@ -440,6 +439,7 @@ deliver(Qs, Delivery, stateless) ->
end, Qs),
{ok, stateless, []};
deliver(Qs, Delivery, #?STATE{} = State0) ->
+ %% TODO: optimise single queue case?
%% sort by queue type - then dispatch each group
ByType = lists:foldl(
fun (Q, Acc) ->
@@ -457,7 +457,7 @@ deliver(Qs, Delivery, #?STATE{} = State0) ->
end, {[], []}, ByType),
State = lists:foldl(
fun({Q, S}, Acc) ->
- Ctx = get_ctx(Q, Acc),
+ Ctx = get_ctx_with(Q, Acc, S),
set_ctx(qref(Q), Ctx#ctx{state = S}, Acc)
end, State0, Xs),
return_ok(State, Actions).
@@ -511,21 +511,32 @@ dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
Err
end.
-get_ctx(Q, #?STATE{ctxs = Contexts}) when ?is_amqqueue(Q) ->
+get_ctx(QOrQref, State) ->
+ get_ctx_with(QOrQref, State, undefined).
+
+get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState)
+ when ?is_amqqueue(Q) ->
Ref = qref(Q),
case Contexts of
#{Ref := #ctx{module = Mod,
state = State} = Ctx} ->
Ctx#ctx{state = Mod:update(Q, State)};
- _ ->
- %% not found - initialize
+ _ when InitState == undefined ->
+ %% not found and no initial state passed - initialize new state
+ Mod = amqqueue:get_type(Q),
+ Name = amqqueue:get_name(Q),
+ #ctx{module = Mod,
+ name = Name,
+ state = Mod:init(Q)};
+ _ ->
+ %% not found - initialize with supplied initial state
Mod = amqqueue:get_type(Q),
Name = amqqueue:get_name(Q),
#ctx{module = Mod,
name = Name,
- state = Mod:init(Q)}
+ state = InitState}
end;
-get_ctx(QRef, Contexts) when ?QREF(QRef) ->
+get_ctx_with(QRef, Contexts, undefined) when ?QREF(QRef) ->
case get_ctx(QRef, Contexts, undefined) of
undefined ->
exit({queue_context_not_found, QRef});
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 6ae819171a..89e64c3139 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -30,7 +30,8 @@
start_cluster/1,
delete_cluster/2,
add_replica/2,
- delete_replica/2]).
+ delete_replica/2,
+ register_listener/1]).
-export([policy_changed/1]).
@@ -46,15 +47,62 @@
-export([log_overview/1]).
+-include("amqqueue.hrl").
+
-define(STREAM_COORDINATOR_STARTUP, {stream_coordinator_startup, self()}).
-define(TICK_TIMEOUT, 60000).
-define(RESTART_TIMEOUT, 1000).
-define(PHASE_RETRY_TIMEOUT, 10000).
-define(CMD_TIMEOUT, 30000).
--record(?MODULE, {streams, monitors}).
+-type stream_id() :: binary().
+-type stream() :: #{conf := osiris:config(),
+ atom() => term()}.
+-type stream_role() :: leader | follower | listener.
+-type queue_ref() :: rabbit_types:r(queue).
+
+-record(?MODULE, {streams = #{} :: #{stream_id() => stream()},
+ monitors = #{} :: #{pid() => {stream_id(), stream_role()}},
+ listeners = #{} :: #{stream_id() =>
+ #{pid() := queue_ref()}},
+ %% future extensibility
+ reserved_1,
+ reserved_2}).
+
+-type state() :: #?MODULE{}.
+-type command() :: {policy_changed, #{stream_id := stream_id()}} |
+ {start_cluster, #{queue := amqqueue:amqqueue()}} |
+ {start_cluster_reply, amqqueue:amqqueue()} |
+ {start_replica, #{stream_id := stream_id(),
+ node := node(),
+ retries := non_neg_integer()}} |
+ {start_replica_failed, #{stream_id := stream_id(),
+ node := node(),
+ retries := non_neg_integer()},
+ Reply :: term()} |
+ {start_replica_reply, stream_id(), pid()} |
+
+ {delete_replica, #{stream_id := stream_id(),
+ node := node()}} |
+ {delete_cluster, #{stream_id := stream_id(),
+ %% TODO: refine type
+ acting_user := term()}} |
+ {delete_cluster_reply, stream_id()} |
+
+ {start_leader_election, stream_id(), osiris:epoch(),
+ Offsets :: term()} |
+ {leader_elected, stream_id(), NewLeaderPid :: pid()} |
+ {replicas_stopped, stream_id()} |
+ {phase_finished, stream_id(), Reply :: term()} |
+ {stream_updated, stream()} |
+ {register_listener, #{pid := pid(),
+ stream_id := stream_id(),
+ queue_ref := queue_ref()}} |
+ ra_machine:effect().
+
+
+-export_type([command/0]).
--include("amqqueue.hrl").
start() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
ServerId = {?MODULE, node()},
@@ -119,6 +167,16 @@ policy_changed(Q) when ?is_amqqueue(Q) ->
delete_replica(StreamId, Node) ->
process_command({delete_replica, #{stream_id => StreamId, node => Node}}).
+-spec register_listener(amqqueue:amqqueue()) ->
+ {error, term()} | {ok, ok, atom() | {atom(), atom()}}.
+register_listener(Q) when ?is_amqqueue(Q) ->
+ #{name := StreamId} = amqqueue:get_type_state(Q),
+    QRef = amqqueue:get_name(Q),
+ process_command({register_listener,
+ #{pid => self(),
+ stream_id => StreamId,
+ queue_ref => QRef}}).
+
process_command(Cmd) ->
global:set_lock(?STREAM_COORDINATOR_STARTUP),
Servers = ensure_coordinator_started(),
@@ -176,9 +234,10 @@ all_nodes() ->
[{?MODULE, Node} || Node <- [node() | Nodes]].
init(_Conf) ->
- #?MODULE{streams = #{},
- monitors = #{}}.
+ #?MODULE{}.
+-spec apply(map(), command(), state()) ->
+ {state(), term(), ra_machine:effects()}.
apply(Meta, {policy_changed, #{stream_id := StreamId,
queue := Q,
retries := Retries}} = Cmd,
@@ -233,7 +292,8 @@ apply(#{from := From}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Stream
pending_cmds => [],
pending_replicas => []},
rabbit_log:debug("rabbit_stream_coordinator: ~p entering phase_start_cluster", [StreamId]),
- {State#?MODULE{streams = maps:put(StreamId, SState, Streams)}, '$ra_no_reply',
+ {State#?MODULE{streams = maps:put(StreamId, SState, Streams)},
+ '$ra_no_reply',
[{aux, {phase, StreamId, Phase, PhaseArgs}}]}
end;
apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams,
@@ -241,6 +301,9 @@ apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams,
#{name := StreamId,
leader_pid := LeaderPid,
replica_pids := ReplicaPids} = Conf = amqqueue:get_type_state(Q),
+ %% TODO: this doesn't guarantee that all replicas were started successfully
+ %% we need to do something to check if any were not started and start a
+ %% retry phase to get them running
SState0 = maps:get(StreamId, Streams),
Phase = phase_repair_mnesia,
PhaseArgs = [new, Q],
@@ -256,7 +319,9 @@ apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams,
{State#?MODULE{streams = maps:put(StreamId, SState, Streams),
monitors = Monitors}, ok,
MonitorActions ++ [{aux, {phase, StreamId, Phase, PhaseArgs}}]};
-apply(_Meta, {start_replica_failed, StreamId, Node, Retries, Reply},
+apply(_Meta, {start_replica_failed, #{stream_id := StreamId,
+ node := Node,
+ retries := Retries}, Reply},
#?MODULE{streams = Streams0} = State) ->
rabbit_log:debug("rabbit_stream_coordinator: ~p start replica failed", [StreamId]),
case maps:get(StreamId, Streams0, undefined) of
@@ -271,7 +336,6 @@ apply(_Meta, {start_replica_failed, StreamId, Node, Retries, Reply},
[{timer, {pipeline,
[{start_replica, #{stream_id => StreamId,
node => Node,
- from => undefined,
retries => Retries + 1}}]},
?RESTART_TIMEOUT * Retries}],
State#?MODULE{streams = Streams})
@@ -302,9 +366,10 @@ apply(_Meta, {phase_finished, StreamId, Reply}, #?MODULE{streams = Streams0} = S
Streams = Streams0#{StreamId => clear_stream_state(SState)},
reply_and_run_pending(From, StreamId, ok, Reply, [], State#?MODULE{streams = Streams})
end;
-apply(#{from := From}, {start_replica, #{stream_id := StreamId, node := Node,
+apply(Meta, {start_replica, #{stream_id := StreamId, node := Node,
retries := Retries}} = Cmd,
#?MODULE{streams = Streams0} = State) ->
+ From = maps:get(from, Meta, undefined),
case maps:get(StreamId, Streams0, undefined) of
undefined ->
case From of
@@ -428,9 +493,25 @@ apply(_Meta, {delete_cluster_reply, StreamId}, #?MODULE{streams = Streams} = Sta
Actions = [{mod_call, ra, pipeline_command, [{?MODULE, node()}, Cmd]}
|| Cmd <- Pending],
{State, ok, Actions ++ wrap_reply(From, {ok, 0})};
-apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams,
- monitors = Monitors0} = State) ->
+apply(_Meta, {down, Pid, _Reason} = Cmd,
+ #?MODULE{streams = Streams,
+ listeners = Listeners0,
+ monitors = Monitors0} = State) ->
case maps:get(Pid, Monitors0, undefined) of
+ {StreamId, listener} ->
+ Listeners = case maps:take(StreamId, Listeners0) of
+ error ->
+ Listeners0;
+ {Pids0, Listeners1} ->
+ case maps:remove(Pid, Pids0) of
+ Pids when map_size(Pids) == 0 ->
+ Listeners1;
+ Pids ->
+ Listeners1#{StreamId => Pids}
+ end
+ end,
+ {State#?MODULE{listeners = Listeners,
+ monitors = maps:remove(Pid, Monitors0)}, ok, []};
{StreamId, Role} ->
Monitors = maps:remove(Pid, Monitors0),
case maps:get(StreamId, Streams, undefined) of
@@ -507,7 +588,9 @@ apply(_Meta, {start_leader_election, StreamId, Q, NewEpoch, Offsets},
{State#?MODULE{streams = Streams#{StreamId => SState}}, ok,
[{aux, {phase, StreamId, Phase, PhaseArgs}}]};
apply(_Meta, {leader_elected, StreamId, NewLeaderPid},
- #?MODULE{streams = Streams, monitors = Monitors0} = State) ->
+ #?MODULE{streams = Streams,
+ listeners = Listeners,
+ monitors = Monitors0} = State) ->
rabbit_log:info("rabbit_stream_coordinator: ~p leader elected", [StreamId]),
#{conf := Conf0,
pending_cmds := Pending0} = SState0 = maps:get(StreamId, Streams),
@@ -527,10 +610,22 @@ apply(_Meta, {leader_elected, StreamId, NewLeaderPid},
Monitors = maps:put(NewLeaderPid, {StreamId, leader}, maps:remove(LeaderPid, Monitors0)),
rabbit_log:debug("rabbit_stream_coordinator: ~p entering ~p after "
"leader election", [StreamId, Phase]),
+ Notifications = case maps:get(StreamId, Listeners, undefined) of
+ undefined ->
+ [];
+ Pids ->
+ maps:fold(
+ fun (Pid, QRef, Acc) ->
+ [{send_msg, Pid,
+ {queue_event, QRef,
+ {stream_leader_change, NewLeaderPid}},
+ cast} | Acc]
+ end, [], Pids)
+ end,
{State#?MODULE{streams = Streams#{StreamId => SState},
monitors = Monitors}, ok,
[{monitor, process, NewLeaderPid},
- {aux, {phase, StreamId, Phase, PhaseArgs}}]};
+ {aux, {phase, StreamId, Phase, PhaseArgs}} | Notifications]};
apply(_Meta, {replicas_stopped, StreamId}, #?MODULE{streams = Streams} = State) ->
case maps:get(StreamId, Streams, undefined) of
undefined ->
@@ -564,7 +659,25 @@ apply(_, {timeout, {pipeline, Cmds}}, State) ->
apply(_, {timeout, {aux, Cmd}}, State) ->
{State, ok, [{aux, Cmd}]};
apply(Meta, {_, #{from := From}} = Cmd, State) ->
- ?MODULE:apply(Meta#{from => From}, Cmd, State).
+ ?MODULE:apply(Meta#{from => From}, Cmd, State);
+apply(_Meta, {register_listener, #{pid := Pid,
+ stream_id := StreamId,
+ queue_ref := QRef}},
+ #?MODULE{listeners = Listeners0,
+ monitors = Monitors0} = State0) ->
+ Listeners = maps:update_with(StreamId,
+ fun (Pids) ->
+ maps:put(Pid, QRef, Pids)
+ end, #{Pid => QRef}, Listeners0),
+ Monitors = maps:put(Pid, {StreamId, listener}, Monitors0),
+
+ {State0#?MODULE{listeners = Listeners,
+ monitors = Monitors}, ok,
+ [{monitor, process, Pid}]};
+apply(_Meta, UnkCmd, State) ->
+ rabbit_log:debug("rabbit_stream_coordinator: unknown command ~W",
+ [UnkCmd, 10]),
+ {State, {error, unknown_command}, []}.
state_enter(leader, #?MODULE{streams = Streams, monitors = Monitors}) ->
maps:fold(fun(_, #{conf := #{name := StreamId},
@@ -760,13 +873,19 @@ phase_start_replica(Node, #{name := StreamId} = Conf0,
rabbit_log:warning("Error while starting replica for ~p : ~p",
[maps:get(name, Conf0), Reason]),
ra:pipeline_command({?MODULE, node()},
- {start_replica_failed, StreamId, Node, Retries, Error})
+ {start_replica_failed,
+ #{stream_id => StreamId,
+ node => Node,
+ retries => Retries}, Error})
end
catch _:E ->
rabbit_log:warning("Error while starting replica for ~p : ~p",
[maps:get(name, Conf0), E]),
ra:pipeline_command({?MODULE, node()},
- {start_replica_failed, StreamId, Node, Retries, {error, E}})
+ {start_replica_failed,
+ #{stream_id => StreamId,
+ node => Node,
+ retries => Retries}, {error, E}})
end
end).
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 1f789f165b..695ab35241 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -45,7 +45,7 @@
-export([format_osiris_event/2]).
-export([update_stream_conf/2]).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state,
@@ -54,6 +54,9 @@
-type appender_seq() :: non_neg_integer().
+-type msg_id() :: non_neg_integer().
+-type msg() :: term(). %% TODO: refine
+
-record(stream, {name :: rabbit_types:r('queue'),
credit :: integer(),
max :: non_neg_integer(),
@@ -64,10 +67,11 @@
-record(stream_client, {name :: term(),
leader :: pid(),
next_seq = 1 :: non_neg_integer(),
- correlation = #{} :: #{appender_seq() => term()},
+ correlation = #{} :: #{appender_seq() => {msg_id(), msg()}},
soft_limit :: non_neg_integer(),
slow = false :: boolean(),
- readers = #{} :: #{term() => #stream{}}
+ readers = #{} :: #{term() => #stream{}},
+ writer_id :: binary()
}).
-import(rabbit_queue_type_util, [args_policy_lookup/3]).
@@ -284,16 +288,17 @@ deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
#stream_client{name = Name,
leader = LeaderPid,
+ writer_id = WriterId,
next_seq = Seq,
correlation = Correlation0,
soft_limit = SftLmt,
slow = Slow0} = State) ->
- ok = osiris:write(LeaderPid, undefined, Seq, msg_to_iodata(Msg)),
+ ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg)),
Correlation = case MsgId of
undefined ->
Correlation0;
_ when is_number(MsgId) ->
- Correlation0#{Seq => MsgId}
+ Correlation0#{Seq => {MsgId, Msg}}
end,
Slow = case maps:size(Correlation) >= SftLmt of
true when not Slow0 ->
@@ -305,16 +310,21 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
State#stream_client{next_seq = Seq + 1,
correlation = Correlation,
slow = Slow}.
+
-spec dequeue(_, _, _, client()) -> no_return().
dequeue(_, _, _, #stream_client{name = Name}) ->
{protocol_error, not_implemented, "basic.get not supported by stream queues ~s",
[rabbit_misc:rs(Name)]}.
-handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{correlation = Correlation0,
- soft_limit = SftLmt,
- slow = Slow0,
- name = Name}) ->
- MsgIds = maps:values(maps:with(Corrs, Correlation0)),
+handle_event({osiris_written, From, _WriterId, Corrs},
+ State = #stream_client{correlation = Correlation0,
+ soft_limit = SftLmt,
+ slow = Slow0,
+ name = Name}) ->
+ MsgIds = lists:sort(maps:fold(
+ fun (_Seq, {I, _M}, Acc) ->
+ [I | Acc]
+ end, [], maps:with(Corrs, Correlation0))),
Correlation = maps:without(Corrs, Correlation0),
Slow = case maps:size(Correlation) < SftLmt of
true when Slow0 ->
@@ -325,9 +335,10 @@ handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{co
end,
{ok, State#stream_client{correlation = Correlation,
slow = Slow}, [{settled, From, MsgIds}]};
-handle_event({osiris_offset, _From, _Offs}, State = #stream_client{leader = Leader,
- readers = Readers0,
- name = Name}) ->
+handle_event({osiris_offset, _From, _Offs},
+ State = #stream_client{leader = Leader,
+ readers = Readers0,
+ name = Name}) ->
%% offset isn't actually needed as we use the atomic to read the
%% current committed
{Readers, TagMsgs} = maps:fold(
@@ -342,7 +353,9 @@ handle_event({osiris_offset, _From, _Offs}, State = #stream_client{leader = Lead
Ack = true,
Deliveries = [{deliver, Tag, Ack, OffsetMsg}
|| {Tag, _LeaderPid, OffsetMsg} <- TagMsgs],
- {ok, State#stream_client{readers = Readers}, Deliveries}.
+ {ok, State#stream_client{readers = Readers}, Deliveries};
+handle_event({stream_leader_change, Pid}, State) ->
+ {ok, update_leader_pid(Pid, State), []}.
is_recoverable(Q) ->
Node = node(),
@@ -358,8 +371,8 @@ recover(_VHost, Queues) ->
end, {[], []}, Queues).
settle(complete, CTag, MsgIds, #stream_client{readers = Readers0,
- name = Name,
- leader = Leader} = State) ->
+ name = Name,
+ leader = Leader} = State) ->
Credit = length(MsgIds),
{Readers, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
@@ -450,9 +463,13 @@ i(_, _) ->
init(Q) when ?is_amqqueue(Q) ->
Leader = amqqueue:get_pid(Q),
+ {ok, ok, _} = rabbit_stream_coordinator:register_listener(Q),
+ Prefix = erlang:pid_to_list(self()) ++ "_",
+ WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix),
{ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit),
#stream_client{name = amqqueue:get_name(Q),
leader = Leader,
+ writer_id = WriterId,
soft_limit = SoftLimit}.
close(#stream_client{readers = Readers}) ->
@@ -461,8 +478,15 @@ close(#stream_client{readers = Readers}) ->
end, Readers),
ok.
-update(_, State) ->
- State.
+update(Q, State)
+ when ?is_amqqueue(Q) ->
+ Pid = amqqueue:get_pid(Q),
+ update_leader_pid(Pid, State).
+
+update_leader_pid(Pid, #stream_client{leader = Pid} = State) ->
+ State;
+update_leader_pid(Pid, #stream_client{} = State) ->
+ resend_all(State#stream_client{leader = Pid}).
state_info(_) ->
#{}.
@@ -611,7 +635,7 @@ initial_cluster_size(Val) ->
res_arg(PolVal, undefined) -> PolVal;
res_arg(_, ArgVal) -> ArgVal.
-
+
queue_name(#resource{virtual_host = VHost, name = Name}) ->
Timestamp = erlang:integer_to_binary(erlang:system_time()),
osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_",
@@ -729,3 +753,12 @@ capabilities() ->
<<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>],
consumer_arguments => [<<"x-stream-offset">>],
server_named => false}.
+
+resend_all(#stream_client{leader = LeaderPid,
+ writer_id = WriterId,
+ correlation = Corrs} = State) ->
+ Msgs = lists:sort(maps:values(Corrs)),
+ [begin
+ ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg))
+ end || {Seq, Msg} <- Msgs],
+ State.
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 92d9f8c806..a010869043 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -47,6 +47,7 @@ groups() ->
delete_quorum_replica,
consume_from_replica,
leader_failover,
+ leader_failover_dedupe,
initial_cluster_size_one,
initial_cluster_size_two,
initial_cluster_size_one_policy,
@@ -1194,6 +1195,76 @@ leader_failover(Config) ->
?assert(NewLeader =/= Server1),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1).
+leader_failover_dedupe(Config) ->
+ %% tests that in-flight messages are automatically handled in the case where
+ %% a leader change happens during publishing
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]),
+
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}),
+
+    Self = self(),
+ F = fun F(N) ->
+ receive
+ go ->
+ [publish(Ch2, Q, integer_to_binary(N + I))
+ || I <- lists:seq(1, 100)],
+ true = amqp_channel:wait_for_confirms(Ch2, 25),
+ F(N + 100);
+ stop ->
+ Self ! {last_msg, N},
+ ct:pal("stop"),
+ ok
+ after 2 ->
+ self() ! go,
+ F(N)
+ end
+ end,
+ Pid = spawn(fun () ->
+ amqp_channel:register_confirm_handler(Ch2, self()),
+ F(0)
+ end),
+ erlang:monitor(process, Pid),
+ Pid ! go,
+ timer:sleep(10),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ %% this should cause a new leader to be elected and the channel on node 2
+ %% to have to resend any pending messages to ensure none is lost
+ timer:sleep(30000),
+ [Info] = lists:filter(
+ fun(Props) ->
+ QName = rabbit_misc:r(<<"/">>, queue, Q),
+ lists:member({name, QName}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader, members]])),
+ NewLeader = proplists:get_value(leader, Info),
+ ?assert(NewLeader =/= Server1),
+ flush(),
+ ?assert(erlang:is_process_alive(Pid)),
+ Pid ! stop,
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+
+ N = receive
+ {last_msg, X} -> X
+ after 2000 ->
+ exit(last_msg_timeout)
+ end,
+ %% validate that no duplicates were written even though an internal
+ %% resend might have taken place
+ qos(Ch2, 100, false),
+ subscribe(Ch2, Q, false, 0),
+ validate_dedupe(Ch2, 1, N),
+
+ ok.
+
initial_cluster_size_one(Config) ->
[Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1598,6 +1669,30 @@ qos(Ch, Prefetch, Global) ->
amqp_channel:call(Ch, #'basic.qos'{global = Global,
prefetch_count = Prefetch})).
+validate_dedupe(Ch, N, N) ->
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = B}} ->
+ I = binary_to_integer(B),
+ ?assertEqual(N, I),
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ after 60000 ->
+ exit({missing_record, N})
+ end;
+validate_dedupe(Ch, N, M) ->
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = B}} ->
+ I = binary_to_integer(B),
+ ?assertEqual(N, I),
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ validate_dedupe(Ch, N + 1, M)
+ after 60000 ->
+ exit({missing_record, N})
+ end.
+
receive_batch(Ch, N, N) ->
receive
{#'basic.deliver'{delivery_tag = DeliveryTag},
@@ -1642,3 +1737,12 @@ run_proper(Fun, Args, NumTests) ->
{on_output, fun(".", _) -> ok; % don't print the '.'s on new lines
(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
end}])).
+
+flush() ->
+ receive
+ Any ->
+ ct:pal("flush ~p", [Any]),
+ flush()
+ after 0 ->
+ ok
+ end.