summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-13 17:53:25 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-13 17:53:25 +0200
commit8f207e3c5f729a364f8d31a85f8d6ea313922e6a (patch)
tree7622e5cd0fbb296f4398f270a6ca7594e6d4b00f
parent3d7afcdc18da5bba2d999e750e7ff867b84d1882 (diff)
downloadrabbitmq-server-git-8f207e3c5f729a364f8d31a85f8d6ea313922e6a.tar.gz
Make stream protocol route command return several streams
We expect to have 1 stream for each routing key, but as binding can return several queues for a given key we let that possibility open in the stream protocol.
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc2
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream.erl6
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl9
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl99
-rw-r--r--deps/rabbitmq_stream/test/commands_SUITE.erl4
5 files changed, 62 insertions, 58 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 80577961aa..bbe12d04ab 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -587,7 +587,7 @@ RouteQuery => Key Version CorrelationId RoutingKey SuperStream
RoutingKey => string
SuperStream => string
-RouteResponse => Key Version CorrelationId Stream
+RouteResponse => Key Version CorrelationId [Stream]
Key => uint16 // 24
Version => uint16
CorrelationId => uint32
diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl
index d3925214d6..9f22b1a11f 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream.erl
@@ -118,10 +118,8 @@ kill_connection(ConnectionName) ->
{ConnectionPid,
#{<<"connection_name">> := ConnectionNameBin}} ->
exit(ConnectionPid, kill);
- {ConnectionPid, _ClientProperties} ->
- ok
- after 1000 ->
- ok
+ {ConnectionPid, _ClientProperties} -> ok
+ after 1000 -> ok
end
end,
pg_local:get_members(rabbit_stream_connections)).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index a258833f8e..0be72d714f 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -75,7 +75,7 @@ topology(VirtualHost, Stream) ->
gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
-spec route(binary(), binary(), binary()) ->
- {ok, binary() | no_route} | {error, stream_not_found}.
+ {ok, [binary()] | no_route} | {error, stream_not_found}.
route(RoutingKey, VirtualHost, SuperStream) ->
gen_server:call(?MODULE,
{route, RoutingKey, VirtualHost, SuperStream}).
@@ -368,10 +368,9 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
case rabbit_exchange:route(Exchange, Delivery) of
[] ->
{ok, no_route};
- [#resource{name = Stream}] ->
- {ok, Stream};
- [#resource{name = Stream} | _] ->
- {ok, Stream}
+ Routes ->
+ %% FIXME filter non-stream resources
+ {ok, [Stream || #resource{name = Stream} <- Routes]}
end
catch
exit:Error ->
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 412be10650..319d442e73 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -445,7 +445,8 @@ tuned(info, Msg, StateData) ->
end).
state_timeout(State, Transport, Socket) ->
- rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
+ rabbit_log_connection:warning("Closing connection because of timeout in state "
+ "'~s' likely due to lack of client action.",
[State]),
close_immediately(Transport, Socket),
stop.
@@ -524,7 +525,8 @@ transition_to_opened(Transport,
config = Configuration}}.
invalid_transition(Transport, Socket, From, To) ->
- rabbit_log_connection:warning("Closing socket ~w. Invalid transition from ~s to ~s.",
+ rabbit_log_connection:warning("Closing socket ~w. Invalid transition from ~s "
+ "to ~s.",
[Socket, From, To]),
close_immediately(Transport, Socket),
stop.
@@ -882,8 +884,7 @@ open(cast,
Ids ->
Acc#{PublisherId => [PublishingId | Ids]}
end;
- false ->
- Acc
+ false -> Acc
end
end,
#{}, CorrelationList),
@@ -963,7 +964,8 @@ open(cast,
{queue_event, #resource{name = StreamName},
{osiris_offset, _QueueResource, -1}},
_StatemData) ->
- rabbit_log:debug("Stream protocol connection received osiris offset event for ~p with offset ~p",
+ rabbit_log:debug("Stream protocol connection received osiris offset "
+ "event for ~p with offset ~p",
[StreamName, -1]),
keep_state_and_data;
open(cast,
@@ -982,11 +984,14 @@ open(cast,
{Connection1, State1} =
case maps:get(StreamName, StreamSubscriptions, undefined) of
undefined ->
- rabbit_log:debug("Stream protocol connection: osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)",
+ rabbit_log:debug("Stream protocol connection: osiris offset event "
+ "for ~p, but no subscription (leftover messages "
+ "after unsubscribe?)",
[StreamName]),
{Connection, State};
[] ->
- rabbit_log:debug("Stream protocol connection: osiris offset event for ~p, but no registered consumers!",
+ rabbit_log:debug("Stream protocol connection: osiris offset event "
+ "for ~p, but no registered consumers!",
[StreamName]),
{Connection#stream_connection{stream_subscriptions =
maps:remove(StreamName,
@@ -999,15 +1004,15 @@ open(cast,
#consumer{credit = Credit} = Consumer,
Consumer1 =
case Credit of
- 0 ->
- Consumer;
+ 0 -> Consumer;
_ ->
case send_chunks(Transport,
Consumer,
SendFileOct)
of
{error, closed} ->
- rabbit_log_connection:info("Stream protocol connection has been closed by peer",
+ rabbit_log_connection:info("Stream protocol connection has been closed by "
+ "peer",
[]),
throw({stop, normal});
{error, Reason} ->
@@ -1058,7 +1063,8 @@ close_sent(state_timeout, close,
#statem_data{transport = Transport,
connection = #stream_connection{socket = Socket},
connection_state = State}) ->
- rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
+ rabbit_log_connection:warning("Closing connection because of timeout in state "
+ "'~s' likely due to lack of client action.",
[?FUNCTION_NAME]),
close(Transport, Socket, State),
stop;
@@ -1089,13 +1095,15 @@ close_sent(info, {tcp_closed, S}, _StatemData) ->
stop;
close_sent(info, {tcp_error, S, Reason},
#statem_data{transport = Transport, connection_state = State}) ->
- rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]",
+ rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] "
+ "[~w]",
[Reason, S, self()]),
close(Transport, S, State),
stop;
close_sent(info, {resource_alarm, IsThereAlarm},
StatemData = #statem_data{connection = Connection}) ->
- rabbit_log:warning("Stream protocol connection ignored a resource alarm ~p in state ~s",
+ rabbit_log:warning("Stream protocol connection ignored a resource "
+ "alarm ~p in state ~s",
[IsThereAlarm, ?FUNCTION_NAME]),
{keep_state,
StatemData#statem_data{connection =
@@ -1828,7 +1836,8 @@ handle_frame_post_auth(Transport,
SendFileOct)
of
{error, closed} ->
- rabbit_log_connection:info("Stream protocol connection has been closed by peer",
+ rabbit_log_connection:info("Stream protocol connection has been closed by "
+ "peer",
[]),
throw({stop, normal});
{{segment, Segment1}, {credit, Credit1}} ->
@@ -1909,7 +1918,8 @@ handle_frame_post_auth(Transport,
SendFileOct)
of
{error, closed} ->
- rabbit_log_connection:info("Stream protocol connection has been closed by peer",
+ rabbit_log_connection:info("Stream protocol connection has been closed by "
+ "peer",
[]),
throw({stop, normal});
{{segment, Segment1}, {credit, Credit1}} ->
@@ -2061,7 +2071,8 @@ handle_frame_post_auth(Transport,
{ok,
#{leader_node := LeaderPid,
replica_nodes := ReturnedReplicas}} ->
- rabbit_log:debug("Created stream cluster with leader on ~p and replicas on ~p",
+ rabbit_log:debug("Created stream cluster with leader on ~p and "
+ "replicas on ~p",
[LeaderPid, ReturnedReplicas]),
response_ok(Transport,
Connection,
@@ -2222,8 +2233,7 @@ handle_frame_post_auth(Transport,
NodesAcc)
end,
Acc1, ReplicaNodes);
- {error, _} ->
- Acc
+ {error, _} -> Acc
end
end,
#{}, Streams),
@@ -2235,16 +2245,13 @@ handle_frame_post_auth(Transport,
lists:foldr(fun(Node, Acc) ->
PortFunction =
case TransportLayer of
- tcp ->
- port;
- ssl ->
- tls_port
+ tcp -> port;
+ ssl -> tls_port
end,
Host = rpc:call(Node, rabbit_stream, host, []),
Port = rpc:call(Node, rabbit_stream, PortFunction, []),
case {is_binary(Host), is_integer(Port)} of
- {true, true} ->
- Acc#{Node => {Host, Port}};
+ {true, true} -> Acc#{Node => {Host, Port}};
_ ->
rabbit_log:warning("Error when retrieving broker metadata: ~p ~p",
[Host, Port]),
@@ -2256,25 +2263,21 @@ handle_frame_post_auth(Transport,
Metadata =
lists:foldl(fun(Stream, Acc) ->
case maps:get(Stream, Topology) of
- {error, Err} ->
- Acc#{Stream => Err};
+ {error, Err} -> Acc#{Stream => Err};
{ok,
#{leader_node := LeaderNode,
replica_nodes := Replicas}} ->
LeaderInfo =
case NodeEndpoints of
- #{LeaderNode := Info} ->
- Info;
- _ ->
- undefined
+ #{LeaderNode := Info} -> Info;
+ _ -> undefined
end,
ReplicaInfos =
lists:foldr(fun(Replica, A) ->
case NodeEndpoints of
#{Replica := I} ->
[I | A];
- _ ->
- A
+ _ -> A
end
end,
[], Replicas),
@@ -2301,16 +2304,21 @@ handle_frame_post_auth(Transport,
case rabbit_stream_manager:route(RoutingKey, VirtualHost, SuperStream)
of
{ok, no_route} ->
- {?RESPONSE_CODE_OK, <<(-1):16>>};
- {ok, Stream} ->
- StreamSize = byte_size(Stream),
- {?RESPONSE_CODE_OK,
- <<StreamSize:16, Stream:StreamSize/binary>>};
+ {?RESPONSE_CODE_OK, <<0:32>>};
+ {ok, Streams} ->
+ StreamCount = length(Streams),
+ Bin = lists:foldl(fun(Stream, Acc) ->
+ StreamSize = byte_size(Stream),
+ <<Acc/binary, StreamSize:16,
+ Stream:StreamSize/binary>>
+ end,
+ <<StreamCount:32>>, Streams),
+ {?RESPONSE_CODE_OK, Bin};
{error, _} ->
rabbit_global_counters:increase_protocol_counter(stream,
?STREAM_DOES_NOT_EXIST,
1),
- {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<(-1):16>>}
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, <<0:32>>}
end,
Frame =
@@ -2362,7 +2370,8 @@ handle_frame_post_auth(Transport,
State,
{request, CorrelationId,
{close, ClosingCode, ClosingReason}}) ->
- rabbit_log:debug("Stream protocol reader received close command ~p ~p",
+ rabbit_log:debug("Stream protocol reader received close command "
+ "~p ~p",
[ClosingCode, ClosingReason]),
Frame =
rabbit_stream_core:frame({response, CorrelationId,
@@ -2485,8 +2494,7 @@ clean_state_after_stream_deletion_or_failure(Stream,
PubId),
{maps:remove(PubId, Pubs),
maps:remove({Stream, Ref}, PubToIds)};
- _ ->
- {Pubs, PubToIds}
+ _ -> {Pubs, PubToIds}
end
end,
{Publishers, PublisherToIds}, Publishers),
@@ -2603,8 +2611,7 @@ demonitor_stream(Stream,
Stream ->
demonitor(MonitorRef, [flush]),
Acc;
- _ ->
- maps:put(MonitorRef, Strm, Acc)
+ _ -> maps:put(MonitorRef, Strm, Acc)
end
end,
#{}, Monitors0),
@@ -2625,10 +2632,8 @@ stream_has_publishers(Stream,
#stream_connection{publishers = Publishers}) ->
lists:any(fun(#publisher{stream = S}) ->
case S of
- Stream ->
- true;
- _ ->
- false
+ Stream -> true;
+ _ -> false
end
end,
maps:values(Publishers)).
diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl
index 8fd3eef939..c2652b4ad0 100644
--- a/deps/rabbitmq_stream/test/commands_SUITE.erl
+++ b/deps/rabbitmq_stream/test/commands_SUITE.erl
@@ -40,7 +40,9 @@ groups() ->
init_per_suite(Config) ->
case rabbit_ct_helpers:is_mixed_versions() of
true ->
- {skip, "mixed version clusters are not supported for this suite"};
+ {skip,
+ "mixed version clusters are not supported for "
+ "this suite"};
_ ->
Config1 =
rabbit_ct_helpers:set_config(Config,