summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-02-25 17:23:22 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-02-25 17:23:22 +0100
commit6fd081b719da08b5270809a7c33c359b1360f50c (patch)
tree22dd7227e1cd125cdc099592ab66774c4c9307bc
parenta2f98f25e99a33824510a4bbf7809a47764d44bb (diff)
parentd70b55aa0ce51cfae0664cc8d22ae207d3b99cab (diff)
downloadrabbitmq-server-git-6fd081b719da08b5270809a7c33c359b1360f50c.tar.gz
Merge branch 'super-streams-publishing'
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc43
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl2
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl60
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl92
4 files changed, 190 insertions, 7 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 2844c8618b..4ed907ed33 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -195,6 +195,16 @@ Some responses may carry additional information than just the response code, thi
|Client & Server
|22
|No
+
+|<<route>> (experimental)
+|Client
+|23
+|Yes
+
+|<<partitions>> (experimental)
+|Client
+|24
+|Yes
|===
=== DeclarePublisher
@@ -549,6 +559,39 @@ Heartbeat => Key Version
Version => int16
```
+=== Route (experimental)
+
+```
+RouteQuery => Key Version CorrelationId RoutingKey SuperStream
+ Key => int16 // 23
+ Version => int16
+ CorrelationId => int32
+ RoutingKey => string
+ SuperStream => string
+
+RouteResponse => Key Version CorrelationId Stream
+ Key => int16 // 14
+ Version => int16
+ CorrelationId => int32
+ Stream => string
+```
+
+=== Partitions (experimental)
+
+```
+PartitionsQuery => Key Version CorrelationId SuperStream
+ Key => int16 // 24
+ Version => int16
+ CorrelationId => int32
+ SuperStream => string
+
+PartitionsResponse => Key Version CorrelationId [Stream]
+ Key => int16 // 23
+ Version => int16
+ CorrelationId => int32
+ Stream => string
+```
+
== Authentication
Once a client is connected to the server, it initiates an authentication
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index 4ac5c3ce98..6be20bb95f 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -21,6 +21,8 @@
-define(COMMAND_OPEN, 20).
-define(COMMAND_CLOSE, 21).
-define(COMMAND_HEARTBEAT, 22).
+-define(COMMAND_ROUTE, 23).
+-define(COMMAND_PARTITIONS, 24).
-define(VERSION_0, 0).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index fae3280390..3e7d97abb8 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -30,7 +30,9 @@
delete/3,
lookup_leader/2,
lookup_local_member/2,
- topology/2]).
+ topology/2,
+ route/3,
+ partitions/2]).
-record(state, {configuration}).
@@ -72,6 +74,17 @@ lookup_local_member(VirtualHost, Stream) ->
topology(VirtualHost, Stream) ->
gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
+-spec route(binary(), binary(), binary()) ->
+ {ok, binary()} | {error, stream_not_found}.
+route(RoutingKey, VirtualHost, SuperStream) ->
+ gen_server:call(?MODULE,
+ {route, RoutingKey, VirtualHost, SuperStream}).
+
+-spec partitions(binary(), binary()) ->
+ {ok, [binary()] | {error, stream_not_found}}.
+partitions(VirtualHost, SuperStream) ->
+ gen_server:call(?MODULE, {partitions, VirtualHost, SuperStream}).
+
stream_queue_arguments(Arguments) ->
stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}],
Arguments).
@@ -330,6 +343,51 @@ handle_call({topology, VirtualHost, Stream}, _From, State) ->
end
end,
{reply, Res, State};
+handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
+ State) ->
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
+ Res = try
+ Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+ Delivery =
+ #delivery{message =
+ #basic_message{routing_keys = [RoutingKey]}},
+ case rabbit_exchange:route(Exchange, Delivery) of
+ [] ->
+ {ok, no_route};
+ [#resource{name = Stream}] ->
+ {ok, Stream};
+ [#resource{name = Stream} | _] ->
+ {ok, Stream}
+ end
+ catch
+ exit:Error ->
+ rabbit_log:info("Error while looking up exchange ~p, ~p~n",
+ [ExchangeName, Error]),
+ {error, stream_not_found}
+ end,
+ {reply, Res, State};
+handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
+ ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
+ Res = try
+ rabbit_exchange:lookup_or_die(ExchangeName),
+ %% FIXME make sure queue is a stream
+ %% TODO bindings could be sorted by partition number, by using a binding argument
+ {ok,
+ lists:foldl(fun (#binding{destination =
+ #resource{kind = queue, name = Q}},
+ Acc) ->
+ Acc ++ [Q];
+ (_Binding, Acc) ->
+ Acc
+ end,
+ [], rabbit_binding:list_for_source(ExchangeName))}
+ catch
+ exit:Error ->
+ rabbit_log:info("Error while looking up exchange ~p, ~p~n",
+ [ExchangeName, Error]),
+ {error, stream_not_found}
+ end,
+ {reply, Res, State};
handle_call(which_children, _From, State) ->
{reply, [], State}.
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 10b07c7946..df5ccd0736 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -1483,10 +1483,15 @@ handle_frame_post_auth(Transport,
OffsetAndCredit,
{{timestamp, Timestamp}, Crdt}
end,
- rabbit_log:info("Creating subscription ~p to ~p, with offset specification ~p~n", [SubscriptionId, Stream, OffsetSpec]),
+ rabbit_log:info("Creating subscription ~p to ~p, with offset specificat"
+ "ion ~p~n",
+ [SubscriptionId, Stream,
+ OffsetSpec]),
{ok, Segment} =
osiris:init_reader(LocalMemberPid, OffsetSpec),
- rabbit_log:info("Next offset for subscription ~p is ~p~n", [SubscriptionId, osiris_log:next_offset(Segment)]),
+ rabbit_log:info("Next offset for subscription ~p is ~p~n",
+ [SubscriptionId,
+ osiris_log:next_offset(Segment)]),
ConsumerCounters =
atomics:new(2, [{signed, false}]),
ConsumerState =
@@ -1508,7 +1513,9 @@ handle_frame_post_auth(Transport,
?COMMAND_SUBSCRIBE,
CorrelationId),
- rabbit_log:info("Distributing existing messages to subscription ~p~n", [SubscriptionId]),
+ rabbit_log:info("Distributing existing messages to subscription "
+ "~p~n",
+ [SubscriptionId]),
{{segment, Segment1}, {credit, Credit1}} =
send_chunks(Transport, ConsumerState,
SendFileOct),
@@ -1534,9 +1541,10 @@ handle_frame_post_auth(Transport,
ConsumerOffset = osiris_log:next_offset(Segment1),
- rabbit_log:info("Subscription ~p is now at offset ~p with ~p message(s) distributed after subscription~n",[
- SubscriptionId, ConsumerOffset, messages_consumed(ConsumerCounters1)
- ]),
+ rabbit_log:info("Subscription ~p is now at offset ~p with ~p message(s) "
+ "distributed after subscription~n",
+ [SubscriptionId, ConsumerOffset,
+ messages_consumed(ConsumerCounters1)]),
rabbit_stream_metrics:consumer_created(self(),
stream_r(Stream,
@@ -2019,6 +2027,78 @@ handle_frame_post_auth(Transport,
Transport:send(S, <<FrameSize:32, Frame/binary>>),
{Connection, State, Rest};
handle_frame_post_auth(Transport,
+ #stream_connection{socket = S,
+ virtual_host = VirtualHost} =
+ Connection,
+ State,
+ <<?COMMAND_ROUTE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ RoutingKeySize:16,
+ RoutingKey:RoutingKeySize/binary,
+ SuperStreamSize:16,
+ SuperStream:SuperStreamSize/binary>>,
+ Rest) ->
+ {ResponseCode, StreamBin} =
+ case rabbit_stream_manager:route(RoutingKey, VirtualHost, SuperStream)
+ of
+ {ok, no_route} ->
+ {?RESPONSE_CODE_OK, <<(-1):16>>};
+ {ok, Stream} ->
+ StreamSize = byte_size(Stream),
+ {?RESPONSE_CODE_OK,
+ <<StreamSize:16, Stream:StreamSize/binary>>};
+ {error, _} ->
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<(-1):16>>}
+ end,
+
+ Frame =
+ <<?COMMAND_ROUTE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ResponseCode:16,
+ StreamBin/binary>>,
+ FrameSize = byte_size(Frame),
+ Transport:send(S, <<FrameSize:32, Frame/binary>>),
+ {Connection, State, Rest};
+handle_frame_post_auth(Transport,
+ #stream_connection{socket = S,
+ virtual_host = VirtualHost} =
+ Connection,
+ State,
+ <<?COMMAND_PARTITIONS:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ SuperStreamSize:16,
+ SuperStream:SuperStreamSize/binary>>,
+ Rest) ->
+ {ResponseCode, PartitionsBin} =
+ case rabbit_stream_manager:partitions(VirtualHost, SuperStream) of
+ {ok, []} ->
+ {?RESPONSE_CODE_OK, <<0:32>>};
+ {ok, Streams} ->
+ StreamCount = length(Streams),
+ Bin = lists:foldl(fun(Stream, Acc) ->
+ StreamSize = byte_size(Stream),
+ <<Acc/binary, StreamSize:16,
+ Stream:StreamSize/binary>>
+ end,
+ <<StreamCount:32>>, Streams),
+ {?RESPONSE_CODE_OK, Bin};
+ {error, _} ->
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<0:32>>}
+ end,
+
+ Frame =
+ <<?COMMAND_PARTITIONS:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ResponseCode:16,
+ PartitionsBin/binary>>,
+ FrameSize = byte_size(Frame),
+ Transport:send(S, <<FrameSize:32, Frame/binary>>),
+ {Connection, State, Rest};
+handle_frame_post_auth(Transport,
Connection,
State,
<<?COMMAND_CLOSE:16,