author     kjnilsson <knilsson@pivotal.io>  2021-01-11 16:30:14 +0000
committer  kjnilsson <knilsson@pivotal.io>  2021-01-13 12:09:44 +0000
commit     2f0dba45d82b4e1f1ab7a0cbf75b3c9b99ff7a2b (patch)
tree       017666fcc5e7bfaca72722009f482bfccaa87a12
parent     0ef00f88edd1dd1a00d349c78d5cec75e0d6c8e7 (diff)
Stream: Channel resend on leader change
Detect when a new stream leader is elected and make stream queue clients re-send any unconfirmed, pending messages so that none are lost during the leader change. The resend relies on the osiris deduplication feature so that it does not introduce duplicate messages into the stream.
-rw-r--r--  deps/rabbit/src/rabbit_amqqueue.erl              12
-rw-r--r--  deps/rabbit/src/rabbit_queue_type.erl            25
-rw-r--r--  deps/rabbit/src/rabbit_stream_coordinator.erl   148
-rw-r--r--  deps/rabbit/src/rabbit_stream_queue.erl          71
-rw-r--r--  deps/rabbit/test/rabbit_stream_queue_SUITE.erl  104
5 files changed, 315 insertions, 45 deletions
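For orientation, here is a minimal, illustrative Erlang sketch of the resend mechanism described above. It is not part of the commit; it only assumes the osiris:write(LeaderPid, WriterId, Seq, Data) call that appears in the diff below, and the module and function names (stream_resend_example, deliver/3, handle_leader_change/2) are invented for the example. The idea: the client keeps every unconfirmed message in a correlation map keyed by its publisher sequence number and always writes with a stable writer id, so replaying the same (writer id, sequence) pairs against a new leader is idempotent.

%% Illustrative sketch only - simplified from the patch below.
-module(stream_resend_example).
-export([deliver/3, handle_leader_change/2]).

-record(client, {leader            :: pid(),     %% current stream leader
                 writer_id         :: binary(),  %% stable per-client writer id
                 next_seq = 1      :: non_neg_integer(),
                 correlation = #{} :: #{non_neg_integer() => term()}}).

%% Publish a message and remember it under its sequence number until it is
%% confirmed (an osiris_written event would remove it from the map).
deliver(Msg, MsgId, #client{leader = Leader, writer_id = WId,
                            next_seq = Seq, correlation = Corrs} = C) ->
    ok = osiris:write(Leader, WId, Seq, Msg),
    C#client{next_seq = Seq + 1,
             correlation = Corrs#{Seq => {MsgId, Msg}}}.

%% On a stream_leader_change notification, replay every unconfirmed message
%% with the same writer id and sequence numbers; osiris deduplication drops
%% any entry the previous leader already committed, so nothing is appended twice.
handle_leader_change(NewLeader, #client{writer_id = WId,
                                        correlation = Corrs} = C) ->
    [ok = osiris:write(NewLeader, WId, Seq, Msg)
     || {Seq, {_MsgId, Msg}} <- lists:sort(maps:to_list(Corrs))],
    C#client{leader = NewLeader}.

In the actual patch this logic lives in rabbit_stream_queue:deliver/3 and resend_all/1, triggered by the stream_leader_change queue event that rabbit_stream_coordinator sends to registered listeners when a new leader is elected.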
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index cd5f894680..435f5ff089 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -462,6 +462,8 @@ filter_per_type(all, _) ->
true;
filter_per_type(quorum, Q) ->
?amqqueue_is_quorum(Q);
+filter_per_type(stream, Q) ->
+ ?amqqueue_is_stream(Q);
filter_per_type(classic, Q) ->
?amqqueue_is_classic(Q).
@@ -1714,10 +1716,11 @@ cancel_sync_mirrors(QPid) ->
-spec is_replicated(amqqueue:amqqueue()) -> boolean().
-is_replicated(Q) when ?amqqueue_is_quorum(Q) ->
- true;
-is_replicated(Q) ->
- rabbit_mirror_queue_misc:is_mirrored(Q).
+is_replicated(Q) when ?amqqueue_is_classic(Q) ->
+ rabbit_mirror_queue_misc:is_mirrored(Q);
+is_replicated(_Q) ->
+ %% streams and quorum queues are all replicated
+ true.
is_exclusive(Q) when ?amqqueue_exclusive_owner_is(Q, none) ->
false;
@@ -1792,6 +1795,7 @@ on_node_down(Node) ->
ok.
delete_queues_on_node_down(Node) ->
+ rabbit_log:info("delete_queues_on_node_down GAHHH", []),
lists:unzip(lists:flatten([
rabbit_misc:execute_mnesia_transaction(
fun () -> [{Queue, delete_queue(Queue)} || Queue <- Queues] end
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
index 4e59b6a7c0..6ecaf40c7d 100644
--- a/deps/rabbit/src/rabbit_queue_type.erl
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -36,7 +36,6 @@
is_server_named_allowed/1
]).
-%% gah what is a good identity of a classic queue including all replicas
-type queue_name() :: rabbit_types:r(queue).
-type queue_ref() :: queue_name() | atom().
-type queue_state() :: term().
@@ -440,6 +439,7 @@ deliver(Qs, Delivery, stateless) ->
end, Qs),
{ok, stateless, []};
deliver(Qs, Delivery, #?STATE{} = State0) ->
+ %% TODO: optimise single queue case?
%% sort by queue type - then dispatch each group
ByType = lists:foldl(
fun (Q, Acc) ->
@@ -457,7 +457,7 @@ deliver(Qs, Delivery, #?STATE{} = State0) ->
end, {[], []}, ByType),
State = lists:foldl(
fun({Q, S}, Acc) ->
- Ctx = get_ctx(Q, Acc),
+ Ctx = get_ctx_with(Q, Acc, S),
set_ctx(qref(Q), Ctx#ctx{state = S}, Acc)
end, State0, Xs),
return_ok(State, Actions).
@@ -511,21 +511,32 @@ dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
Err
end.
-get_ctx(Q, #?STATE{ctxs = Contexts}) when ?is_amqqueue(Q) ->
+get_ctx(QOrQref, State) ->
+ get_ctx_with(QOrQref, State, undefined).
+
+get_ctx_with(Q, #?STATE{ctxs = Contexts}, InitState)
+ when ?is_amqqueue(Q) ->
Ref = qref(Q),
case Contexts of
#{Ref := #ctx{module = Mod,
state = State} = Ctx} ->
Ctx#ctx{state = Mod:update(Q, State)};
- _ ->
- %% not found - initialize
+ _ when InitState == undefined ->
+ %% not found and no initial state passed - initialize new state
+ Mod = amqqueue:get_type(Q),
+ Name = amqqueue:get_name(Q),
+ #ctx{module = Mod,
+ name = Name,
+ state = Mod:init(Q)};
+ _ ->
+ %% not found - initialize with supplied initial state
Mod = amqqueue:get_type(Q),
Name = amqqueue:get_name(Q),
#ctx{module = Mod,
name = Name,
- state = Mod:init(Q)}
+ state = InitState}
end;
-get_ctx(QRef, Contexts) when ?QREF(QRef) ->
+get_ctx_with(QRef, Contexts, undefined) when ?QREF(QRef) ->
case get_ctx(QRef, Contexts, undefined) of
undefined ->
exit({queue_context_not_found, QRef});
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl
index 6ae819171a..731a16e94d 100644
--- a/deps/rabbit/src/rabbit_stream_coordinator.erl
+++ b/deps/rabbit/src/rabbit_stream_coordinator.erl
@@ -30,7 +30,8 @@
start_cluster/1,
delete_cluster/2,
add_replica/2,
- delete_replica/2]).
+ delete_replica/2,
+ register_listener/1]).
-export([policy_changed/1]).
@@ -46,15 +47,62 @@
-export([log_overview/1]).
+-include("amqqueue.hrl").
+
-define(STREAM_COORDINATOR_STARTUP, {stream_coordinator_startup, self()}).
-define(TICK_TIMEOUT, 60000).
-define(RESTART_TIMEOUT, 1000).
-define(PHASE_RETRY_TIMEOUT, 10000).
-define(CMD_TIMEOUT, 30000).
--record(?MODULE, {streams, monitors}).
+-type stream_id() :: binary().
+-type stream() :: #{conf := osiris:config(),
+ atom() => term()}.
+-type stream_role() :: leader | follower | listener.
+-type queue_ref() :: rabbit_types:r(queue).
+
+-record(?MODULE, {streams = #{} :: #{stream_id() => stream()},
+ monitors = #{} :: #{pid() => {stream_id(), stream_role()}},
+ listeners = #{} :: #{stream_id() =>
+ #{pid() := queue_ref()}},
+ %% future extensibility
+ reserved_1,
+ reserved_2}).
+
+-type state() :: #?MODULE{}.
+-type command() :: {policy_changed, #{stream_id := stream_id()}} |
+ {start_cluster, #{queue := amqqueue:amqqueue()}} |
+ {start_cluster_reply, amqqueue:amqqueue()} |
+ {start_replica, #{stream_id := stream_id(),
+ node := node(),
+ retries := non_neg_integer()}} |
+ {start_replica_failed, #{stream_id := stream_id(),
+ node := node(),
+ retries := non_neg_integer()},
+ Reply :: term()} |
+ {start_replica_reply, stream_id(), pid()} |
+
+ {delete_replica, #{stream_id := stream_id(),
+ node := node()}} |
+ {delete_cluster, #{stream_id := stream_id(),
+ %% TODO: refine type
+ acting_user := term()}} |
+ {delete_cluster_reply, stream_id()} |
+
+ {start_leader_election, stream_id(), osiris:epoch(),
+ Offsets :: term()} |
+ {leader_elected, stream_id(), NewLeaderPid :: pid()} |
+ {replicas_stopped, stream_id()} |
+ {phase_finished, stream_id(), Reply :: term()} |
+ {stream_updated, stream()} |
+ {register_listener, #{pid := pid(),
+ stream_id := stream_id(),
+ queue_ref := queue_ref()}} |
+ ra_machine:effect().
+
+
+-export_type([command/0]).
--include("amqqueue.hrl").
start() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
ServerId = {?MODULE, node()},
@@ -119,6 +167,16 @@ policy_changed(Q) when ?is_amqqueue(Q) ->
delete_replica(StreamId, Node) ->
process_command({delete_replica, #{stream_id => StreamId, node => Node}}).
+-spec register_listener(amqqueue:amqqueue()) ->
+ {error, term()} | {ok, ok, atom() | {atom(), atom()}}.
+register_listener(Q) when ?is_amqqueue(Q) ->
+ #{name := StreamId} = amqqueue:get_type_state(Q),
+ QRef = amqqueue:get_name(Q),
+ process_command({register_listener,
+ #{pid => self(),
+ stream_id => StreamId,
+ queue_ref => QRef}}).
+
process_command(Cmd) ->
global:set_lock(?STREAM_COORDINATOR_STARTUP),
Servers = ensure_coordinator_started(),
@@ -176,9 +234,10 @@ all_nodes() ->
[{?MODULE, Node} || Node <- [node() | Nodes]].
init(_Conf) ->
- #?MODULE{streams = #{},
- monitors = #{}}.
+ #?MODULE{}.
+-spec apply(map(), command(), state()) ->
+ {state(), term(), ra_machine:effects()}.
apply(Meta, {policy_changed, #{stream_id := StreamId,
queue := Q,
retries := Retries}} = Cmd,
@@ -233,7 +292,8 @@ apply(#{from := From}, {start_cluster, #{queue := Q}}, #?MODULE{streams = Stream
pending_cmds => [],
pending_replicas => []},
rabbit_log:debug("rabbit_stream_coordinator: ~p entering phase_start_cluster", [StreamId]),
- {State#?MODULE{streams = maps:put(StreamId, SState, Streams)}, '$ra_no_reply',
+ {State#?MODULE{streams = maps:put(StreamId, SState, Streams)},
+ '$ra_no_reply',
[{aux, {phase, StreamId, Phase, PhaseArgs}}]}
end;
apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams,
@@ -241,6 +301,9 @@ apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams,
#{name := StreamId,
leader_pid := LeaderPid,
replica_pids := ReplicaPids} = Conf = amqqueue:get_type_state(Q),
+ %% TODO: this doesn't guarantee that all replicas were started successfully
+ %% we need to do something to check if any were not started and start a
+ %% retry phase to get them running
SState0 = maps:get(StreamId, Streams),
Phase = phase_repair_mnesia,
PhaseArgs = [new, Q],
@@ -256,7 +319,9 @@ apply(_Meta, {start_cluster_reply, Q}, #?MODULE{streams = Streams,
{State#?MODULE{streams = maps:put(StreamId, SState, Streams),
monitors = Monitors}, ok,
MonitorActions ++ [{aux, {phase, StreamId, Phase, PhaseArgs}}]};
-apply(_Meta, {start_replica_failed, StreamId, Node, Retries, Reply},
+apply(_Meta, {start_replica_failed, #{stream_id := StreamId,
+ node := Node,
+ retries := Retries}, Reply},
#?MODULE{streams = Streams0} = State) ->
rabbit_log:debug("rabbit_stream_coordinator: ~p start replica failed", [StreamId]),
case maps:get(StreamId, Streams0, undefined) of
@@ -271,7 +336,6 @@ apply(_Meta, {start_replica_failed, StreamId, Node, Retries, Reply},
[{timer, {pipeline,
[{start_replica, #{stream_id => StreamId,
node => Node,
- from => undefined,
retries => Retries + 1}}]},
?RESTART_TIMEOUT * Retries}],
State#?MODULE{streams = Streams})
@@ -428,9 +492,25 @@ apply(_Meta, {delete_cluster_reply, StreamId}, #?MODULE{streams = Streams} = Sta
Actions = [{mod_call, ra, pipeline_command, [{?MODULE, node()}, Cmd]}
|| Cmd <- Pending],
{State, ok, Actions ++ wrap_reply(From, {ok, 0})};
-apply(_Meta, {down, Pid, _Reason} = Cmd, #?MODULE{streams = Streams,
- monitors = Monitors0} = State) ->
+apply(_Meta, {down, Pid, _Reason} = Cmd,
+ #?MODULE{streams = Streams,
+ listeners = Listeners0,
+ monitors = Monitors0} = State) ->
case maps:get(Pid, Monitors0, undefined) of
+ {StreamId, listener} ->
+ Listeners = case maps:take(StreamId, Listeners0) of
+ error ->
+ Listeners0;
+ {Pids0, Listeners1} ->
+ case maps:remove(Pid, Pids0) of
+ Pids when map_size(Pids) == 0 ->
+ Listeners1;
+ Pids ->
+ Listeners1#{StreamId => Pids}
+ end
+ end,
+ {State#?MODULE{listeners = Listeners,
+ monitors = maps:remove(Pid, Monitors0)}, ok, []};
{StreamId, Role} ->
Monitors = maps:remove(Pid, Monitors0),
case maps:get(StreamId, Streams, undefined) of
@@ -507,7 +587,9 @@ apply(_Meta, {start_leader_election, StreamId, Q, NewEpoch, Offsets},
{State#?MODULE{streams = Streams#{StreamId => SState}}, ok,
[{aux, {phase, StreamId, Phase, PhaseArgs}}]};
apply(_Meta, {leader_elected, StreamId, NewLeaderPid},
- #?MODULE{streams = Streams, monitors = Monitors0} = State) ->
+ #?MODULE{streams = Streams,
+ listeners = Listeners,
+ monitors = Monitors0} = State) ->
rabbit_log:info("rabbit_stream_coordinator: ~p leader elected", [StreamId]),
#{conf := Conf0,
pending_cmds := Pending0} = SState0 = maps:get(StreamId, Streams),
@@ -527,10 +609,22 @@ apply(_Meta, {leader_elected, StreamId, NewLeaderPid},
Monitors = maps:put(NewLeaderPid, {StreamId, leader}, maps:remove(LeaderPid, Monitors0)),
rabbit_log:debug("rabbit_stream_coordinator: ~p entering ~p after "
"leader election", [StreamId, Phase]),
+ Notifications = case maps:get(StreamId, Listeners, undefined) of
+ undefined ->
+ [];
+ Pids ->
+ maps:fold(
+ fun (Pid, QRef, Acc) ->
+ [{send_msg, Pid,
+ {queue_event, QRef,
+ {stream_leader_change, NewLeaderPid}},
+ cast} | Acc]
+ end, [], Pids)
+ end,
{State#?MODULE{streams = Streams#{StreamId => SState},
monitors = Monitors}, ok,
[{monitor, process, NewLeaderPid},
- {aux, {phase, StreamId, Phase, PhaseArgs}}]};
+ {aux, {phase, StreamId, Phase, PhaseArgs}} | Notifications]};
apply(_Meta, {replicas_stopped, StreamId}, #?MODULE{streams = Streams} = State) ->
case maps:get(StreamId, Streams, undefined) of
undefined ->
@@ -564,7 +658,25 @@ apply(_, {timeout, {pipeline, Cmds}}, State) ->
apply(_, {timeout, {aux, Cmd}}, State) ->
{State, ok, [{aux, Cmd}]};
apply(Meta, {_, #{from := From}} = Cmd, State) ->
- ?MODULE:apply(Meta#{from => From}, Cmd, State).
+ ?MODULE:apply(Meta#{from => From}, Cmd, State);
+apply(_Meta, {register_listener, #{pid := Pid,
+ stream_id := StreamId,
+ queue_ref := QRef}},
+ #?MODULE{listeners = Listeners0,
+ monitors = Monitors0} = State0) ->
+ Listeners = maps:update_with(StreamId,
+ fun (Pids) ->
+ maps:put(Pid, QRef, Pids)
+ end, #{Pid => QRef}, Listeners0),
+ Monitors = maps:put(Pid, {StreamId, listener}, Monitors0),
+
+ {State0#?MODULE{listeners = Listeners,
+ monitors = Monitors}, ok,
+ [{monitor, process, Pid}]};
+apply(_Meta, UnkCmd, State) ->
+ rabbit_log:debug("rabbit_stream_coordinator: unknown command ~W",
+ [UnkCmd, 10]),
+ {State, {error, unknown_command}, []}.
state_enter(leader, #?MODULE{streams = Streams, monitors = Monitors}) ->
maps:fold(fun(_, #{conf := #{name := StreamId},
@@ -760,13 +872,19 @@ phase_start_replica(Node, #{name := StreamId} = Conf0,
rabbit_log:warning("Error while starting replica for ~p : ~p",
[maps:get(name, Conf0), Reason]),
ra:pipeline_command({?MODULE, node()},
- {start_replica_failed, StreamId, Node, Retries, Error})
+ {start_replica_failed,
+ #{stream_id => StreamId,
+ node => Node,
+ retries => Retries}, Error})
end
catch _:E->
rabbit_log:warning("Error while starting replica for ~p : ~p",
[maps:get(name, Conf0), E]),
ra:pipeline_command({?MODULE, node()},
- {start_replica_failed, StreamId, Node, Retries, {error, E}})
+ {start_replica_failed,
+ #{stream_id => StreamId,
+ node => Node,
+ retries => Retries}, {error, E}})
end
end).
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 1f789f165b..695ab35241 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -45,7 +45,7 @@
-export([format_osiris_event/2]).
-export([update_stream_conf/2]).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state,
@@ -54,6 +54,9 @@
-type appender_seq() :: non_neg_integer().
+-type msg_id() :: non_neg_integer().
+-type msg() :: term(). %% TODO: refine
+
-record(stream, {name :: rabbit_types:r('queue'),
credit :: integer(),
max :: non_neg_integer(),
@@ -64,10 +67,11 @@
-record(stream_client, {name :: term(),
leader :: pid(),
next_seq = 1 :: non_neg_integer(),
- correlation = #{} :: #{appender_seq() => term()},
+ correlation = #{} :: #{appender_seq() => {msg_id(), msg()}},
soft_limit :: non_neg_integer(),
slow = false :: boolean(),
- readers = #{} :: #{term() => #stream{}}
+ readers = #{} :: #{term() => #stream{}},
+ writer_id :: binary()
}).
-import(rabbit_queue_type_util, [args_policy_lookup/3]).
@@ -284,16 +288,17 @@ deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
#stream_client{name = Name,
leader = LeaderPid,
+ writer_id = WriterId,
next_seq = Seq,
correlation = Correlation0,
soft_limit = SftLmt,
slow = Slow0} = State) ->
- ok = osiris:write(LeaderPid, undefined, Seq, msg_to_iodata(Msg)),
+ ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg)),
Correlation = case MsgId of
undefined ->
Correlation0;
_ when is_number(MsgId) ->
- Correlation0#{Seq => MsgId}
+ Correlation0#{Seq => {MsgId, Msg}}
end,
Slow = case maps:size(Correlation) >= SftLmt of
true when not Slow0 ->
@@ -305,16 +310,21 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
State#stream_client{next_seq = Seq + 1,
correlation = Correlation,
slow = Slow}.
+
-spec dequeue(_, _, _, client()) -> no_return().
dequeue(_, _, _, #stream_client{name = Name}) ->
{protocol_error, not_implemented, "basic.get not supported by stream queues ~s",
[rabbit_misc:rs(Name)]}.
-handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{correlation = Correlation0,
- soft_limit = SftLmt,
- slow = Slow0,
- name = Name}) ->
- MsgIds = maps:values(maps:with(Corrs, Correlation0)),
+handle_event({osiris_written, From, _WriterId, Corrs},
+ State = #stream_client{correlation = Correlation0,
+ soft_limit = SftLmt,
+ slow = Slow0,
+ name = Name}) ->
+ MsgIds = lists:sort(maps:fold(
+ fun (_Seq, {I, _M}, Acc) ->
+ [I | Acc]
+ end, [], maps:with(Corrs, Correlation0))),
Correlation = maps:without(Corrs, Correlation0),
Slow = case maps:size(Correlation) < SftLmt of
true when Slow0 ->
@@ -325,9 +335,10 @@ handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{co
end,
{ok, State#stream_client{correlation = Correlation,
slow = Slow}, [{settled, From, MsgIds}]};
-handle_event({osiris_offset, _From, _Offs}, State = #stream_client{leader = Leader,
- readers = Readers0,
- name = Name}) ->
+handle_event({osiris_offset, _From, _Offs},
+ State = #stream_client{leader = Leader,
+ readers = Readers0,
+ name = Name}) ->
%% offset isn't actually needed as we use the atomic to read the
%% current committed
{Readers, TagMsgs} = maps:fold(
@@ -342,7 +353,9 @@ handle_event({osiris_offset, _From, _Offs}, State = #stream_client{leader = Lead
Ack = true,
Deliveries = [{deliver, Tag, Ack, OffsetMsg}
|| {Tag, _LeaderPid, OffsetMsg} <- TagMsgs],
- {ok, State#stream_client{readers = Readers}, Deliveries}.
+ {ok, State#stream_client{readers = Readers}, Deliveries};
+handle_event({stream_leader_change, Pid}, State) ->
+ {ok, update_leader_pid(Pid, State), []}.
is_recoverable(Q) ->
Node = node(),
@@ -358,8 +371,8 @@ recover(_VHost, Queues) ->
end, {[], []}, Queues).
settle(complete, CTag, MsgIds, #stream_client{readers = Readers0,
- name = Name,
- leader = Leader} = State) ->
+ name = Name,
+ leader = Leader} = State) ->
Credit = length(MsgIds),
{Readers, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
@@ -450,9 +463,13 @@ i(_, _) ->
init(Q) when ?is_amqqueue(Q) ->
Leader = amqqueue:get_pid(Q),
+ {ok, ok, _} = rabbit_stream_coordinator:register_listener(Q),
+ Prefix = erlang:pid_to_list(self()) ++ "_",
+ WriterId = rabbit_guid:binary(rabbit_guid:gen(), Prefix),
{ok, SoftLimit} = application:get_env(rabbit, stream_messages_soft_limit),
#stream_client{name = amqqueue:get_name(Q),
leader = Leader,
+ writer_id = WriterId,
soft_limit = SoftLimit}.
close(#stream_client{readers = Readers}) ->
@@ -461,8 +478,15 @@ close(#stream_client{readers = Readers}) ->
end, Readers),
ok.
-update(_, State) ->
- State.
+update(Q, State)
+ when ?is_amqqueue(Q) ->
+ Pid = amqqueue:get_pid(Q),
+ update_leader_pid(Pid, State).
+
+update_leader_pid(Pid, #stream_client{leader = Pid} = State) ->
+ State;
+update_leader_pid(Pid, #stream_client{} = State) ->
+ resend_all(State#stream_client{leader = Pid}).
state_info(_) ->
#{}.
@@ -611,7 +635,7 @@ initial_cluster_size(Val) ->
res_arg(PolVal, undefined) -> PolVal;
res_arg(_, ArgVal) -> ArgVal.
-
+
queue_name(#resource{virtual_host = VHost, name = Name}) ->
Timestamp = erlang:integer_to_binary(erlang:system_time()),
osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_",
@@ -729,3 +753,12 @@ capabilities() ->
<<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>],
consumer_arguments => [<<"x-stream-offset">>],
server_named => false}.
+
+resend_all(#stream_client{leader = LeaderPid,
+ writer_id = WriterId,
+ correlation = Corrs} = State) ->
+ Msgs = lists:sort(maps:values(Corrs)),
+ [begin
+ ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg))
+ end || {Seq, Msg} <- Msgs],
+ State.
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 92d9f8c806..a010869043 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -47,6 +47,7 @@ groups() ->
delete_quorum_replica,
consume_from_replica,
leader_failover,
+ leader_failover_dedupe,
initial_cluster_size_one,
initial_cluster_size_two,
initial_cluster_size_one_policy,
@@ -1194,6 +1195,76 @@ leader_failover(Config) ->
?assert(NewLeader =/= Server1),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1).
+leader_failover_dedupe(Config) ->
+ %% tests that in-flight messages are automatically handled in the case where
+ %% a leader change happens during publishing
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]),
+
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}),
+
+ Self = self(),
+ F = fun F(N) ->
+ receive
+ go ->
+ [publish(Ch2, Q, integer_to_binary(N + I))
+ || I <- lists:seq(1, 100)],
+ true = amqp_channel:wait_for_confirms(Ch2, 25),
+ F(N + 100);
+ stop ->
+ Self ! {last_msg, N},
+ ct:pal("stop"),
+ ok
+ after 2 ->
+ self() ! go,
+ F(N)
+ end
+ end,
+ Pid = spawn(fun () ->
+ amqp_channel:register_confirm_handler(Ch2, self()),
+ F(0)
+ end),
+ erlang:monitor(process, Pid),
+ Pid ! go,
+ timer:sleep(10),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ %% this should cause a new leader to be elected and the channel on node 2
+ %% to have to resend any pending messages to ensure none is lost
+ timer:sleep(30000),
+ [Info] = lists:filter(
+ fun(Props) ->
+ QName = rabbit_misc:r(<<"/">>, queue, Q),
+ lists:member({name, QName}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader, members]])),
+ NewLeader = proplists:get_value(leader, Info),
+ ?assert(NewLeader =/= Server1),
+ flush(),
+ ?assert(erlang:is_process_alive(Pid)),
+ Pid ! stop,
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+
+ N = receive
+ {last_msg, X} -> X
+ after 2000 ->
+ exit(last_msg_timeout)
+ end,
+ %% validate that no duplicates were written even though an internal
+ %% resend might have taken place
+ qos(Ch2, 100, false),
+ subscribe(Ch2, Q, false, 0),
+ validate_dedupe(Ch2, 1, N),
+
+ ok.
+
initial_cluster_size_one(Config) ->
[Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1598,6 +1669,30 @@ qos(Ch, Prefetch, Global) ->
amqp_channel:call(Ch, #'basic.qos'{global = Global,
prefetch_count = Prefetch})).
+validate_dedupe(Ch, N, N) ->
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = B}} ->
+ I = binary_to_integer(B),
+ ?assertEqual(N, I),
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ after 60000 ->
+ exit({missing_record, N})
+ end;
+validate_dedupe(Ch, N, M) ->
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = B}} ->
+ I = binary_to_integer(B),
+ ?assertEqual(N, I),
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ validate_dedupe(Ch, N + 1, M)
+ after 60000 ->
+ exit({missing_record, N})
+ end.
+
receive_batch(Ch, N, N) ->
receive
{#'basic.deliver'{delivery_tag = DeliveryTag},
@@ -1642,3 +1737,12 @@ run_proper(Fun, Args, NumTests) ->
{on_output, fun(".", _) -> ok; % don't print the '.'s on new lines
(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
end}])).
+
+flush() ->
+ receive
+ Any ->
+ ct:pal("flush ~p", [Any]),
+ flush()
+ after 0 ->
+ ok
+ end.