summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-26 15:19:02 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-26 15:19:02 +0100
commit8cf5fb077548133b141e24e165e667b5351d6d41 (patch)
tree22e825edbc926a84c1c01fdc59a9c8cadfebe878
parent59bc60fe366edf199d891f014a54be031e5050f7 (diff)
downloadrabbitmq-server-git-8cf5fb077548133b141e24e165e667b5351d6d41.tar.gz
Enforce publisher ID uniqueness
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl88
1 files changed, 52 insertions, 36 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 2a09eb1fd7..59e2284960 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -19,26 +19,31 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_stream.hrl").
+-type stream() :: binary().
+-type publisher_id() :: byte().
+-type publisher_reference() :: binary().
+-type subscription_id() :: byte().
+
-record(publisher, {
- publisher_id :: integer(),
- stream :: binary(),
- reference :: 'undefined' | binary()
+ publisher_id :: publisher_id(),
+ stream :: stream(),
+ reference :: 'undefined' | publisher_reference()
}).
-record(consumer, {
socket :: rabbit_net:socket(), %% ranch_transport:socket(),
member_pid :: pid(),
offset :: osiris:offset(),
- subscription_id :: integer(),
+ subscription_id :: subscription_id(),
segment :: osiris_log:state(),
credit :: integer(),
- stream :: binary()
+ stream :: stream()
}).
-record(stream_connection_state, {
data :: 'none' | binary(),
blocked :: boolean(),
- consumers :: #{integer() => #consumer{}}
+ consumers :: #{subscription_id() => #consumer{}}
}).
-record(stream_connection, {
@@ -55,10 +60,10 @@
connected_at :: integer(),
helper_sup :: pid(),
socket :: rabbit_net:socket(),
- publishers :: #{integer() => #publisher{}}, %% FIXME replace with a list (0-255 lookup faster?)
- publisher_reference_to_ids :: #{binary() => integer()},
- stream_leaders :: #{binary() => pid()},
- stream_subscriptions :: #{binary() => [integer()]},
+ publishers :: #{publisher_id() => #publisher{}}, %% FIXME replace with a list (0-255 lookup faster?)
+ publisher_to_ids :: #{{stream(), publisher_reference()} => publisher_id()},
+ stream_leaders :: #{stream() => pid()},
+ stream_subscriptions :: #{stream() => [subscription_id()]},
credits :: atomics:atomics_ref(),
authentication_state :: atom(),
user :: 'undefined' | #user{},
@@ -68,7 +73,7 @@
heartbeat :: integer(),
heartbeater :: any(),
client_properties = #{} :: #{binary() => binary()},
- monitors = #{} :: #{reference() => binary()},
+ monitors = #{} :: #{reference() => stream()},
stats_timer :: reference(),
send_file_oct :: atomics:atomics_ref()
}).
@@ -132,7 +137,7 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
helper_sup = KeepaliveSup,
socket = RealSocket,
publishers = #{},
- publisher_reference_to_ids = #{},
+ publisher_to_ids = #{},
stream_leaders = #{},
stream_subscriptions = #{},
credits = Credits,
@@ -242,7 +247,7 @@ close(Transport, S) ->
listen_loop_post_auth(Transport, #stream_connection{socket = S,
stream_subscriptions = StreamSubscriptions, credits = Credits,
heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties,
- publisher_reference_to_ids = PublisherRefToIds,
+ publisher_to_ids = PublisherRefToIds,
send_file_oct = SendFileOct} = Connection0,
#stream_connection_state{consumers = Consumers, blocked = Blocked} = State,
#configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) ->
@@ -339,9 +344,9 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
State
end,
listen_loop_post_auth(Transport, Connection, State1, Configuration);
- {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, PublisherReference, CorrelationList}}} ->
+ {'$gen_cast', {queue_event, _QueueResource, {osiris_written, #resource{name = Stream}, PublisherReference, CorrelationList}}} ->
%% FIXME handle case when publisher ID is not found (e.g. deleted before confirms arrive)
- PublisherId = maps:get(PublisherReference, PublisherRefToIds, undefined),
+ PublisherId = maps:get({Stream, PublisherReference}, PublisherRefToIds, undefined),
PubIds = lists:foldl(fun(PublishingId, PublishingIds) ->
[PublishingIds, <<PublishingId:64>>]
end, <<>>, CorrelationList),
@@ -719,8 +724,9 @@ notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState
ExtraProps,
rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
-handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost,
- user = User} = Connection0, State,
+handle_frame_post_auth(Transport, #stream_connection{
+ virtual_host = VirtualHost, user = User,
+ publishers = Publishers0, publisher_to_ids = RefIds0} = Connection0, State,
<<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32,
PublisherId:8,
ReferenceSize:16, Reference:ReferenceSize/binary,
@@ -731,36 +737,46 @@ handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost,
User,
#{}) of
ok ->
- case lookup_leader(Stream, Connection0) of
- cluster_not_found ->
- response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
- {Connection0, State, Rest};
- {_, #stream_connection{publishers = Publishers, publisher_reference_to_ids = RefIds0} = Connection1} ->
- {PublisherReference, RefIds1} = case Reference of
- <<"">> -> {undefined, RefIds0};
- _ -> {Reference, RefIds0#{Reference => PublisherId}}
- end,
- Publisher = #publisher{publisher_id = PublisherId,
- stream = Stream,
- reference = PublisherReference},
- response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK),
- {Connection1#stream_connection{publishers = Publishers#{PublisherId => Publisher},
- publisher_reference_to_ids = RefIds1}, State, Rest}
+ case {maps:is_key(PublisherId, Publishers0), maps:is_key({Stream, Reference}, RefIds0)} of
+ {false, false} ->
+ case lookup_leader(Stream, Connection0) of
+ cluster_not_found ->
+ response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
+ {Connection0, State, Rest};
+ {_, #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = Connection1} ->
+ {PublisherReference, RefIds1} = case Reference of
+ <<"">> -> {undefined, RefIds0};
+ _ -> {Reference, RefIds0#{{Stream, Reference} => PublisherId}}
+ end,
+ Publisher = #publisher{publisher_id = PublisherId,
+ stream = Stream,
+ reference = PublisherReference},
+ response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK),
+ {Connection1#stream_connection{publishers = Publishers0#{PublisherId => Publisher},
+ publisher_to_ids = RefIds1}, State, Rest}
+ end;
+ {_, _} ->
+ response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PRECONDITION_FAILED),
+ {Connection0, State, Rest}
end;
error ->
response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED),
{Connection0, State, Rest}
end;
-handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers} = Connection0, State,
+handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers,
+ publisher_to_ids = PubToIds} = Connection0, State,
<<?COMMAND_DELETE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32,
PublisherId:8>>, Rest) ->
case Publishers of
- #{PublisherId := _Publisher} ->
- Connection1 = Connection0#stream_connection{publishers = maps:remove(PublisherId, Publishers)},
+ #{PublisherId := #publisher{stream = Stream, reference = Ref}} ->
+ Connection1 = Connection0#stream_connection{
+ publishers = maps:remove(PublisherId, Publishers),
+ publisher_to_ids = maps:remove({Stream, Ref}, PubToIds)},
response(Transport, Connection1, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK),
{Connection1, State, Rest};
_ ->
- response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST)
+ response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST),
+ {Connection0, State, Rest}
end;
handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits,
virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State,