summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-21 15:25:31 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-21 15:25:31 +0200
commita1f600635608e8dd935a8465bf44a29ccaa8215e (patch)
tree0f23c64376ccc7187e7794f50f436e5b167b9a2c
parentcc030ac195c00dc7f34616f0d2303deee6864371 (diff)
downloadrabbitmq-server-git-a1f600635608e8dd935a8465bf44a29ccaa8215e.tar.gz
Support leader locator strategies
References rabbitmq/rabbitmq-server#2471
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl14
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java170
2 files changed, 184 insertions, 0 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 9fa4e3521d..cf1829bc62 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -76,6 +76,11 @@ stream_queue_arguments(ArgumentsAcc, #{<<"initial-cluster-size">> := Value} = Ar
[{<<"x-initial-cluster-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc,
maps:remove(<<"initial-cluster-size">>, Arguments)
);
+stream_queue_arguments(ArgumentsAcc, #{<<"queue-leader-locator">> := Value} = Arguments) ->
+ stream_queue_arguments(
+ [{<<"x-queue-leader-locator">>, longstr, Value}] ++ ArgumentsAcc,
+ maps:remove(<<"queue-leader-locator">>, Arguments)
+ );
stream_queue_arguments(ArgumentsAcc, _Arguments) ->
ArgumentsAcc.
@@ -83,6 +88,15 @@ validate_stream_queue_arguments([]) ->
ok;
validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSize} | _]) when ClusterSize =< 0 ->
error;
+validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) ->
+ case lists:member(Locator, [<<"client-local">>,
+ <<"random">>,
+ <<"least-leaders">>]) of
+ true ->
+ validate_stream_queue_arguments(T);
+ false ->
+ error
+ end;
validate_stream_queue_arguments([_ | T]) ->
validate_stream_queue_arguments(T).
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java
new file mode 100644
index 0000000000..5dc2256643
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/LeaderLocatorTest.java
@@ -0,0 +1,170 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.stream;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.rabbitmq.stream.impl.Client;
+import com.rabbitmq.stream.impl.Client.Broker;
+import com.rabbitmq.stream.impl.Client.ClientParameters;
+import com.rabbitmq.stream.impl.Client.Response;
+import com.rabbitmq.stream.impl.Client.StreamMetadata;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.LoggerFactory;
+
+@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
+public class LeaderLocatorTest {
+
+ TestUtils.ClientFactory cf;
+
+ @Test
+ void invalidLocatorShouldReturnError() {
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ String s = UUID.randomUUID().toString();
+ Response response = client.create(s, Collections.singletonMap("queue-leader-locator", "foo"));
+ assertThat(response.isOk()).isFalse();
+ assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_PRECONDITION_FAILED);
+ }
+
+ @Test
+ void clientLocalLocatorShouldMakeLeaderOnConnectedNode() {
+ int[] ports = new int[] {TestUtils.streamPortNode1(), TestUtils.streamPortNode2()};
+ for (int port : ports) {
+ Client client = cf.get(new Client.ClientParameters().port(port));
+ String s = UUID.randomUUID().toString();
+ try {
+ Response response =
+ client.create(s, Collections.singletonMap("queue-leader-locator", "client-local"));
+ assertThat(response.isOk()).isTrue();
+ StreamMetadata metadata = client.metadata(s).get(s);
+ assertThat(metadata).isNotNull();
+ assertThat(metadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
+ assertThat(metadata.getLeader()).isNotNull().extracting(b -> b.getPort()).isEqualTo(port);
+ } finally {
+ client.delete(s);
+ }
+ }
+ }
+
+ @Test
+ void randomLocatorShouldCreateOnAllNodesAfterSomeTime() throws Exception {
+ int clusterSize = 3;
+ Set<String> createdStreams = ConcurrentHashMap.newKeySet();
+ Set<Broker> leaderNodes = ConcurrentHashMap.newKeySet(clusterSize);
+ CountDownLatch latch = new CountDownLatch(1);
+ Client client = cf.get(new ClientParameters().port(TestUtils.streamPortNode1()));
+ Runnable runnable =
+ () -> {
+ while (leaderNodes.size() < clusterSize && !Thread.interrupted()) {
+ String s = UUID.randomUUID().toString();
+ Response response =
+ client.create(s, Collections.singletonMap("queue-leader-locator", "random"));
+ if (!response.isOk()) {
+ break;
+ }
+ createdStreams.add(s);
+ StreamMetadata metadata = client.metadata(s).get(s);
+ if (metadata == null || !metadata.isResponseOk() || metadata.getLeader() == null) {
+ break;
+ }
+ leaderNodes.add(metadata.getLeader());
+ }
+ latch.countDown();
+ };
+
+ Thread worker = new Thread(runnable);
+ worker.start();
+
+ try {
+ assertThat(latch.await(10, SECONDS)).isTrue();
+ assertThat(leaderNodes).hasSize(clusterSize);
+ // in case Broker class is broken
+ assertThat(leaderNodes.stream().map(b -> b.getPort()).collect(Collectors.toSet()))
+ .hasSize(clusterSize);
+ } finally {
+ if (worker.isAlive()) {
+ worker.interrupt();
+ }
+ createdStreams.forEach(
+ s -> {
+ Response response = client.delete(s);
+ if (!response.isOk()) {
+ LoggerFactory.getLogger(LeaderLocatorTest.class).warn("Error while deleting stream");
+ }
+ });
+ }
+ }
+
+ @Test
+ void leastLeadersShouldStreamLeadersOnTheCluster() {
+ int clusterSize = 3;
+ int streamsByNode = 5;
+ int streamCount = clusterSize * streamsByNode;
+ Set<String> createdStreams = ConcurrentHashMap.newKeySet();
+ Client client = cf.get(new ClientParameters().port(TestUtils.streamPortNode1()));
+
+ try {
+ IntStream.range(0, streamCount)
+ .forEach(
+ i -> {
+ String s = UUID.randomUUID().toString();
+ Response response =
+ client.create(
+ s, Collections.singletonMap("queue-leader-locator", "least-leaders"));
+ assertThat(response.isOk()).isTrue();
+ createdStreams.add(s);
+ });
+
+ Map<Integer, Integer> leaderCount = new HashMap<>();
+ Map<String, StreamMetadata> metadata =
+ client.metadata(createdStreams.toArray(new String[] {}));
+ assertThat(metadata).hasSize(streamCount);
+
+ metadata
+ .values()
+ .forEach(
+ streamMetadata -> {
+ assertThat(streamMetadata.isResponseOk()).isTrue();
+ assertThat(streamMetadata.getLeader()).isNotNull();
+ leaderCount.compute(
+ streamMetadata.getLeader().getPort(),
+ (port, value) -> value == null ? 1 : ++value);
+ });
+ assertThat(leaderCount).hasSize(clusterSize);
+ leaderCount.values().forEach(count -> assertThat(count).isEqualTo(streamsByNode));
+ } finally {
+ createdStreams.forEach(
+ s -> {
+ Response response = client.delete(s);
+ if (!response.isOk()) {
+ LoggerFactory.getLogger(LeaderLocatorTest.class).warn("Error while deleting stream");
+ }
+ });
+ }
+ }
+}