diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-26 15:19:02 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-26 15:19:02 +0100 |
commit | 8cf5fb077548133b141e24e165e667b5351d6d41 (patch) | |
tree | 22e825edbc926a84c1c01fdc59a9c8cadfebe878 | |
parent | 59bc60fe366edf199d891f014a54be031e5050f7 (diff) | |
download | rabbitmq-server-git-8cf5fb077548133b141e24e165e667b5351d6d41.tar.gz |
Enforce publisher ID uniqueness
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 88 |
1 files changed, 52 insertions, 36 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 2a09eb1fd7..59e2284960 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -19,26 +19,31 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include("rabbit_stream.hrl"). +-type stream() :: binary(). +-type publisher_id() :: byte(). +-type publisher_reference() :: binary(). +-type subscription_id() :: byte(). + -record(publisher, { - publisher_id :: integer(), - stream :: binary(), - reference :: 'undefined' | binary() + publisher_id :: publisher_id(), + stream :: stream(), + reference :: 'undefined' | publisher_reference() }). -record(consumer, { socket :: rabbit_net:socket(), %% ranch_transport:socket(), member_pid :: pid(), offset :: osiris:offset(), - subscription_id :: integer(), + subscription_id :: subscription_id(), segment :: osiris_log:state(), credit :: integer(), - stream :: binary() + stream :: stream() }). -record(stream_connection_state, { data :: 'none' | binary(), blocked :: boolean(), - consumers :: #{integer() => #consumer{}} + consumers :: #{subscription_id() => #consumer{}} }). -record(stream_connection, { @@ -55,10 +60,10 @@ connected_at :: integer(), helper_sup :: pid(), socket :: rabbit_net:socket(), - publishers :: #{integer() => #publisher{}}, %% FIXME replace with a list (0-255 lookup faster?) - publisher_reference_to_ids :: #{binary() => integer()}, - stream_leaders :: #{binary() => pid()}, - stream_subscriptions :: #{binary() => [integer()]}, + publishers :: #{publisher_id() => #publisher{}}, %% FIXME replace with a list (0-255 lookup faster?) + publisher_to_ids :: #{{stream(), publisher_reference()} => publisher_id()}, + stream_leaders :: #{stream() => pid()}, + stream_subscriptions :: #{stream() => [subscription_id()]}, credits :: atomics:atomics_ref(), authentication_state :: atom(), user :: 'undefined' | #user{}, @@ -68,7 +73,7 @@ heartbeat :: integer(), heartbeater :: any(), client_properties = #{} :: #{binary() => binary()}, - monitors = #{} :: #{reference() => binary()}, + monitors = #{} :: #{reference() => stream()}, stats_timer :: reference(), send_file_oct :: atomics:atomics_ref() }). @@ -132,7 +137,7 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, helper_sup = KeepaliveSup, socket = RealSocket, publishers = #{}, - publisher_reference_to_ids = #{}, + publisher_to_ids = #{}, stream_leaders = #{}, stream_subscriptions = #{}, credits = Credits, @@ -242,7 +247,7 @@ close(Transport, S) -> listen_loop_post_auth(Transport, #stream_connection{socket = S, stream_subscriptions = StreamSubscriptions, credits = Credits, heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties, - publisher_reference_to_ids = PublisherRefToIds, + publisher_to_ids = PublisherRefToIds, send_file_oct = SendFileOct} = Connection0, #stream_connection_state{consumers = Consumers, blocked = Blocked} = State, #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) -> @@ -339,9 +344,9 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, State end, listen_loop_post_auth(Transport, Connection, State1, Configuration); - {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, PublisherReference, CorrelationList}}} -> + {'$gen_cast', {queue_event, _QueueResource, {osiris_written, #resource{name = Stream}, PublisherReference, CorrelationList}}} -> %% FIXME handle case when publisher ID is not found (e.g. deleted before confirms arrive) - PublisherId = maps:get(PublisherReference, PublisherRefToIds, undefined), + PublisherId = maps:get({Stream, PublisherReference}, PublisherRefToIds, undefined), PubIds = lists:foldl(fun(PublishingId, PublishingIds) -> [PublishingIds, <<PublishingId:64>>] end, <<>>, CorrelationList), @@ -719,8 +724,9 @@ notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState ExtraProps, rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). -handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, - user = User} = Connection0, State, +handle_frame_post_auth(Transport, #stream_connection{ + virtual_host = VirtualHost, user = User, + publishers = Publishers0, publisher_to_ids = RefIds0} = Connection0, State, <<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32, PublisherId:8, ReferenceSize:16, Reference:ReferenceSize/binary, @@ -731,36 +737,46 @@ handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, User, #{}) of ok -> - case lookup_leader(Stream, Connection0) of - cluster_not_found -> - response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), - {Connection0, State, Rest}; - {_, #stream_connection{publishers = Publishers, publisher_reference_to_ids = RefIds0} = Connection1} -> - {PublisherReference, RefIds1} = case Reference of - <<"">> -> {undefined, RefIds0}; - _ -> {Reference, RefIds0#{Reference => PublisherId}} - end, - Publisher = #publisher{publisher_id = PublisherId, - stream = Stream, - reference = PublisherReference}, - response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK), - {Connection1#stream_connection{publishers = Publishers#{PublisherId => Publisher}, - publisher_reference_to_ids = RefIds1}, State, Rest} + case {maps:is_key(PublisherId, Publishers0), maps:is_key({Stream, Reference}, RefIds0)} of + {false, false} -> + case lookup_leader(Stream, Connection0) of + cluster_not_found -> + response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), + {Connection0, State, Rest}; + {_, #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = Connection1} -> + {PublisherReference, RefIds1} = case Reference of + <<"">> -> {undefined, RefIds0}; + _ -> {Reference, RefIds0#{{Stream, Reference} => PublisherId}} + end, + Publisher = #publisher{publisher_id = PublisherId, + stream = Stream, + reference = PublisherReference}, + response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK), + {Connection1#stream_connection{publishers = Publishers0#{PublisherId => Publisher}, + publisher_to_ids = RefIds1}, State, Rest} + end; + {_, _} -> + response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED), + {Connection0, State, Rest} end; error -> response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), {Connection0, State, Rest} end; -handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers} = Connection0, State, +handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers, + publisher_to_ids = PubToIds} = Connection0, State, <<?COMMAND_DELETE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32, PublisherId:8>>, Rest) -> case Publishers of - #{PublisherId := _Publisher} -> - Connection1 = Connection0#stream_connection{publishers = maps:remove(PublisherId, Publishers)}, + #{PublisherId := #publisher{stream = Stream, reference = Ref}} -> + Connection1 = Connection0#stream_connection{ + publishers = maps:remove(PublisherId, Publishers), + publisher_to_ids = maps:remove({Stream, Ref}, PubToIds)}, response(Transport, Connection1, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK), {Connection1, State, Rest}; _ -> - response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST) + response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST), + {Connection0, State, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits, virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State, |