summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-22 15:41:42 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-22 15:41:42 +0200
commitbb510b0e140b507b1920535d4514a3056856c25e (patch)
tree8aaceaadfb4a39e34296821820638f416d678e23
parentdd8b3ea008a635c11e7e95d67138ebab11608538 (diff)
parent43898e59b1331fc23251a68d3be983501f54686e (diff)
downloadrabbitmq-server-git-bb510b0e140b507b1920535d4514a3056856c25e.tar.gz
Merge branch 'master' into management-integration
Conflicts: src/rabbit_stream_manager.erl
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl5
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java170
2 files changed, 175 insertions, 0 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 1a0d982884..5b318e0635 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -66,6 +66,11 @@ stream_queue_arguments(ArgumentsAcc, #{<<"max-length-bytes">> := Value} = Argume
[{<<"x-max-length-bytes">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
maps:remove(<<"max-length-bytes">>, Arguments)
);
+stream_queue_arguments(ArgumentsAcc, #{<<"max-age">> := Value} = Arguments) ->
+ stream_queue_arguments(
+ [{<<"x-max-age">>, longstr, Value}] ++ ArgumentsAcc,
+ maps:remove(<<"max-age">>, Arguments)
+ );
stream_queue_arguments(ArgumentsAcc, #{<<"max-segment-size">> := Value} = Arguments) ->
stream_queue_arguments(
[{<<"x-max-segment-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java
new file mode 100644
index 0000000000..5dc2256643
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java
@@ -0,0 +1,170 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.stream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.rabbitmq.stream.impl.Client;
+import com.rabbitmq.stream.impl.Client.Broker;
+import com.rabbitmq.stream.impl.Client.ClientParameters;
+import com.rabbitmq.stream.impl.Client.Response;
+import com.rabbitmq.stream.impl.Client.StreamMetadata;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.LoggerFactory;
+
+@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
+public class LeaderLocatorTest {
+
+ TestUtils.ClientFactory cf;
+
+ @Test
+ void invalidLocatorShouldReturnError() {
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ String s = UUID.randomUUID().toString();
+ Response response = client.create(s, Collections.singletonMap("queue-leader-locator", "foo"));
+ assertThat(response.isOk()).isFalse();
+ assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED);
+ }
+
+ @Test
+ void clientLocalLocatorShouldMakeLeaderOnConnectedNode() {
+ int[] ports = new int[] {TestUtils.streamPortNode1(), TestUtils.streamPortNode2()};
+ for (int port : ports) {
+ Client client = cf.get(new Client.ClientParameters().port(port));
+ String s = UUID.randomUUID().toString();
+ try {
+ Response response =
+ client.create(s, Collections.singletonMap("queue-leader-locator", "client-local"));
+ assertThat(response.isOk()).isTrue();
+ StreamMetadata metadata = client.metadata(s).get(s);
+ assertThat(metadata).isNotNull();
+ assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
+ assertThat(metadata.getLeader()).isNotNull().extracting(b -> b.getPort()).isEqualTo(port);
+ } finally {
+ client.delete(s);
+ }
+ }
+ }
+
+ @Test
+ void randomLocatorShouldCreateOnAllNodesAfterSomeTime() throws Exception {
+ int clusterSize = 3;
+ Set<String> createdStreams = ConcurrentHashMap.newKeySet();
+ Set<Broker> leaderNodes = ConcurrentHashMap.newKeySet(clusterSize);
+ CountDownLatch latch = new CountDownLatch(1);
+ Client client = cf.get(new ClientParameters().port(TestUtils.streamPortNode1()));
+ Runnable runnable =
+ () -> {
+ while (leaderNodes.size() < clusterSize && !Thread.interrupted()) {
+ String s = UUID.randomUUID().toString();
+ Response response =
+ client.create(s, Collections.singletonMap("queue-leader-locator", "random"));
+ if (!response.isOk()) {
+ break;
+ }
+ createdStreams.add(s);
+ StreamMetadata metadata = client.metadata(s).get(s);
+ if (metadata == null || !metadata.isResponseOk() || metadata.getLeader() == null) {
+ break;
+ }
+ leaderNodes.add(metadata.getLeader());
+ }
+ latch.countDown();
+ };
+
+ Thread worker = new Thread(runnable);
+ worker.start();
+
+ try {
+ assertThat(latch.await(10, SECONDS)).isTrue();
+ assertThat(leaderNodes).hasSize(clusterSize);
+ // in case Broker class is broken
+ assertThat(leaderNodes.stream().map(b -> b.getPort()).collect(Collectors.toSet()))
+ .hasSize(clusterSize);
+ } finally {
+ if (worker.isAlive()) {
+ worker.interrupt();
+ }
+ createdStreams.forEach(
+ s -> {
+ Response response = client.delete(s);
+ if (!response.isOk()) {
+ LoggerFactory.getLogger(LeaderLocatorTest.class).warn("Error while deleting stream");
+ }
+ });
+ }
+ }
+
+ @Test
+ void leastLeadersShouldStreamLeadersOnTheCluster() {
+ int clusterSize = 3;
+ int streamsByNode = 5;
+ int streamCount = clusterSize * streamsByNode;
+ Set<String> createdStreams = ConcurrentHashMap.newKeySet();
+ Client client = cf.get(new ClientParameters().port(TestUtils.streamPortNode1()));
+
+ try {
+ IntStream.range(0, streamCount)
+ .forEach(
+ i -> {
+ String s = UUID.randomUUID().toString();
+ Response response =
+ client.create(
+ s, Collections.singletonMap("queue-leader-locator", "least-leaders"));
+ assertThat(response.isOk()).isTrue();
+ createdStreams.add(s);
+ });
+
+ Map<Integer, Integer> leaderCount = new HashMap<>();
+ Map<String, StreamMetadata> metadata =
+ client.metadata(createdStreams.toArray(new String[] {}));
+ assertThat(metadata).hasSize(streamCount);
+
+ metadata
+ .values()
+ .forEach(
+ streamMetadata -> {
+ assertThat(streamMetadata.isResponseOk()).isTrue();
+ assertThat(streamMetadata.getLeader()).isNotNull();
+ leaderCount.compute(
+ streamMetadata.getLeader().getPort(),
+ (port, value) -> value == null ? 1 : ++value);
+ });
+ assertThat(leaderCount).hasSize(clusterSize);
+ leaderCount.values().forEach(count -> assertThat(count).isEqualTo(streamsByNode));
+ } finally {
+ createdStreams.forEach(
+ s -> {
+ Response response = client.delete(s);
+ if (!response.isOk()) {
+ LoggerFactory.getLogger(LeaderLocatorTest.class).warn("Error while deleting stream");
+ }
+ });
+ }
+ }
+}