diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-09-29 11:29:56 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-09-29 11:29:56 +0200 |
commit | 03a11e07713dd8b9ce9a3609bd3fa1a1d7df3327 (patch) | |
tree | 6928b4c4531e5b916918ec20171e7f2a1bbf687e | |
parent | 04e981e9e6636184fd7ff88b3623062e75c677dd (diff) | |
download | rabbitmq-server-git-03a11e07713dd8b9ce9a3609bd3fa1a1d7df3327.tar.gz |
Adapt failure tests
They are a bit more defensive. The subscription is also now more
reliable by returning a stream-not-available code if necessary.
Using also Aten poll interval to 1 second (bumped to 5 seconds in master
now).
5 files changed, 33 insertions, 15 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index f3e089430b..130f85590a 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -10,7 +10,7 @@ is currently the reference implementation. int8, int16, int32, int64 - Signed integers (big endian order) -uint16, uint32, uint64 - Unsigned integers (big endian order) +uint8, uint16, uint32, uint64 - Unsigned integers (big endian order) bytes - int32 for the length followed by the bytes of content, length of -1 indicates null. @@ -160,9 +160,10 @@ Publish => Key Version Stream PublishedMessages Key => int16 // 0 Version => int16 Stream => string // the name of the stream - PublisherId => int8 + PublisherId => uint8 PublishedMessages => [PublishedMessage] PublishedMessage => PublishingId Message + PublishingId => int64 Message => bytes ``` @@ -172,7 +173,7 @@ Publish => Key Version Stream PublishedMessages PublishConfirm => Key Version PublishingIds Key => int16 // 1 Version => int16 - PublisherId => int8 + PublisherId => uint8 PublishingIds => [int64] // to correlate with the messages sent ``` @@ -183,7 +184,7 @@ Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Key => int16 // 2 Version => int16 CorrelationId => int32 // correlation id to correlate the response - SubscriptionId => int8 // client-supplied id to identify the subscription + SubscriptionId => uint8 // client-supplied id to identify the subscription Stream => string // the name of the stream OffsetSpecification => OffsetType Offset OffsetType => int16 // 0 (first), 1 (last), 2 (next), 3 (offset), 4 (timestamp) @@ -197,7 +198,7 @@ Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Deliver => Key Version SubscriptionId OsirisChunk Key => int16 // 3 Version => int32 - SubscriptionId => int8 + SubscriptionId => uint8 OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages MagicVersion => int8 NumEntries => uint16 @@ -406,7 +407,7 @@ CommitOffset => Key Version Reference Stream Offset Key => int16 // 16 Version => int16 Reference => string // max 256 characters - Stream => string + SubscriptionId => uint8 Offset => int64 ``` diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 904d9d5ade..9992688d52 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -146,16 +146,22 @@ handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) -> Acc end end, undefined, [LeaderPid] ++ ReplicaPids), - %% FIXME: if no local member, maybe return not_available response code case LocalMember of undefined -> - {error, not_found}; + {error, not_available}; Pid -> {ok, Pid} end; _ -> {error, not_found} end; + {error, not_found} -> + case rabbit_amqqueue:not_found_or_absent_dirty(Name) of + not_found -> + {error, not_found}; + _ -> + {error, not_available} + end; _ -> {error, not_found} end, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 7c288476f2..fd3f651ba8 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -590,6 +590,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, case check_read_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of ok -> case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of + {error, not_available} -> + response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE), + {Connection, State, Rest}; {error, not_found} -> response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), {Connection, State, Rest}; diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 26315d7e10..4197b1de71 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -51,6 +51,10 @@ init_per_group(cluster = Group, Config) -> {rmq_nodename_suffix, Group}, {tcp_ports_base}]), rabbit_ct_helpers:run_setup_steps(Config2, + [fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {aten, [{poll_interval, 1000}]}) + end] ++ rabbit_ct_broker_helpers:setup_steps()); init_per_group(_, Config) -> rabbit_ct_helpers:run_setup_steps(Config). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index 5c43b9d0f6..ecabe0ee5c 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -96,7 +96,7 @@ public class FailureTest { assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue(); // wait until there's a new leader - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { + TestUtils.waitAtMost(Duration.ofSeconds(10), () -> { Client.StreamMetadata m = publisher.metadata(stream).get(stream); return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1(); }); @@ -112,10 +112,12 @@ public class FailureTest { Host.rabbitmqctl("start_app"); } - // wait until all the replicas are there - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { - Client.StreamMetadata m = publisher.metadata(stream).get(stream); - return m.getReplicas().size() == 2; + // wait until all the replicas are there + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = publisher.metadata(stream).get(stream); + return m.getReplicas().size() == 2; }); confirmLatch.set(new CountDownLatch(1)); @@ -135,8 +137,10 @@ public class FailureTest { consumeLatch.countDown(); })); - Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - assertThat(response.isOk()).isTrue(); + TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { + Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + return response.isOk(); + }); assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(bodies).hasSize(3).contains("all nodes available", "2 nodes available", "all nodes are back"); } |