summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-24 17:48:41 +0100
committerMichael Klishin <michael@clojurewerkz.org>2020-11-30 13:29:17 +0300
commite79fd45414df02c6eef2faee6cd44ca7c9212c20 (patch)
tree9dfd432942874ee0916b1c933aa2a92855473ba7
parent87eea4acad1231ef3ce434672a8ce59a9dfba474 (diff)
downloadrabbitmq-server-git-e79fd45414df02c6eef2faee6cd44ca7c9212c20.tar.gz
Start adding publishing dedup support for streams
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl4
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc10
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl3
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl99
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_utils.erl23
5 files changed, 120 insertions, 19 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index d130c7225b..8841e52ce3 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -286,7 +286,7 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
correlation = Correlation0,
soft_limit = SftLmt,
slow = Slow0} = State) ->
- ok = osiris:write(LeaderPid, Seq, msg_to_iodata(Msg)),
+ ok = osiris:write(LeaderPid, undefined, Seq, msg_to_iodata(Msg)),
Correlation = case MsgId of
undefined ->
Correlation0;
@@ -308,7 +308,7 @@ dequeue(_, _, _, #stream_client{name = Name}) ->
{protocol_error, not_implemented, "basic.get not supported by stream queues ~s",
[rabbit_misc:rs(Name)]}.
-handle_event({osiris_written, From, Corrs}, State = #stream_client{correlation = Correlation0,
+handle_event({osiris_written, From, _WriterId, Corrs}, State = #stream_client{correlation = Correlation0,
soft_limit = SftLmt,
slow = Slow0,
name = Name}) ->
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 6cb2019ec7..709763b57e 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -442,32 +442,32 @@ QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
=== DeclarePublisher
```
-DeclarePublisherRequest => Key Version CorrelationId [Reference] Stream
+DeclarePublisherRequest => Key Version CorrelationId PublisherId [Reference] Stream
Key => int16 // 18
Version => int16
CorrelationId => int32
+ PublisherId => uint8
Reference => string // max 256 characters
Stream => string
DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId
- Key => int16 // 17
+ Key => int16 // 18
Version => int16
CorrelationId => int32
ResponseCode => int16
- PublisherId => uint8
```
=== DeletePublisher
```
DeletePublisherRequest => Key Version CorrelationId PublisherId
- Key => int16 // 18
+ Key => int16 // 19
Version => int16
CorrelationId => int32
PublisherId => uint8
DeletePublisherResponse => Key Version CorrelationId ResponseCode
- Key => int16 // 17
+ Key => int16 // 19
Version => int16
CorrelationId => int32
ResponseCode => int16
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index 0593893d93..4ba630a0fe 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -16,6 +16,8 @@
-define(COMMAND_PEER_PROPERTIES, 15).
-define(COMMAND_COMMIT_OFFSET, 16).
-define(COMMAND_QUERY_OFFSET, 17).
+-define(COMMAND_DECLARE_PUBLISHER, 18).
+-define(COMMAND_DELETE_PUBLISHER, 19).
-define(COMMAND_CREATE_STREAM, 998).
-define(COMMAND_DELETE_STREAM, 999).
@@ -38,6 +40,7 @@
-define(RESPONSE_CODE_INTERNAL_ERROR, 14).
-define(RESPONSE_CODE_ACCESS_REFUSED, 15).
-define(RESPONSE_CODE_PRECONDITION_FAILED, 16).
+-define(RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, 17).
-define(OFFSET_TYPE_FIRST, 0).
-define(OFFSET_TYPE_LAST, 1).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index d3b4820256..338a03f92d 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -19,6 +19,12 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_stream.hrl").
+-record(publisher, {
+ publisher_id :: integer(),
+ stream :: binary(),
+ reference :: 'undefined' | binary()
+}).
+
-record(consumer, {
socket :: rabbit_net:socket(), %% ranch_transport:socket(),
member_pid :: pid(),
@@ -49,6 +55,8 @@
connected_at :: integer(),
helper_sup :: pid(),
socket :: rabbit_net:socket(),
+ publishers :: #{integer() => #publisher{}}, %% FIXME replace with a list (0-255 lookup faster?)
+ publisher_reference_to_ids :: #{binary() => integer()},
stream_leaders :: #{binary() => pid()},
stream_subscriptions :: #{binary() => [integer()]},
credits :: atomics:atomics_ref(),
@@ -123,6 +131,8 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
auth_mechanism = none,
helper_sup = KeepaliveSup,
socket = RealSocket,
+ publishers = #{},
+ publisher_reference_to_ids = #{},
stream_leaders = #{},
stream_subscriptions = #{},
credits = Credits,
@@ -232,6 +242,7 @@ close(Transport, S) ->
listen_loop_post_auth(Transport, #stream_connection{socket = S,
stream_subscriptions = StreamSubscriptions, credits = Credits,
heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties,
+ publisher_reference_to_ids = PublisherRefToIds,
send_file_oct = SendFileOct} = Connection0,
#stream_connection_state{consumers = Consumers, blocked = Blocked} = State,
#configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) ->
@@ -292,7 +303,7 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
{Connection, State}
end,
listen_loop_post_auth(Transport, Connection1, State1, Configuration);
- {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationList}}} ->
+ {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, undefined, CorrelationList}}} ->
{FirstPublisherId, _FirstPublishingId} = lists:nth(1, CorrelationList),
{LastPublisherId, LastPublishingIds, LastCount} = lists:foldl(fun({PublisherId, PublishingId}, {CurrentPublisherId, PublishingIds, Count}) ->
case PublisherId of
@@ -328,6 +339,32 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
State
end,
listen_loop_post_auth(Transport, Connection, State1, Configuration);
+ {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, PublisherReference, CorrelationList}}} ->
+ %% FIXME handle case when publisher ID is not found (e.g. deleted before confirms arrive)
+ PublisherId = maps:get(PublisherReference, PublisherRefToIds, undefined),
+ PubIds = lists:foldl(fun(PublishingId, PublishingIds) ->
+ [PublishingIds, <<PublishingId:64>>]
+ end, <<>>, CorrelationList),
+ PublishingIdCount = length(CorrelationList),
+ FrameSize = 2 + 2 + 1 + 4 + PublishingIdCount * 8,
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>,
+ <<PublisherId:8>>,
+ <<PublishingIdCount:32>>, PubIds]),
+ add_credits(Credits, PublishingIdCount),
+ State1 = case Blocked of
+ true ->
+ case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of
+ true ->
+ Transport:setopts(S, [{active, once}]),
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ State#stream_connection_state{blocked = false};
+ false ->
+ State
+ end;
+ false ->
+ State
+ end,
+ listen_loop_post_auth(Transport, Connection, State1, Configuration);
{'$gen_cast', {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, -1}}} ->
rabbit_log:info("received osiris offset event for ~p with offset ~p~n", [StreamName, -1]),
listen_loop_post_auth(Transport, Connection, State, Configuration);
@@ -682,18 +719,69 @@ notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState
ExtraProps,
rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
+handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost,
+ user = User} = Connection0, State,
+ <<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32,
+ PublisherId:8,
+ ReferenceSize:16, Reference:ReferenceSize/binary,
+ StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
+ %% FIXME check if the publisher ID and reference do not already exist
+ case rabbit_stream_utils:check_write_permitted(
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ case lookup_leader(Stream, Connection0) of
+ cluster_not_found ->
+ response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
+ {Connection0, State, Rest};
+ {_, #stream_connection{publishers = Publishers, publisher_reference_to_ids = RefIds0} = Connection1} ->
+ {PublisherReference, RefIds1} = case Reference of
+ <<"">> -> {undefined, RefIds0};
+ _ -> {Reference, RefIds0#{Reference => PublisherId}}
+ end,
+ Publisher = #publisher{publisher_id = PublisherId,
+ stream = Stream,
+ reference = PublisherReference},
+ response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK),
+ {Connection1#stream_connection{publishers = Publishers#{PublisherId => Publisher},
+ publisher_reference_to_ids = RefIds1}, State, Rest}
+ end;
+ error ->
+ response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED),
+ {Connection0, State, Rest}
+ end;
+handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers} = Connection0, State,
+ <<?COMMAND_DELETE_PUBLISHER:16, ?VERSION_0:16, CorrelationId:32,
+ PublisherId:8>>, Rest) ->
+ case Publishers of
+ #{PublisherId := _Publisher} ->
+ Connection1 = Connection0#stream_connection{publishers = maps:remove(PublisherId, Publishers)},
+ response(Transport, Connection1, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK),
+ {Connection1, State, Rest};
+ _ ->
+ response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST)
+ end;
handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits,
- virtual_host = VirtualHost, user = User} = Connection, State,
+ virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State,
<<?COMMAND_PUBLISH:16, ?VERSION_0:16,
StreamSize:16, Stream:StreamSize/binary,
PublisherId:8/unsigned,
MessageCount:32, Messages/binary>>, Rest) ->
+ %% FIXME fail messages if publisher is unknown
+ {StreamToPublishTo, PublisherRef} = case Publishers of
+ #{PublisherId := Publisher} ->
+ #publisher{stream = St, reference = R} = Publisher,
+ {St, R};
+ _ ->
+ {Stream, undefined}
+ end,
case rabbit_stream_utils:check_write_permitted(
- #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ #resource{name = StreamToPublishTo, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
ok ->
- case lookup_leader(Stream, Connection) of
+ case lookup_leader(StreamToPublishTo, Connection) of
cluster_not_found ->
FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, Messages),
@@ -702,7 +790,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi
MessageCount:32, Details/binary>>]),
{Connection, State, Rest};
{ClusterLeader, Connection1} ->
- rabbit_stream_utils:write_messages(ClusterLeader, PublisherId, Messages),
+ rabbit_stream_utils:write_messages(ClusterLeader, PublisherRef, PublisherId, Messages),
sub_credits(Credits, MessageCount),
{Connection1, State, Rest}
end;
@@ -720,6 +808,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
#stream_connection_state{consumers = Consumers} = State,
<<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary,
OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) ->
+ %% FIXME check the max number of subs is not reached already
case rabbit_stream_utils:check_read_permitted(
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
index b5d8ce7760..1f3f2aded5 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -17,7 +17,7 @@
-module(rabbit_stream_utils).
%% API
--export([enforce_correct_stream_name/1, write_messages/3, parse_map/2,
+-export([enforce_correct_stream_name/1, write_messages/4, parse_map/2,
auth_mechanisms/1, auth_mechanism_to_module/2,
check_configure_permitted/3, check_write_permitted/3, check_read_permitted/3,
extract_stream_list/2]).
@@ -41,17 +41,26 @@ check_name(<<"">>) ->
check_name(_Name) ->
ok.
-write_messages(_ClusterLeader, _PublisherId, <<>>) ->
+write_messages(_ClusterLeader, undefined, _PublisherId, <<>>) ->
ok;
-write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
+write_messages(ClusterLeader, undefined, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
% FIXME handle write error
ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, Message),
- write_messages(ClusterLeader, PublisherId, Rest);
-write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
+ write_messages(ClusterLeader, undefined, PublisherId, Rest);
+write_messages(ClusterLeader, undefined, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
% FIXME handle write error
ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}),
- write_messages(ClusterLeader, PublisherId, Rest).
-
+ write_messages(ClusterLeader, undefined, PublisherId, Rest);
+write_messages(_ClusterLeader, _PublisherRef, _PublisherId, <<>>) ->
+ ok;
+write_messages(ClusterLeader, PublisherRef, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
+ % FIXME handle write error
+ ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, Message),
+ write_messages(ClusterLeader, PublisherRef, PublisherId, Rest);
+write_messages(ClusterLeader, PublisherRef, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
+ % FIXME handle write error
+ ok = osiris:write(ClusterLeader, PublisherRef, PublishingId, {batch, MessageCount, CompressionType, Batch}),
+ write_messages(ClusterLeader, PublisherRef, PublisherId, Rest).
parse_map(<<>>, _Count) ->
{#{}, <<>>};