diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-09-17 09:30:20 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-09-17 09:30:20 +0200 |
commit | 04e981e9e6636184fd7ff88b3623062e75c677dd (patch) | |
tree | 0bc51a4c743ed2dbc393eb2f2e4d8803e3e34788 | |
parent | cf5e99cd2908636fa16cab4b1726f060dda81b49 (diff) | |
download | rabbitmq-server-git-04e981e9e6636184fd7ff88b3623062e75c677dd.tar.gz |
Use rabbit_durable_queue to check if stream exists
This is done after checking rabbit_queue and if it returns that the
queue does not exist. The coordinator may be recovering the queue, so
thanks to this double check we know the queue exists but is not
available, instead of thinking it does not exist at all.
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 7 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 3 |
2 files changed, 10 insertions, 0 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index e250d0fe69..904d9d5ade 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -170,6 +170,13 @@ handle_call({topology, VirtualHost, Stream}, _From, State) -> _ -> {error, stream_not_found} end; + {error, not_found} -> + case rabbit_amqqueue:not_found_or_absent_dirty(Name) of + not_found -> + {error, stream_not_found}; + _ -> + {error, stream_not_available} + end; _ -> {error, stream_not_found} end, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index d41ff48a8c..7c288476f2 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -844,6 +844,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = {error, stream_not_found} -> <<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16, -1:16, 0:32>>; + {error, stream_not_available} -> + <<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, + -1:16, 0:32>>; {ok, #{leader_node := LeaderNode, replica_nodes := Replicas}} -> LeaderIndex = case NodesInfo of #{LeaderNode := NodeInfo} -> |