summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-09-17 09:30:20 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-09-17 09:30:20 +0200
commit04e981e9e6636184fd7ff88b3623062e75c677dd (patch)
tree0bc51a4c743ed2dbc393eb2f2e4d8803e3e34788
parentcf5e99cd2908636fa16cab4b1726f060dda81b49 (diff)
downloadrabbitmq-server-git-04e981e9e6636184fd7ff88b3623062e75c677dd.tar.gz
Use rabbit_durable_queue to check if stream exists
This is done after checking rabbit_queue and if it returns that the queue does not exist. The coordinator may be recovering the queue, so thanks to this double check we know the queue exists but is not available, instead of thinking it does not exist at all.
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl7
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl3
2 files changed, 10 insertions, 0 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index e250d0fe69..904d9d5ade 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -170,6 +170,13 @@ handle_call({topology, VirtualHost, Stream}, _From, State) ->
_ ->
{error, stream_not_found}
end;
+ {error, not_found} ->
+ case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
+ not_found ->
+ {error, stream_not_found};
+ _ ->
+ {error, stream_not_available}
+ end;
_ ->
{error, stream_not_found}
end,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index d41ff48a8c..7c288476f2 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -844,6 +844,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
{error, stream_not_found} ->
<<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16,
-1:16, 0:32>>;
+ {error, stream_not_available} ->
+ <<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
+ -1:16, 0:32>>;
{ok, #{leader_node := LeaderNode, replica_nodes := Replicas}} ->
LeaderIndex = case NodesInfo of
#{LeaderNode := NodeInfo} ->