diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-13 17:53:25 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-13 17:53:25 +0200 |
commit | 8f207e3c5f729a364f8d31a85f8d6ea313922e6a (patch) | |
tree | 7622e5cd0fbb296f4398f270a6ca7594e6d4b00f | |
parent | 3d7afcdc18da5bba2d999e750e7ff867b84d1882 (diff) | |
download | rabbitmq-server-git-8f207e3c5f729a364f8d31a85f8d6ea313922e6a.tar.gz |
Make stream protocol route command return several streams
We expect to have 1 stream for each routing key, but
as binding can return several queues for a given key we
let that possibility open in the stream protocol.
-rw-r--r-- | deps/rabbitmq_stream/docs/PROTOCOL.adoc | 2 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream.erl | 6 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 9 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 99 | ||||
-rw-r--r-- | deps/rabbitmq_stream/test/commands_SUITE.erl | 4 |
5 files changed, 62 insertions, 58 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 80577961aa..bbe12d04ab 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -587,7 +587,7 @@ RouteQuery => Key Version CorrelationId RoutingKey SuperStream RoutingKey => string SuperStream => string -RouteResponse => Key Version CorrelationId Stream +RouteResponse => Key Version CorrelationId [Stream] Key => uint16 // 24 Version => uint16 CorrelationId => uint32 diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index d3925214d6..9f22b1a11f 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -118,10 +118,8 @@ kill_connection(ConnectionName) -> {ConnectionPid, #{<<"connection_name">> := ConnectionNameBin}} -> exit(ConnectionPid, kill); - {ConnectionPid, _ClientProperties} -> - ok - after 1000 -> - ok + {ConnectionPid, _ClientProperties} -> ok + after 1000 -> ok end end, pg_local:get_members(rabbit_stream_connections)). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index a258833f8e..0be72d714f 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -75,7 +75,7 @@ topology(VirtualHost, Stream) -> gen_server:call(?MODULE, {topology, VirtualHost, Stream}). -spec route(binary(), binary(), binary()) -> - {ok, binary() | no_route} | {error, stream_not_found}. + {ok, [binary()] | no_route} | {error, stream_not_found}. route(RoutingKey, VirtualHost, SuperStream) -> gen_server:call(?MODULE, {route, RoutingKey, VirtualHost, SuperStream}). @@ -368,10 +368,9 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, case rabbit_exchange:route(Exchange, Delivery) of [] -> {ok, no_route}; - [#resource{name = Stream}] -> - {ok, Stream}; - [#resource{name = Stream} | _] -> - {ok, Stream} + Routes -> + %% FIXME filter non-stream resources + {ok, [Stream || #resource{name = Stream} <- Routes]} end catch exit:Error -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 412be10650..319d442e73 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -445,7 +445,8 @@ tuned(info, Msg, StateData) -> end). state_timeout(State, Transport, Socket) -> - rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.", + rabbit_log_connection:warning("Closing connection because of timeout in state " + "'~s' likely due to lack of client action.", [State]), close_immediately(Transport, Socket), stop. @@ -524,7 +525,8 @@ transition_to_opened(Transport, config = Configuration}}. invalid_transition(Transport, Socket, From, To) -> - rabbit_log_connection:warning("Closing socket ~w. Invalid transition from ~s to ~s.", + rabbit_log_connection:warning("Closing socket ~w. Invalid transition from ~s " + "to ~s.", [Socket, From, To]), close_immediately(Transport, Socket), stop. @@ -882,8 +884,7 @@ open(cast, Ids -> Acc#{PublisherId => [PublishingId | Ids]} end; - false -> - Acc + false -> Acc end end, #{}, CorrelationList), @@ -963,7 +964,8 @@ open(cast, {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, -1}}, _StatemData) -> - rabbit_log:debug("Stream protocol connection received osiris offset event for ~p with offset ~p", + rabbit_log:debug("Stream protocol connection received osiris offset " + "event for ~p with offset ~p", [StreamName, -1]), keep_state_and_data; open(cast, @@ -982,11 +984,14 @@ open(cast, {Connection1, State1} = case maps:get(StreamName, StreamSubscriptions, undefined) of undefined -> - rabbit_log:debug("Stream protocol connection: osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)", + rabbit_log:debug("Stream protocol connection: osiris offset event " + "for ~p, but no subscription (leftover messages " + "after unsubscribe?)", [StreamName]), {Connection, State}; [] -> - rabbit_log:debug("Stream protocol connection: osiris offset event for ~p, but no registered consumers!", + rabbit_log:debug("Stream protocol connection: osiris offset event " + "for ~p, but no registered consumers!", [StreamName]), {Connection#stream_connection{stream_subscriptions = maps:remove(StreamName, @@ -999,15 +1004,15 @@ open(cast, #consumer{credit = Credit} = Consumer, Consumer1 = case Credit of - 0 -> - Consumer; + 0 -> Consumer; _ -> case send_chunks(Transport, Consumer, SendFileOct) of {error, closed} -> - rabbit_log_connection:info("Stream protocol connection has been closed by peer", + rabbit_log_connection:info("Stream protocol connection has been closed by " + "peer", []), throw({stop, normal}); {error, Reason} -> @@ -1058,7 +1063,8 @@ close_sent(state_timeout, close, #statem_data{transport = Transport, connection = #stream_connection{socket = Socket}, connection_state = State}) -> - rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.", + rabbit_log_connection:warning("Closing connection because of timeout in state " + "'~s' likely due to lack of client action.", [?FUNCTION_NAME]), close(Transport, Socket, State), stop; @@ -1089,13 +1095,15 @@ close_sent(info, {tcp_closed, S}, _StatemData) -> stop; close_sent(info, {tcp_error, S, Reason}, #statem_data{transport = Transport, connection_state = State}) -> - rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]", + rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] " + "[~w]", [Reason, S, self()]), close(Transport, S, State), stop; close_sent(info, {resource_alarm, IsThereAlarm}, StatemData = #statem_data{connection = Connection}) -> - rabbit_log:warning("Stream protocol connection ignored a resource alarm ~p in state ~s", + rabbit_log:warning("Stream protocol connection ignored a resource " + "alarm ~p in state ~s", [IsThereAlarm, ?FUNCTION_NAME]), {keep_state, StatemData#statem_data{connection = @@ -1828,7 +1836,8 @@ handle_frame_post_auth(Transport, SendFileOct) of {error, closed} -> - rabbit_log_connection:info("Stream protocol connection has been closed by peer", + rabbit_log_connection:info("Stream protocol connection has been closed by " + "peer", []), throw({stop, normal}); {{segment, Segment1}, {credit, Credit1}} -> @@ -1909,7 +1918,8 @@ handle_frame_post_auth(Transport, SendFileOct) of {error, closed} -> - rabbit_log_connection:info("Stream protocol connection has been closed by peer", + rabbit_log_connection:info("Stream protocol connection has been closed by " + "peer", []), throw({stop, normal}); {{segment, Segment1}, {credit, Credit1}} -> @@ -2061,7 +2071,8 @@ handle_frame_post_auth(Transport, {ok, #{leader_node := LeaderPid, replica_nodes := ReturnedReplicas}} -> - rabbit_log:debug("Created stream cluster with leader on ~p and replicas on ~p", + rabbit_log:debug("Created stream cluster with leader on ~p and " + "replicas on ~p", [LeaderPid, ReturnedReplicas]), response_ok(Transport, Connection, @@ -2222,8 +2233,7 @@ handle_frame_post_auth(Transport, NodesAcc) end, Acc1, ReplicaNodes); - {error, _} -> - Acc + {error, _} -> Acc end end, #{}, Streams), @@ -2235,16 +2245,13 @@ handle_frame_post_auth(Transport, lists:foldr(fun(Node, Acc) -> PortFunction = case TransportLayer of - tcp -> - port; - ssl -> - tls_port + tcp -> port; + ssl -> tls_port end, Host = rpc:call(Node, rabbit_stream, host, []), Port = rpc:call(Node, rabbit_stream, PortFunction, []), case {is_binary(Host), is_integer(Port)} of - {true, true} -> - Acc#{Node => {Host, Port}}; + {true, true} -> Acc#{Node => {Host, Port}}; _ -> rabbit_log:warning("Error when retrieving broker metadata: ~p ~p", [Host, Port]), @@ -2256,25 +2263,21 @@ handle_frame_post_auth(Transport, Metadata = lists:foldl(fun(Stream, Acc) -> case maps:get(Stream, Topology) of - {error, Err} -> - Acc#{Stream => Err}; + {error, Err} -> Acc#{Stream => Err}; {ok, #{leader_node := LeaderNode, replica_nodes := Replicas}} -> LeaderInfo = case NodeEndpoints of - #{LeaderNode := Info} -> - Info; - _ -> - undefined + #{LeaderNode := Info} -> Info; + _ -> undefined end, ReplicaInfos = lists:foldr(fun(Replica, A) -> case NodeEndpoints of #{Replica := I} -> [I | A]; - _ -> - A + _ -> A end end, [], Replicas), @@ -2301,16 +2304,21 @@ handle_frame_post_auth(Transport, case rabbit_stream_manager:route(RoutingKey, VirtualHost, SuperStream) of {ok, no_route} -> - {?RESPONSE_CODE_OK, <<(-1):16>>}; - {ok, Stream} -> - StreamSize = byte_size(Stream), - {?RESPONSE_CODE_OK, - <<StreamSize:16, Stream:StreamSize/binary>>}; + {?RESPONSE_CODE_OK, <<0:32>>}; + {ok, Streams} -> + StreamCount = length(Streams), + Bin = lists:foldl(fun(Stream, Acc) -> + StreamSize = byte_size(Stream), + <<Acc/binary, StreamSize:16, + Stream:StreamSize/binary>> + end, + <<StreamCount:32>>, Streams), + {?RESPONSE_CODE_OK, Bin}; {error, _} -> rabbit_global_counters:increase_protocol_counter(stream, ?STREAM_DOES_NOT_EXIST, 1), - {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<(-1):16>>} + {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<0:32>>} end, Frame = @@ -2362,7 +2370,8 @@ handle_frame_post_auth(Transport, State, {request, CorrelationId, {close, ClosingCode, ClosingReason}}) -> - rabbit_log:debug("Stream protocol reader received close command ~p ~p", + rabbit_log:debug("Stream protocol reader received close command " + "~p ~p", [ClosingCode, ClosingReason]), Frame = rabbit_stream_core:frame({response, CorrelationId, @@ -2485,8 +2494,7 @@ clean_state_after_stream_deletion_or_failure(Stream, PubId), {maps:remove(PubId, Pubs), maps:remove({Stream, Ref}, PubToIds)}; - _ -> - {Pubs, PubToIds} + _ -> {Pubs, PubToIds} end end, {Publishers, PublisherToIds}, Publishers), @@ -2603,8 +2611,7 @@ demonitor_stream(Stream, Stream -> demonitor(MonitorRef, [flush]), Acc; - _ -> - maps:put(MonitorRef, Strm, Acc) + _ -> maps:put(MonitorRef, Strm, Acc) end end, #{}, Monitors0), @@ -2625,10 +2632,8 @@ stream_has_publishers(Stream, #stream_connection{publishers = Publishers}) -> lists:any(fun(#publisher{stream = S}) -> case S of - Stream -> - true; - _ -> - false + Stream -> true; + _ -> false end end, maps:values(Publishers)). diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl index 8fd3eef939..c2652b4ad0 100644 --- a/deps/rabbitmq_stream/test/commands_SUITE.erl +++ b/deps/rabbitmq_stream/test/commands_SUITE.erl @@ -40,7 +40,9 @@ groups() -> init_per_suite(Config) -> case rabbit_ct_helpers:is_mixed_versions() of true -> - {skip, "mixed version clusters are not supported for this suite"}; + {skip, + "mixed version clusters are not supported for " + "this suite"}; _ -> Config1 = rabbit_ct_helpers:set_config(Config, |