diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-15 11:33:04 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-15 11:33:04 +0200 |
commit | 5b83dceb87de4a92b07fc8de9e84ff03033efdc1 (patch) | |
tree | 83775b879ea22cece04250c9067ae1355f7dd4c5 | |
parent | 04a06535716ba3dc7fb19b20de6766fa89881242 (diff) | |
download | rabbitmq-server-git-5b83dceb87de4a92b07fc8de9e84ff03033efdc1.tar.gz |
Return only streams for partition-related commands
The stream partition metadata is based on bindings,
so we make sure to return only streams from the binding
information.
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 4e202ed5b7..67f3be2214 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -369,8 +369,10 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, [] -> {ok, no_route}; Routes -> - %% FIXME filter non-stream resources - {ok, [Stream || #resource{name = Stream} <- Routes]} + {ok, + [Stream + || #resource{name = Stream} = R <- Routes, + is_resource_stream_queue(R)]} end catch exit:Error -> @@ -383,10 +385,11 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) -> ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), Res = try rabbit_exchange:lookup_or_die(ExchangeName), - %% FIXME make sure queue is a stream - %% TODO bindings could be sorted by partition number, by using a binding argument - %% this would make the spreading of messages stable - UnorderedBindings = rabbit_binding:list_for_source(ExchangeName), + UnorderedBindings = + [Binding + || Binding = #binding{destination = D} + <- rabbit_binding:list_for_source(ExchangeName), + is_resource_stream_queue(D)], OrderedBindings = rabbit_stream_utils:sort_partitions(UnorderedBindings), {ok, @@ -448,3 +451,13 @@ is_stream_queue(Q) -> _ -> false end. + +is_resource_stream_queue(#resource{kind = queue} = Resource) -> + case rabbit_amqqueue:lookup(Resource) of + {ok, Q} -> + is_stream_queue(Q); + _ -> + false + end; +is_resource_stream_queue(_) -> + false. |