summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-15 11:33:04 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-15 11:33:04 +0200
commit5b83dceb87de4a92b07fc8de9e84ff03033efdc1 (patch)
tree83775b879ea22cece04250c9067ae1355f7dd4c5
parent04a06535716ba3dc7fb19b20de6766fa89881242 (diff)
downloadrabbitmq-server-git-5b83dceb87de4a92b07fc8de9e84ff03033efdc1.tar.gz
Return only streams for partition-related commands
The stream partition metadata is based on bindings, so we make sure to return only streams from the binding information.
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl25
1 files changed, 19 insertions, 6 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 4e202ed5b7..67f3be2214 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -369,8 +369,10 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
[] ->
{ok, no_route};
Routes ->
- %% FIXME filter non-stream resources
- {ok, [Stream || #resource{name = Stream} <- Routes]}
+ {ok,
+ [Stream
+ || #resource{name = Stream} = R <- Routes,
+ is_resource_stream_queue(R)]}
end
catch
exit:Error ->
@@ -383,10 +385,11 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
Res = try
rabbit_exchange:lookup_or_die(ExchangeName),
- %% FIXME make sure queue is a stream
- %% TODO bindings could be sorted by partition number, by using a binding argument
- %% this would make the spreading of messages stable
- UnorderedBindings = rabbit_binding:list_for_source(ExchangeName),
+ UnorderedBindings =
+ [Binding
+ || Binding = #binding{destination = D}
+ <- rabbit_binding:list_for_source(ExchangeName),
+ is_resource_stream_queue(D)],
OrderedBindings =
rabbit_stream_utils:sort_partitions(UnorderedBindings),
{ok,
@@ -448,3 +451,13 @@ is_stream_queue(Q) ->
_ ->
false
end.
+
+is_resource_stream_queue(#resource{kind = queue} = Resource) ->
+ case rabbit_amqqueue:lookup(Resource) of
+ {ok, Q} ->
+ is_stream_queue(Q);
+ _ ->
+ false
+ end;
+is_resource_stream_queue(_) ->
+ false.