diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-24 17:48:41 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-24 17:48:41 +0100 |
commit | 8f97ea400a8276432b9b11b5f07217db42efc909 (patch) | |
tree | 09f422ed827c5233adc6994b15e52b995e490580 | |
parent | 580beb21bf016b47aba0c29d60249b23c26d2907 (diff) | |
download | rabbitmq-server-git-8f97ea400a8276432b9b11b5f07217db42efc909.tar.gz |
Start adding publishing dedup support for streams
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 4 | ||||
-rw-r--r-- | deps/rabbitmq_stream/docs/PROTOCOL.adoc | 10 | ||||
-rw-r--r-- | deps/rabbitmq_stream/include/rabbit_stream.hrl | 3 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 99 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_utils.erl | 23 |
5 files changed, 120 insertions, 19 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index d130c7225b..8841e52ce3 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -286,7 +286,7 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId}, correlation = Correlation0, soft_limit = SftLmt, slow = Slow0} = State) -> - ok = osiris:write(LeaderPid, Seq, msg_to_iodata(Msg)), + ok = osiris:write(LeaderPid, undefined, Seq, msg_to_iodata(Msg)), Correlation = case MsgId of undefined -> Correlation0; @@ -308,7 +308,7 @@ dequeue(_, _, _, #stream_client{name = Name}) -> {protocol_error, not_implemented, "basic.get not supported by stream queues ~s", [rabbit_misc:rs(Name)]}. -handle_event({osiris_written, From, Corrs}, State = #stream_client{correlation = Correlation0, +handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{correlation = Correlation0, soft_limit = SftLmt, slow = Slow0, name = Name}) -> diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 6cb2019ec7..709763b57e 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -442,32 +442,32 @@ QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset === DeclarePublisher ``` -DeclarePublisherRequest => Key Version CorrelationId [Reference] Stream +DeclarePublisherRequest => Key Version CorrelationId PublisherId [Reference] Stream Key => int16 // 18 Version => int16 CorrelationId => int32 + PublisherId => uint8 Reference => string // max 256 characters Stream => string DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId - Key => int16 // 17 + Key => int16 // 18 Version => int16 CorrelationId => int32 ResponseCode => int16 - PublisherId => uint8 ``` === DeletePublisher ``` DeletePublisherRequest => Key Version CorrelationId PublisherId - Key => int16 // 18 + Key => int16 // 19 Version => int16 CorrelationId => int32 PublisherId => uint8 DeletePublisherResponse => Key Version CorrelationId ResponseCode - Key => int16 // 17 + Key => int16 // 19 Version => int16 CorrelationId => int32 ResponseCode => int16 diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index 0593893d93..4ba630a0fe 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -16,6 +16,8 @@ -define(COMMAND_PEER_PROPERTIES, 15). -define(COMMAND_COMMIT_OFFSET, 16). -define(COMMAND_QUERY_OFFSET, 17). +-define(COMMAND_DECLARE_PUBLISHER, 18). +-define(COMMAND_DELETE_PUBLISHER, 19). -define(COMMAND_CREATE_STREAM, 998). -define(COMMAND_DELETE_STREAM, 999). @@ -38,6 +40,7 @@ -define(RESPONSE_CODE_INTERNAL_ERROR, 14). -define(RESPONSE_CODE_ACCESS_REFUSED, 15). -define(RESPONSE_CODE_PRECONDITION_FAILED, 16). +-define(RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, 17). -define(OFFSET_TYPE_FIRST, 0). -define(OFFSET_TYPE_LAST, 1). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index d3b4820256..338a03f92d 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -19,6 +19,12 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include("rabbit_stream.hrl"). +-record(publisher, { + publisher_id :: integer(), + stream :: binary(), + reference :: 'undefined' | binary() +}). + -record(consumer, { socket :: rabbit_net:socket(), %% ranch_transport:socket(), member_pid :: pid(), @@ -49,6 +55,8 @@ connected_at :: integer(), helper_sup :: pid(), socket :: rabbit_net:socket(), + publishers :: #{integer() => #publisher{}}, %% FIXME replace with a list (0-255 lookup faster?) + publisher_reference_to_ids :: #{binary() => integer()}, stream_leaders :: #{binary() => pid()}, stream_subscriptions :: #{binary() => [integer()]}, credits :: atomics:atomics_ref(), @@ -123,6 +131,8 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, auth_mechanism = none, helper_sup = KeepaliveSup, socket = RealSocket, + publishers = #{}, + publisher_reference_to_ids = #{}, stream_leaders = #{}, stream_subscriptions = #{}, credits = Credits, @@ -232,6 +242,7 @@ close(Transport, S) -> listen_loop_post_auth(Transport, #stream_connection{socket = S, stream_subscriptions = StreamSubscriptions, credits = Credits, heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties, + publisher_reference_to_ids = PublisherRefToIds, send_file_oct = SendFileOct} = Connection0, #stream_connection_state{consumers = Consumers, blocked = Blocked} = State, #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) -> @@ -292,7 +303,7 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, {Connection, State} end, listen_loop_post_auth(Transport, Connection1, State1, Configuration); - {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationList}}} -> + {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, undefined, CorrelationList}}} -> {FirstPublisherId, _FirstPublishingId} = lists:nth(1, CorrelationList), {LastPublisherId, LastPublishingIds, LastCount} = lists:foldl(fun({PublisherId, PublishingId}, {CurrentPublisherId, PublishingIds, Count}) -> case PublisherId of @@ -328,6 +339,32 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, State end, listen_loop_post_auth(Transport, Connection, State1, Configuration); + {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, PublisherReference, CorrelationList}}} -> + %% FIXME handle case when publisher ID is not found (e.g. deleted before confirms arrive) + PublisherId = maps:get(PublisherReference, PublisherRefToIds, undefined), + PubIds = lists:foldl(fun(PublishingId, PublishingIds) -> + [PublishingIds, <<PublishingId:64>>] + end, <<>>, CorrelationList), + PublishingIdCount = length(CorrelationList), + FrameSize = 2 + 2 + 1 + 4 + PublishingIdCount * 8, + Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>, + <<PublisherId:8>>, + <<PublishingIdCount:32>>, PubIds]), + add_credits(Credits, PublishingIdCount), + State1 = case Blocked of + true -> + case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of + true -> + Transport:setopts(S, [{active, once}]), + ok = rabbit_heartbeat:resume_monitor(Heartbeater), + State#stream_connection_state{blocked = false}; + false -> + State + end; + false -> + State + end, + listen_loop_post_auth(Transport, Connection, State1, Configuration); {'$gen_cast', {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, -1}}} -> rabbit_log:info("received osiris offset event for ~p with offset ~p~n", [StreamName, -1]), listen_loop_post_auth(Transport, Connection, State, Configuration); @@ -682,18 +719,69 @@ notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState ExtraProps, rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). +handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, + user = User} = Connection0, State, + <<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32, + PublisherId:8, + ReferenceSize:16, Reference:ReferenceSize/binary, + StreamSize:16, Stream:StreamSize/binary>>, Rest) -> + %% FIXME check if the publisher ID and reference do not already exist + case rabbit_stream_utils:check_write_permitted( + #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of + ok -> + case lookup_leader(Stream, Connection0) of + cluster_not_found -> + response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), + {Connection0, State, Rest}; + {_, #stream_connection{publishers = Publishers, publisher_reference_to_ids = RefIds0} = Connection1} -> + {PublisherReference, RefIds1} = case Reference of + <<"">> -> {undefined, RefIds0}; + _ -> {Reference, RefIds0#{Reference => PublisherId}} + end, + Publisher = #publisher{publisher_id = PublisherId, + stream = Stream, + reference = PublisherReference}, + response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK), + {Connection1#stream_connection{publishers = Publishers#{PublisherId => Publisher}, + publisher_reference_to_ids = RefIds1}, State, Rest} + end; + error -> + response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED), + {Connection0, State, Rest} + end; +handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers} = Connection0, State, + <<?COMMAND_DELETE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32, + PublisherId:8>>, Rest) -> + case Publishers of + #{PublisherId := _Publisher} -> + Connection1 = Connection0#stream_connection{publishers = maps:remove(PublisherId, Publishers)}, + response(Transport, Connection1, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK), + {Connection1, State, Rest}; + _ -> + response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST) + end; handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits, - virtual_host = VirtualHost, user = User} = Connection, State, + virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State, <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, PublisherId:8/unsigned, MessageCount:32, Messages/binary>>, Rest) -> + %% FIXME fail messages if publisher is unknown + {StreamToPublishTo, PublisherRef} = case Publishers of + #{PublisherId := Publisher} -> + #publisher{stream = St, reference = R} = Publisher, + {St, R}; + _ -> + {Stream, undefined} + end, case rabbit_stream_utils:check_write_permitted( - #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + #resource{name = StreamToPublishTo, kind = queue, virtual_host = VirtualHost}, User, #{}) of ok -> - case lookup_leader(Stream, Connection) of + case lookup_leader(StreamToPublishTo, Connection) of cluster_not_found -> FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount, Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, Messages), @@ -702,7 +790,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi MessageCount:32, Details/binary>>]), {Connection, State, Rest}; {ClusterLeader, Connection1} -> - rabbit_stream_utils:write_messages(ClusterLeader, PublisherId, Messages), + rabbit_stream_utils:write_messages(ClusterLeader, PublisherRef, PublisherId, Messages), sub_credits(Credits, MessageCount), {Connection1, State, Rest} end; @@ -720,6 +808,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, #stream_connection_state{consumers = Consumers} = State, <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary, OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) -> + %% FIXME check the max number of subs is not reached already case rabbit_stream_utils:check_read_permitted( #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index b5d8ce7760..1f3f2aded5 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -17,7 +17,7 @@ -module(rabbit_stream_utils). %% API --export([enforce_correct_stream_name/1, write_messages/3, parse_map/2, +-export([enforce_correct_stream_name/1, write_messages/4, parse_map/2, auth_mechanisms/1, auth_mechanism_to_module/2, check_configure_permitted/3, check_write_permitted/3, check_read_permitted/3, extract_stream_list/2]). @@ -41,17 +41,26 @@ check_name(<<"">>) -> check_name(_Name) -> ok. -write_messages(_ClusterLeader, _PublisherId, <<>>) -> +write_messages(_ClusterLeader, undefined, _PublisherId, <<>>) -> ok; -write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> +write_messages(ClusterLeader, undefined, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> % FIXME handle write error ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, Message), - write_messages(ClusterLeader, PublisherId, Rest); -write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) -> + write_messages(ClusterLeader, undefined, PublisherId, Rest); +write_messages(ClusterLeader, undefined, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) -> % FIXME handle write error ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}), - write_messages(ClusterLeader, PublisherId, Rest). - + write_messages(ClusterLeader, undefined, PublisherId, Rest); +write_messages(_ClusterLeader, _PublisherRef, _PublisherId, <<>>) -> + ok; +write_messages(ClusterLeader, PublisherRef, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> + % FIXME handle write error + ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message), + write_messages(ClusterLeader, PublisherRef, PublisherId, Rest); +write_messages(ClusterLeader, PublisherRef, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) -> + % FIXME handle write error + ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, {batch, MessageCount, CompressionType, Batch}), + write_messages(ClusterLeader, PublisherRef, PublisherId, Rest). parse_map(<<>>, _Count) -> {#{}, <<>>}; |