diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-27 11:16:25 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-27 11:16:25 +0100 |
commit | 60ba89aabe86d23a629359d61d9d9fe8c383e69a (patch) | |
tree | c5bdcb44fe577a4f9e0dcb65754427138562f045 | |
parent | 30811a8244e826454d5f7b3e01ba34b8de6c1f91 (diff) | |
parent | 67308be13c2cac736c9b9184f7e00d01510e924f (diff) | |
download | rabbitmq-server-git-60ba89aabe86d23a629359d61d9d9fe8c383e69a.tar.gz |
Merge pull request #2657 from rabbitmq/rabbitmq-stream-writer-dedupe
Add publishing deduplication to streams
9 files changed, 351 insertions, 87 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index d130c7225b..8841e52ce3 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -286,7 +286,7 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId}, correlation = Correlation0, soft_limit = SftLmt, slow = Slow0} = State) -> - ok = osiris:write(LeaderPid, Seq, msg_to_iodata(Msg)), + ok = osiris:write(LeaderPid, undefined, Seq, msg_to_iodata(Msg)), Correlation = case MsgId of undefined -> Correlation0; @@ -308,7 +308,7 @@ dequeue(_, _, _, #stream_client{name = Name}) -> {protocol_error, not_implemented, "basic.get not supported by stream queues ~s", [rabbit_misc:rs(Name)]}. -handle_event({osiris_written, From, Corrs}, State = #stream_client{correlation = Correlation0, +handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{correlation = Correlation0, soft_limit = SftLmt, slow = Slow0, name = Name}) -> diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 14e149664c..025a359d17 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -142,6 +142,21 @@ doest not contain a correlation ID. |17 |Yes +|<<declareproducer>> +|Client +|18 +|Yes + +|<<deleteproducer>> +|Client +|19 +|Yes + +|<<querypublishersequence>> +|Client +|20 +|Yes + |<<create>> |Client |998 @@ -159,7 +174,6 @@ doest not contain a correlation ID. Publish => Key Version Stream PublishedMessages Key => int16 // 0 Version => int16 - Stream => string // the name of the stream PublisherId => uint8 PublishedMessages => [PublishedMessage] PublishedMessage => PublishingId Message @@ -249,7 +263,7 @@ Unsubscribe => Key Version CorrelationId SubscriptionId PublishError => Key Version [PublishingError] Key => int16 // 6 Version => int16 - PublisherId => int8 + PublisherId => uint8 PublishingError => PublishingId Code PublishingId => int64 Code => int16 // code to identify the problem @@ -422,12 +436,64 @@ QueryOffsetRequest => Key Version CorrelationId Reference Stream Reference => string // max 256 characters Stream => string -QueryOffsetResponse => Key Version CorrelationId Reference Stream +QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset Key => int16 // 17 Version => int16 CorrelationId => int32 ResponseCode => int16 - Offset => int64 + Offset => uint64 +``` + +=== DeclarePublisher + +``` +DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream + Key => int16 // 18 + Version => int16 + CorrelationId => int32 + PublisherId => uint8 + PublisherReference => string // max 256 characters + Stream => string + +DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId + Key => int16 // 18 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 +``` + +=== DeletePublisher + +``` +DeletePublisherRequest => Key Version CorrelationId PublisherId + Key => int16 // 19 + Version => int16 + CorrelationId => int32 + PublisherId => uint8 + +DeletePublisherResponse => Key Version CorrelationId ResponseCode + Key => int16 // 19 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 +``` + +=== QueryPublisherSequence + +``` +QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream + Key => int16 // 20 + Version => int16 + CorrelationId => int32 + PublisherReference => string // max 256 characters + Stream => string + +QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence + Key => int16 // 20 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 + Sequence => uint64 ``` === Create diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index 0593893d93..6bb7e9504c 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -16,6 +16,9 @@ -define(COMMAND_PEER_PROPERTIES, 15). -define(COMMAND_COMMIT_OFFSET, 16). -define(COMMAND_QUERY_OFFSET, 17). +-define(COMMAND_DECLARE_PUBLISHER, 18). +-define(COMMAND_DELETE_PUBLISHER, 19). +-define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 20). -define(COMMAND_CREATE_STREAM, 998). -define(COMMAND_DELETE_STREAM, 999). @@ -38,6 +41,7 @@ -define(RESPONSE_CODE_INTERNAL_ERROR, 14). -define(RESPONSE_CODE_ACCESS_REFUSED, 15). -define(RESPONSE_CODE_PRECONDITION_FAILED, 16). +-define(RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, 17). -define(OFFSET_TYPE_FIRST, 0). -define(OFFSET_TYPE_LAST, 1). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index e418dd1022..f329479d2e 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -161,6 +161,7 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) -> case is_stream_queue(Q) of true -> #{leader_pid := LeaderPid} = amqqueue:get_type_state(Q), + % FIXME check if pid is alive in case of stale information LeaderPid; _ -> cluster_not_found @@ -184,6 +185,7 @@ handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) -> Acc end end, undefined, [LeaderPid] ++ ReplicaPids), + % FIXME check if pid is alive in case of stale information case LocalMember of undefined -> {error, not_available}; diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index d3b4820256..3aadd03f52 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -19,20 +19,32 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include("rabbit_stream.hrl"). +-type stream() :: binary(). +-type publisher_id() :: byte(). +-type publisher_reference() :: binary(). +-type subscription_id() :: byte(). + +-record(publisher, { + publisher_id :: publisher_id(), + stream :: stream(), + reference :: 'undefined' | publisher_reference(), + leader :: pid() +}). + -record(consumer, { socket :: rabbit_net:socket(), %% ranch_transport:socket(), member_pid :: pid(), offset :: osiris:offset(), - subscription_id :: integer(), + subscription_id :: subscription_id(), segment :: osiris_log:state(), credit :: integer(), - stream :: binary() + stream :: stream() }). -record(stream_connection_state, { data :: 'none' | binary(), blocked :: boolean(), - consumers :: #{integer() => #consumer{}} + consumers :: #{subscription_id() => #consumer{}} }). -record(stream_connection, { @@ -49,8 +61,10 @@ connected_at :: integer(), helper_sup :: pid(), socket :: rabbit_net:socket(), - stream_leaders :: #{binary() => pid()}, - stream_subscriptions :: #{binary() => [integer()]}, + publishers :: #{publisher_id() => #publisher{}}, %% FIXME replace with a list (0-255 lookup faster?) + publisher_to_ids :: #{{stream(), publisher_reference()} => publisher_id()}, + stream_leaders :: #{stream() => pid()}, + stream_subscriptions :: #{stream() => [subscription_id()]}, credits :: atomics:atomics_ref(), authentication_state :: atom(), user :: 'undefined' | #user{}, @@ -60,7 +74,7 @@ heartbeat :: integer(), heartbeater :: any(), client_properties = #{} :: #{binary() => binary()}, - monitors = #{} :: #{reference() => binary()}, + monitors = #{} :: #{reference() => stream()}, stats_timer :: reference(), send_file_oct :: atomics:atomics_ref() }). @@ -123,6 +137,8 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, auth_mechanism = none, helper_sup = KeepaliveSup, socket = RealSocket, + publishers = #{}, + publisher_to_ids = #{}, stream_leaders = #{}, stream_subscriptions = #{}, credits = Credits, @@ -232,6 +248,7 @@ close(Transport, S) -> listen_loop_post_auth(Transport, #stream_connection{socket = S, stream_subscriptions = StreamSubscriptions, credits = Credits, heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties, + publisher_to_ids = PublisherRefToIds, send_file_oct = SendFileOct} = Connection0, #stream_connection_state{consumers = Consumers, blocked = Blocked} = State, #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) -> @@ -292,7 +309,7 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, {Connection, State} end, listen_loop_post_auth(Transport, Connection1, State1, Configuration); - {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationList}}} -> + {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, undefined, CorrelationList}}} -> {FirstPublisherId, _FirstPublishingId} = lists:nth(1, CorrelationList), {LastPublisherId, LastPublishingIds, LastCount} = lists:foldl(fun({PublisherId, PublishingId}, {CurrentPublisherId, PublishingIds, Count}) -> case PublisherId of @@ -328,6 +345,32 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, State end, listen_loop_post_auth(Transport, Connection, State1, Configuration); + {'$gen_cast', {queue_event, _QueueResource, {osiris_written, #resource{name = Stream}, PublisherReference, CorrelationList}}} -> + %% FIXME handle case when publisher ID is not found (e.g. deleted before confirms arrive) + PublisherId = maps:get({Stream, PublisherReference}, PublisherRefToIds, undefined), + PubIds = lists:foldl(fun(PublishingId, PublishingIds) -> + [PublishingIds, <<PublishingId:64>>] + end, <<>>, CorrelationList), + PublishingIdCount = length(CorrelationList), + FrameSize = 2 + 2 + 1 + 4 + PublishingIdCount * 8, + Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>, + <<PublisherId:8>>, + <<PublishingIdCount:32>>, PubIds]), + add_credits(Credits, PublishingIdCount), + State1 = case Blocked of + true -> + case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of + true -> + Transport:setopts(S, [{active, once}]), + ok = rabbit_heartbeat:resume_monitor(Heartbeater), + State#stream_connection_state{blocked = false}; + false -> + State + end; + false -> + State + end, + listen_loop_post_auth(Transport, Connection, State1, Configuration); {'$gen_cast', {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, -1}}} -> rabbit_log:info("received osiris offset event for ~p with offset ~p~n", [StreamName, -1]), listen_loop_post_auth(Transport, Connection, State, Configuration); @@ -682,36 +725,92 @@ notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState ExtraProps, rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). -handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits, - virtual_host = VirtualHost, user = User} = Connection, State, - <<?COMMAND_PUBLISH:16, ?VERSION_0:16, - StreamSize:16, Stream:StreamSize/binary, - PublisherId:8/unsigned, - MessageCount:32, Messages/binary>>, Rest) -> +handle_frame_post_auth(Transport, #stream_connection{ + virtual_host = VirtualHost, user = User, + publishers = Publishers0, publisher_to_ids = RefIds0} = Connection0, State, + <<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32, + PublisherId:8, + ReferenceSize:16, Reference:ReferenceSize/binary, + StreamSize:16, Stream:StreamSize/binary>>, Rest) -> case rabbit_stream_utils:check_write_permitted( #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of ok -> - case lookup_leader(Stream, Connection) of - cluster_not_found -> + case {maps:is_key(PublisherId, Publishers0), maps:is_key({Stream, Reference}, RefIds0)} of + {false, false} -> + case lookup_leader(Stream, Connection0) of + cluster_not_found -> + response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), + {Connection0, State, Rest}; + {ClusterLeader, #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = Connection1} -> + {PublisherReference, RefIds1} = case Reference of + <<"">> -> {undefined, RefIds0}; + _ -> {Reference, RefIds0#{{Stream, Reference} => PublisherId}} + end, + Publisher = #publisher{publisher_id = PublisherId, + stream = Stream, + reference = PublisherReference, + leader = ClusterLeader}, + response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK), + {Connection1#stream_connection{publishers = Publishers0#{PublisherId => Publisher}, + publisher_to_ids = RefIds1}, State, Rest} + end; + {_, _} -> + response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), + {Connection0, State, Rest} + end; + error -> + response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), + {Connection0, State, Rest} + end; +handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers, + publisher_to_ids = PubToIds} = Connection0, State, + <<?COMMAND_DELETE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32, + PublisherId:8>>, Rest) -> + case Publishers of + #{PublisherId := #publisher{stream = Stream, reference = Ref}} -> + Connection1 = Connection0#stream_connection{ + publishers = maps:remove(PublisherId, Publishers), + publisher_to_ids = maps:remove({Stream, Ref}, PubToIds)}, + Connection2 = maybe_clean_connection_from_stream(Stream, Connection1), + response(Transport, Connection1, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK), + {Connection2, State, Rest}; + _ -> + response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST), + {Connection0, State, Rest} + end; +handle_frame_post_auth(Transport, #stream_connection{ + socket = S, credits = Credits, + virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State, + <<?COMMAND_PUBLISH:16, ?VERSION_0:16, + PublisherId:8/unsigned, + MessageCount:32, Messages/binary>>, Rest) -> + case Publishers of + #{PublisherId := Publisher} -> + #publisher{stream = Stream, reference = Reference, leader = Leader} = Publisher, + case rabbit_stream_utils:check_write_permitted( + #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of + ok -> + rabbit_stream_utils:write_messages(Leader, Reference, PublisherId, Messages), + sub_credits(Credits, MessageCount), + {Connection, State, Rest}; + error -> FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount, - Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, Messages), + Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages), Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16, PublisherId:8, MessageCount:32, Details/binary>>]), - {Connection, State, Rest}; - {ClusterLeader, Connection1} -> - rabbit_stream_utils:write_messages(ClusterLeader, PublisherId, Messages), - sub_credits(Credits, MessageCount), - {Connection1, State, Rest} + {Connection, State, Rest} end; - error -> + _ -> FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount, - Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages), + Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, Messages), Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16, - PublisherId:8, - MessageCount:32, Details/binary>>]), + PublisherId:8, + MessageCount:32, Details/binary>>]), {Connection, State, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = Socket, @@ -720,6 +819,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, #stream_connection_state{consumers = Consumers} = State, <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary, OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) -> + %% FIXME check the max number of subs is not reached already case rabbit_stream_utils:check_read_permitted( #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, @@ -788,8 +888,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), {Connection, State, Rest} end; -handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions, - stream_leaders = StreamLeaders} = Connection, +handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions} = Connection, #stream_connection_state{consumers = Consumers} = State, <<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned>>, Rest) -> case subscription_exists(StreamSubscriptions, SubscriptionId) of @@ -801,28 +900,19 @@ handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = Stre Stream = Consumer#consumer.stream, #{Stream := SubscriptionsForThisStream} = StreamSubscriptions, SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream), - {Connection1, StreamSubscriptions1, StreamLeaders1} = + StreamSubscriptions1 = case length(SubscriptionsForThisStream1) of 0 -> - %% no more subscriptions for this stream - %% we unregister even though it could affect publishing if the stream is published to - %% from this connection and is deleted. - %% to mitigate this, we remove the stream from the leaders cache - %% this way the stream leader will be looked up in the next publish command - %% and registered to. - C = demonitor_stream(Stream, Connection), - {C, maps:remove(Stream, StreamSubscriptions), - maps:remove(Stream, StreamLeaders) - }; + % no more subscription for this stream + maps:remove(Stream, StreamSubscriptions); _ -> - {Connection, StreamSubscriptions#{Stream => SubscriptionsForThisStream1}, StreamLeaders} + StreamSubscriptions#{Stream => SubscriptionsForThisStream1} end, + Connection1 = Connection#stream_connection{stream_subscriptions = StreamSubscriptions1}, Consumers1 = maps:remove(SubscriptionId, Consumers), + Connection2 = maybe_clean_connection_from_stream(Stream, Connection1), response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId), - {Connection1#stream_connection{ - stream_subscriptions = StreamSubscriptions1, - stream_leaders = StreamLeaders1 - }, State#stream_connection_state{consumers = Consumers1}, Rest} + {Connection2, State#stream_connection_state{consumers = Consumers1}, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, @@ -901,6 +991,34 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]), {Connection, State, Rest}; +handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, user = User} = Connection, + State, + <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, ?VERSION_0:16, CorrelationId:32, + ReferenceSize:16, Reference:ReferenceSize/binary, + StreamSize:16, Stream:StreamSize/binary>>, Rest) -> + FrameSize = ?RESPONSE_FRAME_SIZE + 8, + {ResponseCode, Sequence} = case rabbit_stream_utils:check_read_permitted( + #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of + ok -> + case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of + {error, not_found} -> + {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; + {ok, LocalMemberPid} -> + {?RESPONSE_CODE_OK, case osiris:fetch_writer_seq(LocalMemberPid, Reference) of + undefined -> + 0; + Offt -> + Offt + end} + end; + error -> + {?RESPONSE_CODE_ACCESS_REFUSED, 0} + end, + Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, ?VERSION_0:16>>, + <<CorrelationId:32>>, <<ResponseCode:16>>, <<Sequence:64>>]), + {Connection, State, Rest}; handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, user = #user{username = Username} = User} = Connection, State, <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary, @@ -1080,21 +1198,49 @@ handle_frame_post_close(_Transport, Connection, State, Frame, Rest) -> rabbit_log:warning("ignored frame on close ~p ~p.~n", [Frame, Rest]), {Connection, State, Rest}. -clean_state_after_stream_deletion_or_failure(Stream, #stream_connection{stream_leaders = StreamLeaders, stream_subscriptions = StreamSubscriptions} = Connection, - #stream_connection_state{consumers = Consumers} = State) -> - case {maps:is_key(Stream, StreamSubscriptions), maps:is_key(Stream, StreamLeaders)} of - {true, _} -> +clean_state_after_stream_deletion_or_failure(Stream, + #stream_connection{stream_subscriptions = StreamSubscriptions, + publishers = Publishers, + publisher_to_ids = PublisherToIds, + stream_leaders = Leaders} = C0, + #stream_connection_state{consumers = Consumers} = S0) -> + {SubscriptionsCleaned, C1, S1} = case stream_has_subscriptions(Stream, C0) of + true -> #{Stream := SubscriptionIds} = StreamSubscriptions, - {cleaned, Connection#stream_connection{ - stream_leaders = maps:remove(Stream, StreamLeaders), + {true, C0#stream_connection{ stream_subscriptions = maps:remove(Stream, StreamSubscriptions) - }, State#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}}; - {false, true} -> - {cleaned, Connection#stream_connection{ - stream_leaders = maps:remove(Stream, StreamLeaders) - }, State}; - {false, false} -> - {not_cleaned, Connection, State} + }, S0#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}}; + false -> + {false, C0, S0} + end, + {PublishersCleaned, C2, S2} = case stream_has_publishers(Stream, C1) of + true -> + {PurgedPubs, PurgedPubToIds} = maps:fold( + fun(PubId, #publisher{stream = S, reference = Ref}, {Pubs, PubToIds}) -> + case S of + Stream -> + {maps:remove(PubId, Pubs), maps:remove({Stream, Ref}, PubToIds)}; + _ -> + {Pubs, PubToIds} + end + end, {Publishers, PublisherToIds}, Publishers), + {true, C1#stream_connection{publishers = PurgedPubs, publisher_to_ids = PurgedPubToIds}, + S1}; + false -> + {false, C1, S1} + end, + {LeadersCleaned, Leaders1} = case Leaders of + #{Stream := _} -> + {true, maps:remove(Stream, Leaders)}; + _ -> + {false, Leaders} + end, + case SubscriptionsCleaned orelse PublishersCleaned orelse LeadersCleaned of + true -> + C3 = demonitor_stream(Stream, C2), + {cleaned, C3#stream_connection{stream_leaders = Leaders1}, S2}; + false -> + {not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2} end. lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual_host = VirtualHost} = Connection) -> @@ -1114,6 +1260,16 @@ lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual lookup_leader_from_manager(VirtualHost, Stream) -> rabbit_stream_manager:lookup_leader(VirtualHost, Stream). +maybe_clean_connection_from_stream(Stream, #stream_connection{stream_leaders = Leaders} = Connection0) -> + Connection1 = + case {stream_has_publishers(Stream, Connection0), stream_has_subscriptions(Stream, Connection0)} of + {false, false} -> + demonitor_stream(Stream, Connection0); + _ -> + Connection0 + end, + Connection1#stream_connection{stream_leaders = maps:remove(Stream, Leaders)}. + maybe_monitor_stream(Pid, Stream, #stream_connection{monitors = Monitors} = Connection) -> case lists:member(Stream, maps:values(Monitors)) of true -> @@ -1127,6 +1283,7 @@ demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection) Monitors = maps:fold(fun(MonitorRef, Strm, Acc) -> case Strm of Stream -> + demonitor(MonitorRef, [flush]), Acc; _ -> maps:put(MonitorRef, Strm, Acc) @@ -1135,6 +1292,19 @@ demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection) end, #{}, Monitors0), Connection#stream_connection{monitors = Monitors}. +stream_has_subscriptions(Stream, #stream_connection{stream_subscriptions = Subscriptions}) -> + case Subscriptions of + #{Stream := StreamSubscriptions} when length(StreamSubscriptions) > 0 -> + true; + _ -> + false + end. + +stream_has_publishers(Stream, #stream_connection{publishers = Publishers}) -> + lists:any(fun(#publisher{stream = S}) -> + case S of Stream -> true; _ -> false end + end, maps:values(Publishers)). + demonitor_all_streams(#stream_connection{monitors = Monitors} = Connection) -> lists:foreach(fun(MonitorRef) -> demonitor(MonitorRef, [flush]) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index c20aacb12c..1f3f2aded5 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -17,7 +17,7 @@ -module(rabbit_stream_utils). %% API --export([enforce_correct_stream_name/1, write_messages/3, parse_map/2, +-export([enforce_correct_stream_name/1, write_messages/4, parse_map/2, auth_mechanisms/1, auth_mechanism_to_module/2, check_configure_permitted/3, check_write_permitted/3, check_read_permitted/3, extract_stream_list/2]). @@ -41,17 +41,26 @@ check_name(<<"">>) -> check_name(_Name) -> ok. -write_messages(_ClusterLeader, _PublisherId, <<>>) -> +write_messages(_ClusterLeader, undefined, _PublisherId, <<>>) -> ok; -write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> +write_messages(ClusterLeader, undefined, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> % FIXME handle write error - ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, Message), - write_messages(ClusterLeader, PublisherId, Rest); -write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) -> + ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, Message), + write_messages(ClusterLeader, undefined, PublisherId, Rest); +write_messages(ClusterLeader, undefined, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) -> % FIXME handle write error - ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}), - write_messages(ClusterLeader, PublisherId, Rest). - + ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}), + write_messages(ClusterLeader, undefined, PublisherId, Rest); +write_messages(_ClusterLeader, _PublisherRef, _PublisherId, <<>>) -> + ok; +write_messages(ClusterLeader, PublisherRef, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> + % FIXME handle write error + ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message), + write_messages(ClusterLeader, PublisherRef, PublisherId, Rest); +write_messages(ClusterLeader, PublisherRef, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) -> + % FIXME handle write error + ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, {batch, MessageCount, CompressionType, Batch}), + write_messages(ClusterLeader, PublisherRef, PublisherId, Rest). parse_map(<<>>, _Count) -> {#{}, <<>>}; diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 4197b1de71..003a879718 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -114,8 +114,10 @@ test_server(Port) -> test_authenticate(S), Stream = <<"stream1">>, test_create_stream(S, Stream), + PublisherId = 42, + test_declare_publisher(S, PublisherId, Stream), Body = <<"hello">>, - test_publish_confirm(S, Stream, Body), + test_publish_confirm(S, PublisherId, Body), SubscriptionId = 42, Rest = test_subscribe(S, SubscriptionId, Stream), test_deliver(S, Rest, SubscriptionId, Body), @@ -200,13 +202,24 @@ test_delete_stream(S, Stream) -> ResponseFrameSize = 10, {ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000). -test_publish_confirm(S, Stream, Body) -> - BodySize = byte_size(Body), +test_declare_publisher(S, PublisherId, Stream) -> StreamSize = byte_size(Stream), - PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 42:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>, + DeclarePublisherFrame = <<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, PublisherId:8, + 0:16, %% empty publisher reference + StreamSize:16, Stream:StreamSize/binary>>, + FrameSize = byte_size(DeclarePublisherFrame), + gen_tcp:send(S, <<FrameSize:32, DeclarePublisherFrame/binary>>), + Res = gen_tcp:recv(S, 0, 5000), + {ok, <<_Size:32, ?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res, + Rest. + +test_publish_confirm(S, PublisherId, Body) -> + BodySize = byte_size(Body), + PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, + PublisherId:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>, FrameSize = byte_size(PublishFrame), gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>), - {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 42:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000). + {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, PublisherId:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000). test_subscribe(S, SubscriptionId, Stream) -> StreamSize = byte_size(Stream), @@ -221,9 +234,9 @@ test_subscribe(S, SubscriptionId, Stream) -> test_deliver(S, Rest, SubscriptionId, Body) -> BodySize = byte_size(Body), Frame = read_frame(S, Rest), - <<54:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8, + <<58:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8, 1:16, 1:32, - _Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32, + _Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32, _TrailerLength:32, 0:1, BodySize:31/unsigned, Body/binary>> = Frame. test_metadata_update_stream_deleted(S, Stream) -> diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index c7a390f00d..fcb7b26ad1 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -79,8 +79,8 @@ public class FailureTest { (publisherId, publishingId) -> confirmLatch.get().countDown())); String message = "all nodes available"; messages.add(message); + publisher.declarePublisher((byte) 1, null, stream); publisher.publish( - stream, (byte) 1, Collections.singletonList( publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); @@ -109,8 +109,9 @@ public class FailureTest { confirmLatch.set(new CountDownLatch(1)); message = "2 nodes available"; messages.add(message); + + publisher.declarePublisher((byte) 1, null, stream); publisher.publish( - stream, (byte) 1, Collections.singletonList( publisher @@ -135,7 +136,6 @@ public class FailureTest { message = "all nodes are back"; messages.add(message); publisher.publish( - stream, (byte) 1, Collections.singletonList( publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); @@ -233,6 +233,7 @@ public class FailureTest { generation.incrementAndGet(); published.clear(); + newPublisher.declarePublisher((byte) 1, null, stream); publisher.set(newPublisher); connected.set(true); @@ -249,6 +250,7 @@ public class FailureTest { .shutdownListener(shutdownListener) .publishConfirmListener(publishConfirmListener)); + client.declarePublisher((byte) 1, null, stream); publisher.set(client); AtomicBoolean keepPublishing = new AtomicBoolean(true); @@ -270,10 +272,7 @@ public class FailureTest { .build(); try { long publishingId = - publisher - .get() - .publish(stream, (byte) 1, Collections.singletonList(message)) - .get(0); + publisher.get().publish((byte) 1, Collections.singletonList(message)).get(0); published.put(publishingId, message); } catch (Exception e) { // keep going @@ -389,6 +388,7 @@ public class FailureTest { .port(streamMetadata.getLeader().getPort()) .publishConfirmListener(publishConfirmListener)); + publisher.declarePublisher((byte) 1, null, stream); AtomicLong generation = new AtomicLong(0); AtomicLong sequence = new AtomicLong(0); AtomicBoolean keepPublishing = new AtomicBoolean(true); @@ -408,7 +408,7 @@ public class FailureTest { .build(); try { long publishingId = - publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0); + publisher.publish((byte) 1, Collections.singletonList(message)).get(0); published.put(publishingId, message); } catch (Exception e) { // keep going diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java index 08024a12bf..1af7513d28 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java @@ -92,11 +92,11 @@ public class StreamTest { .publishConfirmListener( (publisherId, publishingId) -> publishingLatch.countDown())); + publisher.declarePublisher((byte) 1, null, stream); IntStream.range(0, messageCount) .forEach( i -> publisher.publish( - stream, (byte) 1, Collections.singletonList( publisher |