summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-27 11:16:25 +0100
committerGitHub <noreply@github.com>2020-11-27 11:16:25 +0100
commit60ba89aabe86d23a629359d61d9d9fe8c383e69a (patch)
treec5bdcb44fe577a4f9e0dcb65754427138562f045
parent30811a8244e826454d5f7b3e01ba34b8de6c1f91 (diff)
parent67308be13c2cac736c9b9184f7e00d01510e924f (diff)
downloadrabbitmq-server-git-60ba89aabe86d23a629359d61d9d9fe8c383e69a.tar.gz
Merge pull request #2657 from rabbitmq/rabbitmq-stream-writer-dedupe
Add publishing deduplication to streams
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl4
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc74
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl4
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl2
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl282
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_utils.erl27
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl27
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java16
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java2
9 files changed, 351 insertions, 87 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index d130c7225b..8841e52ce3 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -286,7 +286,7 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
correlation = Correlation0,
soft_limit = SftLmt,
slow = Slow0} = State) ->
- ok = osiris:write(LeaderPid, Seq, msg_to_iodata(Msg)),
+ ok = osiris:write(LeaderPid, undefined, Seq, msg_to_iodata(Msg)),
Correlation = case MsgId of
undefined ->
Correlation0;
@@ -308,7 +308,7 @@ dequeue(_, _, _, #stream_client{name = Name}) ->
{protocol_error, not_implemented, "basic.get not supported by stream queues ~s",
[rabbit_misc:rs(Name)]}.
-handle_event({osiris_written, From, Corrs}, State = #stream_client{correlation = Correlation0,
+handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{correlation = Correlation0,
soft_limit = SftLmt,
slow = Slow0,
name = Name}) ->
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 14e149664c..025a359d17 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -142,6 +142,21 @@ doest not contain a correlation ID.
|17
|Yes
+|<<declareproducer>>
+|Client
+|18
+|Yes
+
+|<<deleteproducer>>
+|Client
+|19
+|Yes
+
+|<<querypublishersequence>>
+|Client
+|20
+|Yes
+
|<<create>>
|Client
|998
@@ -159,7 +174,6 @@ doest not contain a correlation ID.
Publish => Key Version Stream PublishedMessages
Key => int16 // 0
Version => int16
- Stream => string // the name of the stream
PublisherId => uint8
PublishedMessages => [PublishedMessage]
PublishedMessage => PublishingId Message
@@ -249,7 +263,7 @@ Unsubscribe => Key Version CorrelationId SubscriptionId
PublishError => Key Version [PublishingError]
Key => int16 // 6
Version => int16
- PublisherId => int8
+ PublisherId => uint8
PublishingError => PublishingId Code
PublishingId => int64
Code => int16 // code to identify the problem
@@ -422,12 +436,64 @@ QueryOffsetRequest => Key Version CorrelationId Reference Stream
Reference => string // max 256 characters
Stream => string
-QueryOffsetResponse => Key Version CorrelationId Reference Stream
+QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
Key => int16 // 17
Version => int16
CorrelationId => int32
ResponseCode => int16
- Offset => int64
+ Offset => uint64
+```
+
+=== DeclarePublisher
+
+```
+DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
+ Key => int16 // 18
+ Version => int16
+ CorrelationId => int32
+ PublisherId => uint8
+ PublisherReference => string // max 256 characters
+ Stream => string
+
+DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId
+ Key => int16 // 18
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
+```
+
+=== DeletePublisher
+
+```
+DeletePublisherRequest => Key Version CorrelationId PublisherId
+ Key => int16 // 19
+ Version => int16
+ CorrelationId => int32
+ PublisherId => uint8
+
+DeletePublisherResponse => Key Version CorrelationId ResponseCode
+ Key => int16 // 19
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
+```
+
+=== QueryPublisherSequence
+
+```
+QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
+ Key => int16 // 20
+ Version => int16
+ CorrelationId => int32
+ PublisherReference => string // max 256 characters
+ Stream => string
+
+QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
+ Key => int16 // 20
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
+ Sequence => uint64
```
=== Create
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index 0593893d93..6bb7e9504c 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -16,6 +16,9 @@
-define(COMMAND_PEER_PROPERTIES, 15).
-define(COMMAND_COMMIT_OFFSET, 16).
-define(COMMAND_QUERY_OFFSET, 17).
+-define(COMMAND_DECLARE_PUBLISHER, 18).
+-define(COMMAND_DELETE_PUBLISHER, 19).
+-define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 20).
-define(COMMAND_CREATE_STREAM, 998).
-define(COMMAND_DELETE_STREAM, 999).
@@ -38,6 +41,7 @@
-define(RESPONSE_CODE_INTERNAL_ERROR, 14).
-define(RESPONSE_CODE_ACCESS_REFUSED, 15).
-define(RESPONSE_CODE_PRECONDITION_FAILED, 16).
+-define(RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, 17).
-define(OFFSET_TYPE_FIRST, 0).
-define(OFFSET_TYPE_LAST, 1).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index e418dd1022..f329479d2e 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -161,6 +161,7 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
case is_stream_queue(Q) of
true ->
#{leader_pid := LeaderPid} = amqqueue:get_type_state(Q),
+ % FIXME check if pid is alive in case of stale information
LeaderPid;
_ ->
cluster_not_found
@@ -184,6 +185,7 @@ handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) ->
Acc
end
end, undefined, [LeaderPid] ++ ReplicaPids),
+ % FIXME check if pid is alive in case of stale information
case LocalMember of
undefined ->
{error, not_available};
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index d3b4820256..3aadd03f52 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -19,20 +19,32 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_stream.hrl").
+-type stream() :: binary().
+-type publisher_id() :: byte().
+-type publisher_reference() :: binary().
+-type subscription_id() :: byte().
+
+-record(publisher, {
+ publisher_id :: publisher_id(),
+ stream :: stream(),
+ reference :: 'undefined' | publisher_reference(),
+ leader :: pid()
+}).
+
-record(consumer, {
socket :: rabbit_net:socket(), %% ranch_transport:socket(),
member_pid :: pid(),
offset :: osiris:offset(),
- subscription_id :: integer(),
+ subscription_id :: subscription_id(),
segment :: osiris_log:state(),
credit :: integer(),
- stream :: binary()
+ stream :: stream()
}).
-record(stream_connection_state, {
data :: 'none' | binary(),
blocked :: boolean(),
- consumers :: #{integer() => #consumer{}}
+ consumers :: #{subscription_id() => #consumer{}}
}).
-record(stream_connection, {
@@ -49,8 +61,10 @@
connected_at :: integer(),
helper_sup :: pid(),
socket :: rabbit_net:socket(),
- stream_leaders :: #{binary() => pid()},
- stream_subscriptions :: #{binary() => [integer()]},
+ publishers :: #{publisher_id() => #publisher{}}, %% FIXME replace with a list (0-255 lookup faster?)
+ publisher_to_ids :: #{{stream(), publisher_reference()} => publisher_id()},
+ stream_leaders :: #{stream() => pid()},
+ stream_subscriptions :: #{stream() => [subscription_id()]},
credits :: atomics:atomics_ref(),
authentication_state :: atom(),
user :: 'undefined' | #user{},
@@ -60,7 +74,7 @@
heartbeat :: integer(),
heartbeater :: any(),
client_properties = #{} :: #{binary() => binary()},
- monitors = #{} :: #{reference() => binary()},
+ monitors = #{} :: #{reference() => stream()},
stats_timer :: reference(),
send_file_oct :: atomics:atomics_ref()
}).
@@ -123,6 +137,8 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
auth_mechanism = none,
helper_sup = KeepaliveSup,
socket = RealSocket,
+ publishers = #{},
+ publisher_to_ids = #{},
stream_leaders = #{},
stream_subscriptions = #{},
credits = Credits,
@@ -232,6 +248,7 @@ close(Transport, S) ->
listen_loop_post_auth(Transport, #stream_connection{socket = S,
stream_subscriptions = StreamSubscriptions, credits = Credits,
heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties,
+ publisher_to_ids = PublisherRefToIds,
send_file_oct = SendFileOct} = Connection0,
#stream_connection_state{consumers = Consumers, blocked = Blocked} = State,
#configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) ->
@@ -292,7 +309,7 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
{Connection, State}
end,
listen_loop_post_auth(Transport, Connection1, State1, Configuration);
- {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationList}}} ->
+ {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, undefined, CorrelationList}}} ->
{FirstPublisherId, _FirstPublishingId} = lists:nth(1, CorrelationList),
{LastPublisherId, LastPublishingIds, LastCount} = lists:foldl(fun({PublisherId, PublishingId}, {CurrentPublisherId, PublishingIds, Count}) ->
case PublisherId of
@@ -328,6 +345,32 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
State
end,
listen_loop_post_auth(Transport, Connection, State1, Configuration);
+ {'$gen_cast', {queue_event, _QueueResource, {osiris_written, #resource{name = Stream}, PublisherReference, CorrelationList}}} ->
+ %% FIXME handle case when publisher ID is not found (e.g. deleted before confirms arrive)
+ PublisherId = maps:get({Stream, PublisherReference}, PublisherRefToIds, undefined),
+ PubIds = lists:foldl(fun(PublishingId, PublishingIds) ->
+ [PublishingIds, <<PublishingId:64>>]
+ end, <<>>, CorrelationList),
+ PublishingIdCount = length(CorrelationList),
+ FrameSize = 2 + 2 + 1 + 4 + PublishingIdCount * 8,
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>,
+ <<PublisherId:8>>,
+ <<PublishingIdCount:32>>, PubIds]),
+ add_credits(Credits, PublishingIdCount),
+ State1 = case Blocked of
+ true ->
+ case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of
+ true ->
+ Transport:setopts(S, [{active, once}]),
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ State#stream_connection_state{blocked = false};
+ false ->
+ State
+ end;
+ false ->
+ State
+ end,
+ listen_loop_post_auth(Transport, Connection, State1, Configuration);
{'$gen_cast', {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, -1}}} ->
rabbit_log:info("received osiris offset event for ~p with offset ~p~n", [StreamName, -1]),
listen_loop_post_auth(Transport, Connection, State, Configuration);
@@ -682,36 +725,92 @@ notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState
ExtraProps,
rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
-handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits,
- virtual_host = VirtualHost, user = User} = Connection, State,
- <<?COMMAND_PUBLISH:16, ?VERSION_0:16,
- StreamSize:16, Stream:StreamSize/binary,
- PublisherId:8/unsigned,
- MessageCount:32, Messages/binary>>, Rest) ->
+handle_frame_post_auth(Transport, #stream_connection{
+ virtual_host = VirtualHost, user = User,
+ publishers = Publishers0, publisher_to_ids = RefIds0} = Connection0, State,
+ <<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32,
+ PublisherId:8,
+ ReferenceSize:16, Reference:ReferenceSize/binary,
+ StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
case rabbit_stream_utils:check_write_permitted(
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
ok ->
- case lookup_leader(Stream, Connection) of
- cluster_not_found ->
+ case {maps:is_key(PublisherId, Publishers0), maps:is_key({Stream, Reference}, RefIds0)} of
+ {false, false} ->
+ case lookup_leader(Stream, Connection0) of
+ cluster_not_found ->
+ response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
+ {Connection0, State, Rest};
+ {ClusterLeader, #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = Connection1} ->
+ {PublisherReference, RefIds1} = case Reference of
+ <<"">> -> {undefined, RefIds0};
+ _ -> {Reference, RefIds0#{{Stream, Reference} => PublisherId}}
+ end,
+ Publisher = #publisher{publisher_id = PublisherId,
+ stream = Stream,
+ reference = PublisherReference,
+ leader = ClusterLeader},
+ response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK),
+ {Connection1#stream_connection{publishers = Publishers0#{PublisherId => Publisher},
+ publisher_to_ids = RefIds1}, State, Rest}
+ end;
+ {_, _} ->
+ response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED),
+ {Connection0, State, Rest}
+ end;
+ error ->
+ response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED),
+ {Connection0, State, Rest}
+ end;
+handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers,
+ publisher_to_ids = PubToIds} = Connection0, State,
+ <<?COMMAND_DELETE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32,
+ PublisherId:8>>, Rest) ->
+ case Publishers of
+ #{PublisherId := #publisher{stream = Stream, reference = Ref}} ->
+ Connection1 = Connection0#stream_connection{
+ publishers = maps:remove(PublisherId, Publishers),
+ publisher_to_ids = maps:remove({Stream, Ref}, PubToIds)},
+ Connection2 = maybe_clean_connection_from_stream(Stream, Connection1),
+ response(Transport, Connection1, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK),
+ {Connection2, State, Rest};
+ _ ->
+ response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST),
+ {Connection0, State, Rest}
+ end;
+handle_frame_post_auth(Transport, #stream_connection{
+ socket = S, credits = Credits,
+ virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State,
+ <<?COMMAND_PUBLISH:16, ?VERSION_0:16,
+ PublisherId:8/unsigned,
+ MessageCount:32, Messages/binary>>, Rest) ->
+ case Publishers of
+ #{PublisherId := Publisher} ->
+ #publisher{stream = Stream, reference = Reference, leader = Leader} = Publisher,
+ case rabbit_stream_utils:check_write_permitted(
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ rabbit_stream_utils:write_messages(Leader, Reference, PublisherId, Messages),
+ sub_credits(Credits, MessageCount),
+ {Connection, State, Rest};
+ error ->
FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
- Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, Messages),
+ Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages),
Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16,
PublisherId:8,
MessageCount:32, Details/binary>>]),
- {Connection, State, Rest};
- {ClusterLeader, Connection1} ->
- rabbit_stream_utils:write_messages(ClusterLeader, PublisherId, Messages),
- sub_credits(Credits, MessageCount),
- {Connection1, State, Rest}
+ {Connection, State, Rest}
end;
- error ->
+ _ ->
FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
- Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages),
+ Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, Messages),
Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16,
- PublisherId:8,
- MessageCount:32, Details/binary>>]),
+ PublisherId:8,
+ MessageCount:32, Details/binary>>]),
{Connection, State, Rest}
end;
handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
@@ -720,6 +819,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
#stream_connection_state{consumers = Consumers} = State,
<<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary,
OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) ->
+ %% FIXME check the max number of subs is not reached already
case rabbit_stream_utils:check_read_permitted(
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
@@ -788,8 +888,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED),
{Connection, State, Rest}
end;
-handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions,
- stream_leaders = StreamLeaders} = Connection,
+handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions} = Connection,
#stream_connection_state{consumers = Consumers} = State,
<<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned>>, Rest) ->
case subscription_exists(StreamSubscriptions, SubscriptionId) of
@@ -801,28 +900,19 @@ handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = Stre
Stream = Consumer#consumer.stream,
#{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream),
- {Connection1, StreamSubscriptions1, StreamLeaders1} =
+ StreamSubscriptions1 =
case length(SubscriptionsForThisStream1) of
0 ->
- %% no more subscriptions for this stream
- %% we unregister even though it could affect publishing if the stream is published to
- %% from this connection and is deleted.
- %% to mitigate this, we remove the stream from the leaders cache
- %% this way the stream leader will be looked up in the next publish command
- %% and registered to.
- C = demonitor_stream(Stream, Connection),
- {C, maps:remove(Stream, StreamSubscriptions),
- maps:remove(Stream, StreamLeaders)
- };
+ % no more subscription for this stream
+ maps:remove(Stream, StreamSubscriptions);
_ ->
- {Connection, StreamSubscriptions#{Stream => SubscriptionsForThisStream1}, StreamLeaders}
+ StreamSubscriptions#{Stream => SubscriptionsForThisStream1}
end,
+ Connection1 = Connection#stream_connection{stream_subscriptions = StreamSubscriptions1},
Consumers1 = maps:remove(SubscriptionId, Consumers),
+ Connection2 = maybe_clean_connection_from_stream(Stream, Connection1),
response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId),
- {Connection1#stream_connection{
- stream_subscriptions = StreamSubscriptions1,
- stream_leaders = StreamLeaders1
- }, State#stream_connection_state{consumers = Consumers1}, Rest}
+ {Connection2, State#stream_connection_state{consumers = Consumers1}, Rest}
end;
handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection,
#stream_connection_state{consumers = Consumers} = State,
@@ -901,6 +991,34 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>,
<<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]),
{Connection, State, Rest};
+handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, user = User} = Connection,
+ State,
+ <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, ?VERSION_0:16, CorrelationId:32,
+ ReferenceSize:16, Reference:ReferenceSize/binary,
+ StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
+ FrameSize = ?RESPONSE_FRAME_SIZE + 8,
+ {ResponseCode, Sequence} = case rabbit_stream_utils:check_read_permitted(
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
+ {error, not_found} ->
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
+ {ok, LocalMemberPid} ->
+ {?RESPONSE_CODE_OK, case osiris:fetch_writer_seq(LocalMemberPid, Reference) of
+ undefined ->
+ 0;
+ Offt ->
+ Offt
+ end}
+ end;
+ error ->
+ {?RESPONSE_CODE_ACCESS_REFUSED, 0}
+ end,
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, ?VERSION_0:16>>,
+ <<CorrelationId:32>>, <<ResponseCode:16>>, <<Sequence:64>>]),
+ {Connection, State, Rest};
handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, user = #user{username = Username} = User} = Connection,
State,
<<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary,
@@ -1080,21 +1198,49 @@ handle_frame_post_close(_Transport, Connection, State, Frame, Rest) ->
rabbit_log:warning("ignored frame on close ~p ~p.~n", [Frame, Rest]),
{Connection, State, Rest}.
-clean_state_after_stream_deletion_or_failure(Stream, #stream_connection{stream_leaders = StreamLeaders, stream_subscriptions = StreamSubscriptions} = Connection,
- #stream_connection_state{consumers = Consumers} = State) ->
- case {maps:is_key(Stream, StreamSubscriptions), maps:is_key(Stream, StreamLeaders)} of
- {true, _} ->
+clean_state_after_stream_deletion_or_failure(Stream,
+ #stream_connection{stream_subscriptions = StreamSubscriptions,
+ publishers = Publishers,
+ publisher_to_ids = PublisherToIds,
+ stream_leaders = Leaders} = C0,
+ #stream_connection_state{consumers = Consumers} = S0) ->
+ {SubscriptionsCleaned, C1, S1} = case stream_has_subscriptions(Stream, C0) of
+ true ->
#{Stream := SubscriptionIds} = StreamSubscriptions,
- {cleaned, Connection#stream_connection{
- stream_leaders = maps:remove(Stream, StreamLeaders),
+ {true, C0#stream_connection{
stream_subscriptions = maps:remove(Stream, StreamSubscriptions)
- }, State#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}};
- {false, true} ->
- {cleaned, Connection#stream_connection{
- stream_leaders = maps:remove(Stream, StreamLeaders)
- }, State};
- {false, false} ->
- {not_cleaned, Connection, State}
+ }, S0#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}};
+ false ->
+ {false, C0, S0}
+ end,
+ {PublishersCleaned, C2, S2} = case stream_has_publishers(Stream, C1) of
+ true ->
+ {PurgedPubs, PurgedPubToIds} = maps:fold(
+ fun(PubId, #publisher{stream = S, reference = Ref}, {Pubs, PubToIds}) ->
+ case S of
+ Stream ->
+ {maps:remove(PubId, Pubs), maps:remove({Stream, Ref}, PubToIds)};
+ _ ->
+ {Pubs, PubToIds}
+ end
+ end, {Publishers, PublisherToIds}, Publishers),
+ {true, C1#stream_connection{publishers = PurgedPubs, publisher_to_ids = PurgedPubToIds},
+ S1};
+ false ->
+ {false, C1, S1}
+ end,
+ {LeadersCleaned, Leaders1} = case Leaders of
+ #{Stream := _} ->
+ {true, maps:remove(Stream, Leaders)};
+ _ ->
+ {false, Leaders}
+ end,
+ case SubscriptionsCleaned orelse PublishersCleaned orelse LeadersCleaned of
+ true ->
+ C3 = demonitor_stream(Stream, C2),
+ {cleaned, C3#stream_connection{stream_leaders = Leaders1}, S2};
+ false ->
+ {not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2}
end.
lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual_host = VirtualHost} = Connection) ->
@@ -1114,6 +1260,16 @@ lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual
lookup_leader_from_manager(VirtualHost, Stream) ->
rabbit_stream_manager:lookup_leader(VirtualHost, Stream).
+maybe_clean_connection_from_stream(Stream, #stream_connection{stream_leaders = Leaders} = Connection0) ->
+ Connection1 =
+ case {stream_has_publishers(Stream, Connection0), stream_has_subscriptions(Stream, Connection0)} of
+ {false, false} ->
+ demonitor_stream(Stream, Connection0);
+ _ ->
+ Connection0
+ end,
+ Connection1#stream_connection{stream_leaders = maps:remove(Stream, Leaders)}.
+
maybe_monitor_stream(Pid, Stream, #stream_connection{monitors = Monitors} = Connection) ->
case lists:member(Stream, maps:values(Monitors)) of
true ->
@@ -1127,6 +1283,7 @@ demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection)
Monitors = maps:fold(fun(MonitorRef, Strm, Acc) ->
case Strm of
Stream ->
+ demonitor(MonitorRef, [flush]),
Acc;
_ ->
maps:put(MonitorRef, Strm, Acc)
@@ -1135,6 +1292,19 @@ demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection)
end, #{}, Monitors0),
Connection#stream_connection{monitors = Monitors}.
+stream_has_subscriptions(Stream, #stream_connection{stream_subscriptions = Subscriptions}) ->
+ case Subscriptions of
+ #{Stream := StreamSubscriptions} when length(StreamSubscriptions) > 0 ->
+ true;
+ _ ->
+ false
+ end.
+
+stream_has_publishers(Stream, #stream_connection{publishers = Publishers}) ->
+ lists:any(fun(#publisher{stream = S}) ->
+ case S of Stream -> true; _ -> false end
+ end, maps:values(Publishers)).
+
demonitor_all_streams(#stream_connection{monitors = Monitors} = Connection) ->
lists:foreach(fun(MonitorRef) ->
demonitor(MonitorRef, [flush])
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
index c20aacb12c..1f3f2aded5 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -17,7 +17,7 @@
-module(rabbit_stream_utils).
%% API
--export([enforce_correct_stream_name/1, write_messages/3, parse_map/2,
+-export([enforce_correct_stream_name/1, write_messages/4, parse_map/2,
auth_mechanisms/1, auth_mechanism_to_module/2,
check_configure_permitted/3, check_write_permitted/3, check_read_permitted/3,
extract_stream_list/2]).
@@ -41,17 +41,26 @@ check_name(<<"">>) ->
check_name(_Name) ->
ok.
-write_messages(_ClusterLeader, _PublisherId, <<>>) ->
+write_messages(_ClusterLeader, undefined, _PublisherId, <<>>) ->
ok;
-write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
+write_messages(ClusterLeader, undefined, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
% FIXME handle write error
- ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, Message),
- write_messages(ClusterLeader, PublisherId, Rest);
-write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
+ ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, Message),
+ write_messages(ClusterLeader, undefined, PublisherId, Rest);
+write_messages(ClusterLeader, undefined, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
% FIXME handle write error
- ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}),
- write_messages(ClusterLeader, PublisherId, Rest).
-
+ ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}),
+ write_messages(ClusterLeader, undefined, PublisherId, Rest);
+write_messages(_ClusterLeader, _PublisherRef, _PublisherId, <<>>) ->
+ ok;
+write_messages(ClusterLeader, PublisherRef, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
+ % FIXME handle write error
+ ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message),
+ write_messages(ClusterLeader, PublisherRef, PublisherId, Rest);
+write_messages(ClusterLeader, PublisherRef, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
+ % FIXME handle write error
+ ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, {batch, MessageCount, CompressionType, Batch}),
+ write_messages(ClusterLeader, PublisherRef, PublisherId, Rest).
parse_map(<<>>, _Count) ->
{#{}, <<>>};
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index 4197b1de71..003a879718 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -114,8 +114,10 @@ test_server(Port) ->
test_authenticate(S),
Stream = <<"stream1">>,
test_create_stream(S, Stream),
+ PublisherId = 42,
+ test_declare_publisher(S, PublisherId, Stream),
Body = <<"hello">>,
- test_publish_confirm(S, Stream, Body),
+ test_publish_confirm(S, PublisherId, Body),
SubscriptionId = 42,
Rest = test_subscribe(S, SubscriptionId, Stream),
test_deliver(S, Rest, SubscriptionId, Body),
@@ -200,13 +202,24 @@ test_delete_stream(S, Stream) ->
ResponseFrameSize = 10,
{ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000).
-test_publish_confirm(S, Stream, Body) ->
- BodySize = byte_size(Body),
+test_declare_publisher(S, PublisherId, Stream) ->
StreamSize = byte_size(Stream),
- PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 42:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
+ DeclarePublisherFrame = <<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, PublisherId:8,
+ 0:16, %% empty publisher reference
+ StreamSize:16, Stream:StreamSize/binary>>,
+ FrameSize = byte_size(DeclarePublisherFrame),
+ gen_tcp:send(S, <<FrameSize:32, DeclarePublisherFrame/binary>>),
+ Res = gen_tcp:recv(S, 0, 5000),
+ {ok, <<_Size:32, ?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res,
+ Rest.
+
+test_publish_confirm(S, PublisherId, Body) ->
+ BodySize = byte_size(Body),
+ PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16,
+ PublisherId:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
FrameSize = byte_size(PublishFrame),
gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
- {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 42:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).
+ {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, PublisherId:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).
test_subscribe(S, SubscriptionId, Stream) ->
StreamSize = byte_size(Stream),
@@ -221,9 +234,9 @@ test_subscribe(S, SubscriptionId, Stream) ->
test_deliver(S, Rest, SubscriptionId, Body) ->
BodySize = byte_size(Body),
Frame = read_frame(S, Rest),
- <<54:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8,
+ <<58:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8,
1:16, 1:32,
- _Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32,
+ _Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32, _TrailerLength:32,
0:1, BodySize:31/unsigned, Body/binary>> = Frame.
test_metadata_update_stream_deleted(S, Stream) ->
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
index c7a390f00d..fcb7b26ad1 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
@@ -79,8 +79,8 @@ public class FailureTest {
(publisherId, publishingId) -> confirmLatch.get().countDown()));
String message = "all nodes available";
messages.add(message);
+ publisher.declarePublisher((byte) 1, null, stream);
publisher.publish(
- stream,
(byte) 1,
Collections.singletonList(
publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
@@ -109,8 +109,9 @@ public class FailureTest {
confirmLatch.set(new CountDownLatch(1));
message = "2 nodes available";
messages.add(message);
+
+ publisher.declarePublisher((byte) 1, null, stream);
publisher.publish(
- stream,
(byte) 1,
Collections.singletonList(
publisher
@@ -135,7 +136,6 @@ public class FailureTest {
message = "all nodes are back";
messages.add(message);
publisher.publish(
- stream,
(byte) 1,
Collections.singletonList(
publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
@@ -233,6 +233,7 @@ public class FailureTest {
generation.incrementAndGet();
published.clear();
+ newPublisher.declarePublisher((byte) 1, null, stream);
publisher.set(newPublisher);
connected.set(true);
@@ -249,6 +250,7 @@ public class FailureTest {
.shutdownListener(shutdownListener)
.publishConfirmListener(publishConfirmListener));
+ client.declarePublisher((byte) 1, null, stream);
publisher.set(client);
AtomicBoolean keepPublishing = new AtomicBoolean(true);
@@ -270,10 +272,7 @@ public class FailureTest {
.build();
try {
long publishingId =
- publisher
- .get()
- .publish(stream, (byte) 1, Collections.singletonList(message))
- .get(0);
+ publisher.get().publish((byte) 1, Collections.singletonList(message)).get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
@@ -389,6 +388,7 @@ public class FailureTest {
.port(streamMetadata.getLeader().getPort())
.publishConfirmListener(publishConfirmListener));
+ publisher.declarePublisher((byte) 1, null, stream);
AtomicLong generation = new AtomicLong(0);
AtomicLong sequence = new AtomicLong(0);
AtomicBoolean keepPublishing = new AtomicBoolean(true);
@@ -408,7 +408,7 @@ public class FailureTest {
.build();
try {
long publishingId =
- publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
+ publisher.publish((byte) 1, Collections.singletonList(message)).get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
index 08024a12bf..1af7513d28 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
@@ -92,11 +92,11 @@ public class StreamTest {
.publishConfirmListener(
(publisherId, publishingId) -> publishingLatch.countDown()));
+ publisher.declarePublisher((byte) 1, null, stream);
IntStream.range(0, messageCount)
.forEach(
i ->
publisher.publish(
- stream,
(byte) 1,
Collections.singletonList(
publisher