diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-02-25 17:23:22 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-02-25 17:23:22 +0100 |
commit | 6fd081b719da08b5270809a7c33c359b1360f50c (patch) | |
tree | 22dd7227e1cd125cdc099592ab66774c4c9307bc | |
parent | a2f98f25e99a33824510a4bbf7809a47764d44bb (diff) | |
parent | d70b55aa0ce51cfae0664cc8d22ae207d3b99cab (diff) | |
download | rabbitmq-server-git-6fd081b719da08b5270809a7c33c359b1360f50c.tar.gz |
Merge branch 'super-streams-publishing'
-rw-r--r-- | deps/rabbitmq_stream/docs/PROTOCOL.adoc | 43 | ||||
-rw-r--r-- | deps/rabbitmq_stream/include/rabbit_stream.hrl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 60 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 92 |
4 files changed, 190 insertions, 7 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 2844c8618b..4ed907ed33 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -195,6 +195,16 @@ Some responses may carry additional information than just the response code, thi |Client & Server |22 |No + +|<<route>> (experimental) +|Client +|23 +|Yes + +|<<partitions>> (experimental) +|Client +|24 +|Yes |=== === DeclarePublisher @@ -549,6 +559,39 @@ Heartbeat => Key Version Version => int16 ``` +=== Route (experimental) + +``` +RouteQuery => Key Version CorrelationId RoutingKey SuperStream + Key => int16 // 23 + Version => int16 + CorrelationId => int32 + RoutingKey => string + SuperStream => string + +RouteResponse => Key Version CorrelationId Stream + Key => int16 // 14 + Version => int16 + CorrelationId => int32 + Stream => string +``` + +=== Partitions (experimental) + +``` +PartitionsQuery => Key Version CorrelationId SuperStream + Key => int16 // 24 + Version => int16 + CorrelationId => int32 + SuperStream => string + +PartitionsResponse => Key Version CorrelationId [Stream] + Key => int16 // 23 + Version => int16 + CorrelationId => int32 + Stream => string +``` + == Authentication Once a client is connected to the server, it initiates an authentication diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index 4ac5c3ce98..6be20bb95f 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -21,6 +21,8 @@ -define(COMMAND_OPEN, 20). -define(COMMAND_CLOSE, 21). -define(COMMAND_HEARTBEAT, 22). +-define(COMMAND_ROUTE, 23). +-define(COMMAND_PARTITIONS, 24). -define(VERSION_0, 0). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index fae3280390..3e7d97abb8 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -30,7 +30,9 @@ delete/3, lookup_leader/2, lookup_local_member/2, - topology/2]). + topology/2, + route/3, + partitions/2]). -record(state, {configuration}). @@ -72,6 +74,17 @@ lookup_local_member(VirtualHost, Stream) -> topology(VirtualHost, Stream) -> gen_server:call(?MODULE, {topology, VirtualHost, Stream}). +-spec route(binary(), binary(), binary()) -> + {ok, binary()} | {error, stream_not_found}. +route(RoutingKey, VirtualHost, SuperStream) -> + gen_server:call(?MODULE, + {route, RoutingKey, VirtualHost, SuperStream}). + +-spec partitions(binary(), binary()) -> + {ok, [binary()] | {error, stream_not_found}}. +partitions(VirtualHost, SuperStream) -> + gen_server:call(?MODULE, {partitions, VirtualHost, SuperStream}). + stream_queue_arguments(Arguments) -> stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments). @@ -330,6 +343,51 @@ handle_call({topology, VirtualHost, Stream}, _From, State) -> end end, {reply, Res, State}; +handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, + State) -> + ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), + Res = try + Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + Delivery = + #delivery{message = + #basic_message{routing_keys = [RoutingKey]}}, + case rabbit_exchange:route(Exchange, Delivery) of + [] -> + {ok, no_route}; + [#resource{name = Stream}] -> + {ok, Stream}; + [#resource{name = Stream} | _] -> + {ok, Stream} + end + catch + exit:Error -> + rabbit_log:info("Error while looking up exchange ~p, ~p~n", + [ExchangeName, Error]), + {error, stream_not_found} + end, + {reply, Res, State}; +handle_call({partitions, VirtualHost, SuperStream}, _From, State) -> + ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), + Res = try + rabbit_exchange:lookup_or_die(ExchangeName), + %% FIXME make sure queue is a stream + %% TODO bindings could be sorted by partition number, by using a binding argument + {ok, + lists:foldl(fun (#binding{destination = + #resource{kind = queue, name = Q}}, + Acc) -> + Acc ++ [Q]; + (_Binding, Acc) -> + Acc + end, + [], rabbit_binding:list_for_source(ExchangeName))} + catch + exit:Error -> + rabbit_log:info("Error while looking up exchange ~p, ~p~n", + [ExchangeName, Error]), + {error, stream_not_found} + end, + {reply, Res, State}; handle_call(which_children, _From, State) -> {reply, [], State}. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 10b07c7946..df5ccd0736 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1483,10 +1483,15 @@ handle_frame_post_auth(Transport, OffsetAndCredit, {{timestamp, Timestamp}, Crdt} end, - rabbit_log:info("Creating subscription ~p to ~p, with offset specification ~p~n", [SubscriptionId, Stream, OffsetSpec]), + rabbit_log:info("Creating subscription ~p to ~p, with offset specificat" + "ion ~p~n", + [SubscriptionId, Stream, + OffsetSpec]), {ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec), - rabbit_log:info("Next offset for subscription ~p is ~p~n", [SubscriptionId, osiris_log:next_offset(Segment)]), + rabbit_log:info("Next offset for subscription ~p is ~p~n", + [SubscriptionId, + osiris_log:next_offset(Segment)]), ConsumerCounters = atomics:new(2, [{signed, false}]), ConsumerState = @@ -1508,7 +1513,9 @@ handle_frame_post_auth(Transport, ?COMMAND_SUBSCRIBE, CorrelationId), - rabbit_log:info("Distributing existing messages to subscription ~p~n", [SubscriptionId]), + rabbit_log:info("Distributing existing messages to subscription " + "~p~n", + [SubscriptionId]), {{segment, Segment1}, {credit, Credit1}} = send_chunks(Transport, ConsumerState, SendFileOct), @@ -1534,9 +1541,10 @@ handle_frame_post_auth(Transport, ConsumerOffset = osiris_log:next_offset(Segment1), - rabbit_log:info("Subscription ~p is now at offset ~p with ~p message(s) distributed after subscription~n",[ - SubscriptionId, ConsumerOffset, messages_consumed(ConsumerCounters1) - ]), + rabbit_log:info("Subscription ~p is now at offset ~p with ~p message(s) " + "distributed after subscription~n", + [SubscriptionId, ConsumerOffset, + messages_consumed(ConsumerCounters1)]), rabbit_stream_metrics:consumer_created(self(), stream_r(Stream, @@ -2019,6 +2027,78 @@ handle_frame_post_auth(Transport, Transport:send(S, <<FrameSize:32, Frame/binary>>), {Connection, State, Rest}; handle_frame_post_auth(Transport, + #stream_connection{socket = S, + virtual_host = VirtualHost} = + Connection, + State, + <<?COMMAND_ROUTE:16, + ?VERSION_0:16, + CorrelationId:32, + RoutingKeySize:16, + RoutingKey:RoutingKeySize/binary, + SuperStreamSize:16, + SuperStream:SuperStreamSize/binary>>, + Rest) -> + {ResponseCode, StreamBin} = + case rabbit_stream_manager:route(RoutingKey, VirtualHost, SuperStream) + of + {ok, no_route} -> + {?RESPONSE_CODE_OK, <<(-1):16>>}; + {ok, Stream} -> + StreamSize = byte_size(Stream), + {?RESPONSE_CODE_OK, + <<StreamSize:16, Stream:StreamSize/binary>>}; + {error, _} -> + {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<(-1):16>>} + end, + + Frame = + <<?COMMAND_ROUTE:16, + ?VERSION_0:16, + CorrelationId:32, + ResponseCode:16, + StreamBin/binary>>, + FrameSize = byte_size(Frame), + Transport:send(S, <<FrameSize:32, Frame/binary>>), + {Connection, State, Rest}; +handle_frame_post_auth(Transport, + #stream_connection{socket = S, + virtual_host = VirtualHost} = + Connection, + State, + <<?COMMAND_PARTITIONS:16, + ?VERSION_0:16, + CorrelationId:32, + SuperStreamSize:16, + SuperStream:SuperStreamSize/binary>>, + Rest) -> + {ResponseCode, PartitionsBin} = + case rabbit_stream_manager:partitions(VirtualHost, SuperStream) of + {ok, []} -> + {?RESPONSE_CODE_OK, <<0:32>>}; + {ok, Streams} -> + StreamCount = length(Streams), + Bin = lists:foldl(fun(Stream, Acc) -> + StreamSize = byte_size(Stream), + <<Acc/binary, StreamSize:16, + Stream:StreamSize/binary>> + end, + <<StreamCount:32>>, Streams), + {?RESPONSE_CODE_OK, Bin}; + {error, _} -> + {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<0:32>>} + end, + + Frame = + <<?COMMAND_PARTITIONS:16, + ?VERSION_0:16, + CorrelationId:32, + ResponseCode:16, + PartitionsBin/binary>>, + FrameSize = byte_size(Frame), + Transport:send(S, <<FrameSize:32, Frame/binary>>), + {Connection, State, Rest}; +handle_frame_post_auth(Transport, Connection, State, <<?COMMAND_CLOSE:16, |