summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-09-29 11:29:56 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-09-29 11:29:56 +0200
commit03a11e07713dd8b9ce9a3609bd3fa1a1d7df3327 (patch)
tree6928b4c4531e5b916918ec20171e7f2a1bbf687e
parent04e981e9e6636184fd7ff88b3623062e75c677dd (diff)
downloadrabbitmq-server-git-03a11e07713dd8b9ce9a3609bd3fa1a1d7df3327.tar.gz
Adapt failure tests
They are a bit more defensive. The subscription is also now more reliable by returning a stream-not-available code if necessary. Using also Aten poll interval to 1 second (bumped to 5 seconds in master now).
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc13
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl10
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl3
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl4
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java18
5 files changed, 33 insertions, 15 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index f3e089430b..130f85590a 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -10,7 +10,7 @@ is currently the reference implementation.
int8, int16, int32, int64 - Signed integers (big endian order)
-uint16, uint32, uint64 - Unsigned integers (big endian order)
+uint8, uint16, uint32, uint64 - Unsigned integers (big endian order)
bytes - int32 for the length followed by the bytes of content, length of -1 indicates null.
@@ -160,9 +160,10 @@ Publish => Key Version Stream PublishedMessages
Key => int16 // 0
Version => int16
Stream => string // the name of the stream
- PublisherId => int8
+ PublisherId => uint8
PublishedMessages => [PublishedMessage]
PublishedMessage => PublishingId Message
+ PublishingId => int64
Message => bytes
```
@@ -172,7 +173,7 @@ Publish => Key Version Stream PublishedMessages
PublishConfirm => Key Version PublishingIds
Key => int16 // 1
Version => int16
- PublisherId => int8
+ PublisherId => uint8
PublishingIds => [int64] // to correlate with the messages sent
```
@@ -183,7 +184,7 @@ Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification
Key => int16 // 2
Version => int16
CorrelationId => int32 // correlation id to correlate the response
- SubscriptionId => int8 // client-supplied id to identify the subscription
+ SubscriptionId => uint8 // client-supplied id to identify the subscription
Stream => string // the name of the stream
OffsetSpecification => OffsetType Offset
OffsetType => int16 // 0 (first), 1 (last), 2 (next), 3 (offset), 4 (timestamp)
@@ -197,7 +198,7 @@ Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification
Deliver => Key Version SubscriptionId OsirisChunk
Key => int16 // 3
Version => int32
- SubscriptionId => int8
+ SubscriptionId => uint8
OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages
MagicVersion => int8
NumEntries => uint16
@@ -406,7 +407,7 @@ CommitOffset => Key Version Reference Stream Offset
Key => int16 // 16
Version => int16
Reference => string // max 256 characters
- Stream => string
+ SubscriptionId => uint8
Offset => int64
```
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 904d9d5ade..9992688d52 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -146,16 +146,22 @@ handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) ->
Acc
end
end, undefined, [LeaderPid] ++ ReplicaPids),
- %% FIXME: if no local member, maybe return not_available response code
case LocalMember of
undefined ->
- {error, not_found};
+ {error, not_available};
Pid ->
{ok, Pid}
end;
_ ->
{error, not_found}
end;
+ {error, not_found} ->
+ case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
+ not_found ->
+ {error, not_found};
+ _ ->
+ {error, not_available}
+ end;
_ ->
{error, not_found}
end,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 7c288476f2..fd3f651ba8 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -590,6 +590,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
case check_read_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of
ok ->
case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
+ {error, not_available} ->
+ response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE),
+ {Connection, State, Rest};
{error, not_found} ->
response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
{Connection, State, Rest};
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index 26315d7e10..4197b1de71 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -51,6 +51,10 @@ init_per_group(cluster = Group, Config) ->
{rmq_nodename_suffix, Group},
{tcp_ports_base}]),
rabbit_ct_helpers:run_setup_steps(Config2,
+ [fun(StepConfig) ->
+ rabbit_ct_helpers:merge_app_env(StepConfig,
+ {aten, [{poll_interval, 1000}]})
+ end] ++
rabbit_ct_broker_helpers:setup_steps());
init_per_group(_, Config) ->
rabbit_ct_helpers:run_setup_steps(Config).
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
index 5c43b9d0f6..ecabe0ee5c 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
@@ -96,7 +96,7 @@ public class FailureTest {
assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue();
// wait until there's a new leader
- TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
+ TestUtils.waitAtMost(Duration.ofSeconds(10), () -> {
Client.StreamMetadata m = publisher.metadata(stream).get(stream);
return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1();
});
@@ -112,10 +112,12 @@ public class FailureTest {
Host.rabbitmqctl("start_app");
}
- // wait until all the replicas are there
- TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
- Client.StreamMetadata m = publisher.metadata(stream).get(stream);
- return m.getReplicas().size() == 2;
+ // wait until all the replicas are there
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(5),
+ () -> {
+ Client.StreamMetadata m = publisher.metadata(stream).get(stream);
+ return m.getReplicas().size() == 2;
});
confirmLatch.set(new CountDownLatch(1));
@@ -135,8 +137,10 @@ public class FailureTest {
consumeLatch.countDown();
}));
- Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
- assertThat(response.isOk()).isTrue();
+ TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
+ Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
+ return response.isOk();
+ });
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(bodies).hasSize(3).contains("all nodes available", "2 nodes available", "all nodes are back");
}