summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-09-29 12:05:29 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-09-29 12:05:29 +0200
commitb8bdb5ae7b633ed0c897af67337a27dcde838784 (patch)
tree8ceb40380d91501dd2d066334def7e68654bf72c
parent03a11e07713dd8b9ce9a3609bd3fa1a1d7df3327 (diff)
downloadrabbitmq-server-git-b8bdb5ae7b633ed0c897af67337a27dcde838784.tar.gz
Bump dependencies in Java test suite
And apply Google Java Format with Maven Spotless plugin.
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml23
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java796
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java143
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java224
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java254
5 files changed, 785 insertions, 655 deletions
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
index 7bc9e659ed..aa27c29baf 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml
@@ -27,13 +27,14 @@
<properties>
<stream-client.version>0.1.0-SNAPSHOT</stream-client.version>
- <proton-j.version>0.33.5</proton-j.version>
- <junit.jupiter.version>5.6.2</junit.jupiter.version>
- <assertj.version>3.16.1</assertj.version>
- <mockito.version>3.3.3</mockito.version>
+ <proton-j.version>0.33.6</proton-j.version>
+ <junit.jupiter.version>5.7.0</junit.jupiter.version>
+ <assertj.version>3.17.2</assertj.version>
+ <mockito.version>3.5.11</mockito.version>
<logback.version>1.2.3</logback.version>
<maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
+ <spotless.version>2.2.0</spotless.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
@@ -110,6 +111,20 @@
<version>${maven-surefire-plugin.version}</version>
</plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>${spotless.version}</version>
+ <configuration>
+ <java>
+ <googleJavaFormat>
+ <version>1.9</version>
+ <style>GOOGLE</style>
+ </googleJavaFormat>
+ </java>
+ </configuration>
+ </plugin>
+
</plugins>
</build>
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
index ecabe0ee5c..c7a390f00d 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
@@ -16,12 +16,11 @@
package com.rabbitmq.stream;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
import com.rabbitmq.stream.impl.Client;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
@@ -29,88 +28,100 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class FailureTest {
- TestUtils.ClientFactory cf;
- String stream;
- ExecutorService executorService;
+ TestUtils.ClientFactory cf;
+ String stream;
+ ExecutorService executorService;
- static void wait(Duration duration) {
- try {
- Thread.sleep(duration.toMillis());
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ static void wait(Duration duration) {
+ try {
+ Thread.sleep(duration.toMillis());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
+ }
- @AfterEach
- void tearDown() {
- if (executorService != null) {
- executorService.shutdownNow();
- }
+ @AfterEach
+ void tearDown() {
+ if (executorService != null) {
+ executorService.shutdownNow();
}
-
- @Test
- void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
- Set<String> messages = new HashSet<>();
- Client client = cf.get(new Client.ClientParameters()
- .port(TestUtils.streamPortNode1())
- );
- Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
- Client.StreamMetadata streamMetadata = metadata.get(stream);
- assertThat(streamMetadata).isNotNull();
-
- assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
- assertThat(streamMetadata.getReplicas()).isNotEmpty();
- Client.Broker replica = streamMetadata.getReplicas().get(0);
- assertThat(replica.getPort()).isNotEqualTo(TestUtils.streamPortNode1());
-
- AtomicReference<CountDownLatch> confirmLatch = new AtomicReference<>(new CountDownLatch(1));
-
- CountDownLatch metadataLatch = new CountDownLatch(1);
- Client publisher = cf.get(new Client.ClientParameters()
+ }
+
+ @Test
+ void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
+ Set<String> messages = new HashSet<>();
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
+ Client.StreamMetadata streamMetadata = metadata.get(stream);
+ assertThat(streamMetadata).isNotNull();
+
+ assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
+ assertThat(streamMetadata.getReplicas()).isNotEmpty();
+ Client.Broker replica = streamMetadata.getReplicas().get(0);
+ assertThat(replica.getPort()).isNotEqualTo(TestUtils.streamPortNode1());
+
+ AtomicReference<CountDownLatch> confirmLatch = new AtomicReference<>(new CountDownLatch(1));
+
+ CountDownLatch metadataLatch = new CountDownLatch(1);
+ Client publisher =
+ cf.get(
+ new Client.ClientParameters()
.port(replica.getPort())
.metadataListener((stream, code) -> metadataLatch.countDown())
- .publishConfirmListener((publisherId, publishingId) -> confirmLatch.get().countDown()));
- String message = "all nodes available";
- messages.add(message);
- publisher.publish(stream, (byte) 1,
- Collections.singletonList(publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
- assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
- confirmLatch.set(null);
-
- try {
- Host.rabbitmqctl("stop_app");
- try {
- cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
- fail("Node app stopped, connecting should not be possible");
- } catch (Exception e) {
- // OK
- }
-
- assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue();
-
- // wait until there's a new leader
- TestUtils.waitAtMost(Duration.ofSeconds(10), () -> {
- Client.StreamMetadata m = publisher.metadata(stream).get(stream);
- return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1();
- });
-
- confirmLatch.set(new CountDownLatch(1));
- message = "2 nodes available";
- messages.add(message);
- publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder()
- .addData(message.getBytes(StandardCharsets.UTF_8)).build()));
- assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
- confirmLatch.set(null);
- } finally {
- Host.rabbitmqctl("start_app");
- }
+ .publishConfirmListener(
+ (publisherId, publishingId) -> confirmLatch.get().countDown()));
+ String message = "all nodes available";
+ messages.add(message);
+ publisher.publish(
+ stream,
+ (byte) 1,
+ Collections.singletonList(
+ publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
+ assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
+ confirmLatch.set(null);
+
+ try {
+ Host.rabbitmqctl("stop_app");
+ try {
+ cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ fail("Node app stopped, connecting should not be possible");
+ } catch (Exception e) {
+ // OK
+ }
+
+ assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+ // wait until there's a new leader
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(10),
+ () -> {
+ Client.StreamMetadata m = publisher.metadata(stream).get(stream);
+ return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1();
+ });
+
+ confirmLatch.set(new CountDownLatch(1));
+ message = "2 nodes available";
+ messages.add(message);
+ publisher.publish(
+ stream,
+ (byte) 1,
+ Collections.singletonList(
+ publisher
+ .messageBuilder()
+ .addData(message.getBytes(StandardCharsets.UTF_8))
+ .build()));
+ assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
+ confirmLatch.set(null);
+ } finally {
+ Host.rabbitmqctl("start_app");
+ }
// wait until all the replicas are there
TestUtils.waitAtMost(
@@ -120,350 +131,411 @@ public class FailureTest {
return m.getReplicas().size() == 2;
});
- confirmLatch.set(new CountDownLatch(1));
- message = "all nodes are back";
- messages.add(message);
- publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder()
- .addData(message.getBytes(StandardCharsets.UTF_8)).build()));
- assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
- confirmLatch.set(null);
-
- CountDownLatch consumeLatch = new CountDownLatch(2);
- Set<String> bodies = ConcurrentHashMap.newKeySet();
- Client consumer = cf.get(new Client.ClientParameters()
+ confirmLatch.set(new CountDownLatch(1));
+ message = "all nodes are back";
+ messages.add(message);
+ publisher.publish(
+ stream,
+ (byte) 1,
+ Collections.singletonList(
+ publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
+ assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
+ confirmLatch.set(null);
+
+ CountDownLatch consumeLatch = new CountDownLatch(2);
+ Set<String> bodies = ConcurrentHashMap.newKeySet();
+ Client consumer =
+ cf.get(
+ new Client.ClientParameters()
.port(TestUtils.streamPortNode1())
- .messageListener((subscriptionId, offset, msg) -> {
- bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
- consumeLatch.countDown();
- }));
-
- TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
- Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
- return response.isOk();
- });
- assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
- assertThat(bodies).hasSize(3).contains("all nodes available", "2 nodes available", "all nodes are back");
- }
+ .messageListener(
+ (subscriptionId, offset, msg) -> {
+ bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
+ consumeLatch.countDown();
+ }));
- @Test
- void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
- executorService = Executors.newCachedThreadPool();
- Client client = cf.get(new Client.ClientParameters()
- .port(TestUtils.streamPortNode1())
- );
- Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
- Client.StreamMetadata streamMetadata = metadata.get(stream);
- assertThat(streamMetadata).isNotNull();
-
- assertThat(streamMetadata.getLeader()).isNotNull();
- assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
-
- Map<Long, Message> published = new ConcurrentHashMap<>();
- Set<Message> confirmed = ConcurrentHashMap.newKeySet();
-
- Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> {
- Message confirmedMessage;
- int attempts = 0;
- while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
- wait(Duration.ofMillis(5));
- attempts++;
- }
- confirmed.add(confirmedMessage);
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(5),
+ () -> {
+ Client.Response response =
+ consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
+ return response.isOk();
+ });
+ assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(bodies)
+ .hasSize(3)
+ .contains("all nodes available", "2 nodes available", "all nodes are back");
+ }
+
+ @Test
+ void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
+ executorService = Executors.newCachedThreadPool();
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
+ Client.StreamMetadata streamMetadata = metadata.get(stream);
+ assertThat(streamMetadata).isNotNull();
+
+ assertThat(streamMetadata.getLeader()).isNotNull();
+ assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
+
+ Map<Long, Message> published = new ConcurrentHashMap<>();
+ Set<Message> confirmed = ConcurrentHashMap.newKeySet();
+
+ Client.PublishConfirmListener publishConfirmListener =
+ (publisherId, publishingId) -> {
+ Message confirmedMessage;
+ int attempts = 0;
+ while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
+ wait(Duration.ofMillis(5));
+ attempts++;
+ }
+ confirmed.add(confirmedMessage);
};
- AtomicLong generation = new AtomicLong(0);
- AtomicLong sequence = new AtomicLong(0);
- AtomicBoolean connected = new AtomicBoolean(true);
- AtomicReference<Client> publisher = new AtomicReference<>();
- CountDownLatch reconnectionLatch = new CountDownLatch(1);
- AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
- Client.ShutdownListener shutdownListener = shutdownContext -> {
- if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
- // avoid long-running task in the IO thread
- executorService.submit(() -> {
- connected.set(false);
-
- Client locator = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
- // wait until there's a new leader
- try {
- TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
- Client.StreamMetadata m = locator.metadata(stream).get(stream);
- return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1();
+ AtomicLong generation = new AtomicLong(0);
+ AtomicLong sequence = new AtomicLong(0);
+ AtomicBoolean connected = new AtomicBoolean(true);
+ AtomicReference<Client> publisher = new AtomicReference<>();
+ CountDownLatch reconnectionLatch = new CountDownLatch(1);
+ AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
+ Client.ShutdownListener shutdownListener =
+ shutdownContext -> {
+ if (shutdownContext.getShutdownReason()
+ == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
+ // avoid long-running task in the IO thread
+ executorService.submit(
+ () -> {
+ connected.set(false);
+
+ Client locator =
+ cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
+ // wait until there's a new leader
+ try {
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(5),
+ () -> {
+ Client.StreamMetadata m = locator.metadata(stream).get(stream);
+ return m.getLeader() != null
+ && m.getLeader().getPort() != TestUtils.streamPortNode1();
});
- } catch (Throwable e) {
- reconnectionLatch.countDown();
- return;
- }
-
- int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort();
- Client newPublisher = cf.get(new Client.ClientParameters()
- .port(newLeaderPort)
- .shutdownListener(shutdownListenerReference.get())
- .publishConfirmListener(publishConfirmListener)
- );
-
- generation.incrementAndGet();
- published.clear();
- publisher.set(newPublisher);
- connected.set(true);
-
+ } catch (Throwable e) {
reconnectionLatch.countDown();
+ return;
+ }
+
+ int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort();
+ Client newPublisher =
+ cf.get(
+ new Client.ClientParameters()
+ .port(newLeaderPort)
+ .shutdownListener(shutdownListenerReference.get())
+ .publishConfirmListener(publishConfirmListener));
+
+ generation.incrementAndGet();
+ published.clear();
+ publisher.set(newPublisher);
+ connected.set(true);
+
+ reconnectionLatch.countDown();
});
- }
+ }
};
- shutdownListenerReference.set(shutdownListener);
+ shutdownListenerReference.set(shutdownListener);
- client = cf.get(new Client.ClientParameters()
+ client =
+ cf.get(
+ new Client.ClientParameters()
.port(streamMetadata.getLeader().getPort())
.shutdownListener(shutdownListener)
.publishConfirmListener(publishConfirmListener));
- publisher.set(client);
-
- AtomicBoolean keepPublishing = new AtomicBoolean(true);
-
- executorService.submit(() -> {
- while (keepPublishing.get()) {
- if (connected.get()) {
- Message message = publisher.get().messageBuilder()
- .properties().messageId(sequence.getAndIncrement())
- .messageBuilder().applicationProperties().entry("generation", generation.get())
- .messageBuilder().build();
- try {
- long publishingId = publisher.get().publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
- published.put(publishingId, message);
- } catch (Exception e) {
- // keep going
- }
- wait(Duration.ofMillis(10));
- } else {
- wait(Duration.ofSeconds(1));
- }
+ publisher.set(client);
+
+ AtomicBoolean keepPublishing = new AtomicBoolean(true);
+
+ executorService.submit(
+ () -> {
+ while (keepPublishing.get()) {
+ if (connected.get()) {
+ Message message =
+ publisher
+ .get()
+ .messageBuilder()
+ .properties()
+ .messageId(sequence.getAndIncrement())
+ .messageBuilder()
+ .applicationProperties()
+ .entry("generation", generation.get())
+ .messageBuilder()
+ .build();
+ try {
+ long publishingId =
+ publisher
+ .get()
+ .publish(stream, (byte) 1, Collections.singletonList(message))
+ .get(0);
+ published.put(publishingId, message);
+ } catch (Exception e) {
+ // keep going
+ }
+ wait(Duration.ofMillis(10));
+ } else {
+ wait(Duration.ofSeconds(1));
}
+ }
});
- // let's publish for a bit of time
- Thread.sleep(2000);
+ // let's publish for a bit of time
+ Thread.sleep(2000);
- assertThat(confirmed).isNotEmpty();
- int confirmedCount = confirmed.size();
+ assertThat(confirmed).isNotEmpty();
+ int confirmedCount = confirmed.size();
- try {
- Host.rabbitmqctl("stop_app");
+ try {
+ Host.rabbitmqctl("stop_app");
- assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
- // let's publish for a bit of time
- Thread.sleep(2000);
+ // let's publish for a bit of time
+ Thread.sleep(2000);
- } finally {
- Host.rabbitmqctl("start_app");
- }
- assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
- confirmedCount = confirmed.size();
+ } finally {
+ Host.rabbitmqctl("start_app");
+ }
+ assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
+ confirmedCount = confirmed.size();
- Client metadataClient = cf.get(new Client.ClientParameters()
- .port(TestUtils.streamPortNode2())
- );
- // wait until all the replicas are there
- TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
- Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
- return m.getReplicas().size() == 2;
+ Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
+ // wait until all the replicas are there
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(5),
+ () -> {
+ Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
+ return m.getReplicas().size() == 2;
});
- // let's publish for a bit of time
- Thread.sleep(2000);
+ // let's publish for a bit of time
+ Thread.sleep(2000);
- assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
+ assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
- keepPublishing.set(false);
+ keepPublishing.set(false);
- Queue<Message> consumed = new ConcurrentLinkedQueue<>();
- Set<Long> generations = ConcurrentHashMap.newKeySet();
- CountDownLatch consumedLatch = new CountDownLatch(1);
- Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
- Client consumer = cf.get(new Client.ClientParameters()
+ Queue<Message> consumed = new ConcurrentLinkedQueue<>();
+ Set<Long> generations = ConcurrentHashMap.newKeySet();
+ CountDownLatch consumedLatch = new CountDownLatch(1);
+ Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
+ Client consumer =
+ cf.get(
+ new Client.ClientParameters()
.port(m.getReplicas().get(0).getPort())
- .chunkListener((client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1))
- .messageListener((subscriptionId, offset, message) -> {
- consumed.add(message);
- generations.add((Long) message.getApplicationProperties().get("generation"));
- if (consumed.size() == confirmed.size()) {
+ .chunkListener(
+ (client1, subscriptionId, offset, messageCount, dataSize) ->
+ client1.credit(subscriptionId, 1))
+ .messageListener(
+ (subscriptionId, offset, message) -> {
+ consumed.add(message);
+ generations.add((Long) message.getApplicationProperties().get("generation"));
+ if (consumed.size() == confirmed.size()) {
consumedLatch.countDown();
- }
- })
- );
-
- Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
- assertThat(response.isOk()).isTrue();
-
- assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue();
- assertThat(generations).hasSize(2).contains(0L, 1L);
- assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
- long lastMessageId = -1;
- for (Message message : consumed) {
- long messageId = message.getProperties().getMessageIdAsLong();
- assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId);
- lastMessageId = messageId;
- }
- assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get());
+ }
+ }));
+
+ Client.Response response =
+ consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
+ assertThat(response.isOk()).isTrue();
+
+ assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue();
+ assertThat(generations).hasSize(2).contains(0L, 1L);
+ assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
+ long lastMessageId = -1;
+ for (Message message : consumed) {
+ long messageId = message.getProperties().getMessageIdAsLong();
+ assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId);
+ lastMessageId = messageId;
}
-
- @Test
- void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
- executorService = Executors.newCachedThreadPool();
- Client metadataClient = cf.get(new Client.ClientParameters()
- .port(TestUtils.streamPortNode1())
- );
- Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream);
- Client.StreamMetadata streamMetadata = metadata.get(stream);
- assertThat(streamMetadata).isNotNull();
-
- assertThat(streamMetadata.getLeader()).isNotNull();
- assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
-
- Map<Long, Message> published = new ConcurrentHashMap<>();
- Set<Message> confirmed = ConcurrentHashMap.newKeySet();
- Set<Long> confirmedIds = ConcurrentHashMap.newKeySet();
- Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> {
- Message confirmedMessage;
- int attempts = 0;
- while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
- wait(Duration.ofMillis(5));
- attempts++;
- }
- confirmed.add(confirmedMessage);
- confirmedIds.add(confirmedMessage.getProperties().getMessageIdAsLong());
+ assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get());
+ }
+
+ @Test
+ void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
+ executorService = Executors.newCachedThreadPool();
+ Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream);
+ Client.StreamMetadata streamMetadata = metadata.get(stream);
+ assertThat(streamMetadata).isNotNull();
+
+ assertThat(streamMetadata.getLeader()).isNotNull();
+ assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
+
+ Map<Long, Message> published = new ConcurrentHashMap<>();
+ Set<Message> confirmed = ConcurrentHashMap.newKeySet();
+ Set<Long> confirmedIds = ConcurrentHashMap.newKeySet();
+ Client.PublishConfirmListener publishConfirmListener =
+ (publisherId, publishingId) -> {
+ Message confirmedMessage;
+ int attempts = 0;
+ while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
+ wait(Duration.ofMillis(5));
+ attempts++;
+ }
+ confirmed.add(confirmedMessage);
+ confirmedIds.add(confirmedMessage.getProperties().getMessageIdAsLong());
};
- Client publisher = cf.get(new Client.ClientParameters()
+ Client publisher =
+ cf.get(
+ new Client.ClientParameters()
.port(streamMetadata.getLeader().getPort())
- .publishConfirmListener(publishConfirmListener)
- );
-
- AtomicLong generation = new AtomicLong(0);
- AtomicLong sequence = new AtomicLong(0);
- AtomicBoolean keepPublishing = new AtomicBoolean(true);
- CountDownLatch publishingLatch = new CountDownLatch(1);
-
- executorService.submit(() -> {
- while (keepPublishing.get()) {
- Message message = new WrapperMessageBuilder()
- .properties().messageId(sequence.getAndIncrement())
- .messageBuilder().applicationProperties().entry("generation", generation.get())
- .messageBuilder().build();
- try {
- long publishingId = publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
- published.put(publishingId, message);
- } catch (Exception e) {
- // keep going
- }
- wait(Duration.ofMillis(10));
+ .publishConfirmListener(publishConfirmListener));
+
+ AtomicLong generation = new AtomicLong(0);
+ AtomicLong sequence = new AtomicLong(0);
+ AtomicBoolean keepPublishing = new AtomicBoolean(true);
+ CountDownLatch publishingLatch = new CountDownLatch(1);
+
+ executorService.submit(
+ () -> {
+ while (keepPublishing.get()) {
+ Message message =
+ new WrapperMessageBuilder()
+ .properties()
+ .messageId(sequence.getAndIncrement())
+ .messageBuilder()
+ .applicationProperties()
+ .entry("generation", generation.get())
+ .messageBuilder()
+ .build();
+ try {
+ long publishingId =
+ publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
+ published.put(publishingId, message);
+ } catch (Exception e) {
+ // keep going
}
- publishingLatch.countDown();
+ wait(Duration.ofMillis(10));
+ }
+ publishingLatch.countDown();
});
- Queue<Message> consumed = new ConcurrentLinkedQueue<>();
-
- Client.Broker replica = streamMetadata.getReplicas().stream()
- .filter(broker -> broker.getPort() == TestUtils.streamPortNode2())
- .findFirst()
- .orElseThrow(() -> new NoSuchElementException());
-
- AtomicLong lastProcessedOffset = new AtomicLong(-1);
- Set<Long> generations = ConcurrentHashMap.newKeySet();
- Set<Long> consumedIds = ConcurrentHashMap.newKeySet();
- Client.MessageListener messageListener = (subscriptionId, offset, message) -> {
- consumed.add(message);
- generations.add((Long) message.getApplicationProperties().get("generation"));
- consumedIds.add(message.getProperties().getMessageIdAsLong());
- lastProcessedOffset.set(offset);
+ Queue<Message> consumed = new ConcurrentLinkedQueue<>();
+
+ Client.Broker replica =
+ streamMetadata.getReplicas().stream()
+ .filter(broker -> broker.getPort() == TestUtils.streamPortNode2())
+ .findFirst()
+ .orElseThrow(() -> new NoSuchElementException());
+
+ AtomicLong lastProcessedOffset = new AtomicLong(-1);
+ Set<Long> generations = ConcurrentHashMap.newKeySet();
+ Set<Long> consumedIds = ConcurrentHashMap.newKeySet();
+ Client.MessageListener messageListener =
+ (subscriptionId, offset, message) -> {
+ consumed.add(message);
+ generations.add((Long) message.getApplicationProperties().get("generation"));
+ consumedIds.add(message.getProperties().getMessageIdAsLong());
+ lastProcessedOffset.set(offset);
};
- CountDownLatch reconnectionLatch = new CountDownLatch(1);
- AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
- Client.ShutdownListener shutdownListener = shutdownContext -> {
- if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
- // avoid long-running task in the IO thread
- executorService.submit(() -> {
- Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
- int newReplicaPort = m.getReplicas().get(0).getPort();
-
- Client newConsumer = cf.get(new Client.ClientParameters()
- .port(newReplicaPort)
- .shutdownListener(shutdownListenerReference.get())
- .chunkListener((client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1))
- .messageListener(messageListener)
- );
-
- newConsumer.subscribe((byte) 1, stream, OffsetSpecification.offset(lastProcessedOffset.get() + 1), 10);
-
- generation.incrementAndGet();
- reconnectionLatch.countDown();
+ CountDownLatch reconnectionLatch = new CountDownLatch(1);
+ AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
+ Client.ShutdownListener shutdownListener =
+ shutdownContext -> {
+ if (shutdownContext.getShutdownReason()
+ == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
+ // avoid long-running task in the IO thread
+ executorService.submit(
+ () -> {
+ Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
+ int newReplicaPort = m.getReplicas().get(0).getPort();
+
+ Client newConsumer =
+ cf.get(
+ new Client.ClientParameters()
+ .port(newReplicaPort)
+ .shutdownListener(shutdownListenerReference.get())
+ .chunkListener(
+ (client1, subscriptionId, offset, messageCount, dataSize) ->
+ client1.credit(subscriptionId, 1))
+ .messageListener(messageListener));
+
+ newConsumer.subscribe(
+ (byte) 1,
+ stream,
+ OffsetSpecification.offset(lastProcessedOffset.get() + 1),
+ 10);
+
+ generation.incrementAndGet();
+ reconnectionLatch.countDown();
});
- }
+ }
};
- shutdownListenerReference.set(shutdownListener);
+ shutdownListenerReference.set(shutdownListener);
- Client consumer = cf.get(new Client.ClientParameters()
+ Client consumer =
+ cf.get(
+ new Client.ClientParameters()
.port(replica.getPort())
.shutdownListener(shutdownListener)
- .chunkListener((client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1))
- .messageListener(messageListener)
- );
+ .chunkListener(
+ (client1, subscriptionId, offset, messageCount, dataSize) ->
+ client1.credit(subscriptionId, 1))
+ .messageListener(messageListener));
- Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
- assertThat(response.isOk()).isTrue();
+ Client.Response response =
+ consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
+ assertThat(response.isOk()).isTrue();
- // let's publish for a bit of time
- Thread.sleep(2000);
+ // let's publish for a bit of time
+ Thread.sleep(2000);
- assertThat(confirmed).isNotEmpty();
- assertThat(consumed).isNotEmpty();
- int confirmedCount = confirmed.size();
+ assertThat(confirmed).isNotEmpty();
+ assertThat(consumed).isNotEmpty();
+ int confirmedCount = confirmed.size();
- try {
- Host.rabbitmqctl("stop_app", Host.node2name());
+ try {
+ Host.rabbitmqctl("stop_app", Host.node2name());
- assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
- // let's publish for a bit of time
- Thread.sleep(2000);
+ // let's publish for a bit of time
+ Thread.sleep(2000);
- } finally {
- Host.rabbitmqctl("start_app", Host.node2name());
- }
- assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
- confirmedCount = confirmed.size();
+ } finally {
+ Host.rabbitmqctl("start_app", Host.node2name());
+ }
+ assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
+ confirmedCount = confirmed.size();
- // wait until all the replicas are there
- TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
- Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
- return m.getReplicas().size() == 2;
+ // wait until all the replicas are there
+ TestUtils.waitAtMost(
+ Duration.ofSeconds(5),
+ () -> {
+ Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
+ return m.getReplicas().size() == 2;
});
- // let's publish for a bit of time
- Thread.sleep(2000);
-
- assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
+ // let's publish for a bit of time
+ Thread.sleep(2000);
- keepPublishing.set(false);
+ assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
- assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue();
+ keepPublishing.set(false);
- TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size());
+ assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue();
- assertThat(generations).hasSize(2).contains(0L, 1L);
- assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
- long lastMessageId = -1;
- for (Message message : consumed) {
- long messageId = message.getProperties().getMessageIdAsLong();
- assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId);
- lastMessageId = messageId;
- }
- assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get());
+ TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size());
- confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId));
+ assertThat(generations).hasSize(2).contains(0L, 1L);
+ assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
+ long lastMessageId = -1;
+ for (Message message : consumed) {
+ long messageId = message.getProperties().getMessageIdAsLong();
+ assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId);
+ lastMessageId = messageId;
}
+ assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get());
+ confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId));
+ }
}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java
index 1c89f5d165..0134038a8b 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java
@@ -25,88 +25,93 @@ import java.net.UnknownHostException;
public class Host {
- private static String capture(InputStream is)
- throws IOException {
- BufferedReader br = new BufferedReader(new InputStreamReader(is));
- String line;
- StringBuilder buff = new StringBuilder();
- while ((line = br.readLine()) != null) {
- buff.append(line).append("\n");
- }
- return buff.toString();
+ private static String capture(InputStream is) throws IOException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(is));
+ String line;
+ StringBuilder buff = new StringBuilder();
+ while ((line = br.readLine()) != null) {
+ buff.append(line).append("\n");
}
+ return buff.toString();
+ }
- private static Process executeCommand(String command) throws IOException {
- Process pr = executeCommandProcess(command);
+ private static Process executeCommand(String command) throws IOException {
+ Process pr = executeCommandProcess(command);
- int ev = waitForExitValue(pr);
- if (ev != 0) {
- String stdout = capture(pr.getInputStream());
- String stderr = capture(pr.getErrorStream());
- throw new IOException("unexpected command exit value: " + ev +
- "\ncommand: " + command + "\n" +
- "\nstdout:\n" + stdout +
- "\nstderr:\n" + stderr + "\n");
- }
- return pr;
+ int ev = waitForExitValue(pr);
+ if (ev != 0) {
+ String stdout = capture(pr.getInputStream());
+ String stderr = capture(pr.getErrorStream());
+ throw new IOException(
+ "unexpected command exit value: "
+ + ev
+ + "\ncommand: "
+ + command
+ + "\n"
+ + "\nstdout:\n"
+ + stdout
+ + "\nstderr:\n"
+ + stderr
+ + "\n");
}
+ return pr;
+ }
- private static int waitForExitValue(Process pr) {
- while (true) {
- try {
- pr.waitFor();
- break;
- } catch (InterruptedException ignored) {
- }
- }
- return pr.exitValue();
+ private static int waitForExitValue(Process pr) {
+ while (true) {
+ try {
+ pr.waitFor();
+ break;
+ } catch (InterruptedException ignored) {
+ }
}
+ return pr.exitValue();
+ }
- private static Process executeCommandProcess(String command) throws IOException {
- String[] finalCommand;
- if (System.getProperty("os.name").toLowerCase().contains("windows")) {
- finalCommand = new String[4];
- finalCommand[0] = "C:\\winnt\\system32\\cmd.exe";
- finalCommand[1] = "/y";
- finalCommand[2] = "/c";
- finalCommand[3] = command;
- } else {
- finalCommand = new String[3];
- finalCommand[0] = "/bin/sh";
- finalCommand[1] = "-c";
- finalCommand[2] = command;
- }
- return Runtime.getRuntime().exec(finalCommand);
+ private static Process executeCommandProcess(String command) throws IOException {
+ String[] finalCommand;
+ if (System.getProperty("os.name").toLowerCase().contains("windows")) {
+ finalCommand = new String[4];
+ finalCommand[0] = "C:\\winnt\\system32\\cmd.exe";
+ finalCommand[1] = "/y";
+ finalCommand[2] = "/c";
+ finalCommand[3] = command;
+ } else {
+ finalCommand = new String[3];
+ finalCommand[0] = "/bin/sh";
+ finalCommand[1] = "-c";
+ finalCommand[2] = command;
}
+ return Runtime.getRuntime().exec(finalCommand);
+ }
- public static Process rabbitmqctl(String command) throws IOException {
- return rabbitmqctl(command, node1name());
- }
-
- public static Process rabbitmqctl(String command, String nodename) throws IOException {
- return executeCommand(rabbitmqctlCommand() +
- " -n '" + nodename + "'" +
- " " + command);
- }
+ public static Process rabbitmqctl(String command) throws IOException {
+ return rabbitmqctl(command, node1name());
+ }
- public static String node1name() {
- try {
- return System.getProperty("node1.name", "rabbit-1@" + InetAddress.getLocalHost().getHostName());
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- }
+ public static Process rabbitmqctl(String command, String nodename) throws IOException {
+ return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command);
+ }
- public static String node2name() {
- try {
- return System.getProperty("node2.name", "rabbit-2@" + InetAddress.getLocalHost().getHostName());
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
+ public static String node1name() {
+ try {
+ return System.getProperty(
+ "node1.name", "rabbit-1@" + InetAddress.getLocalHost().getHostName());
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
}
+ }
- static String rabbitmqctlCommand() {
- return System.getProperty("rabbitmqctl.bin");
+ public static String node2name() {
+ try {
+ return System.getProperty(
+ "node2.name", "rabbit-2@" + InetAddress.getLocalHost().getHostName());
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
}
+ }
+ static String rabbitmqctlCommand() {
+ return System.getProperty("rabbitmqctl.bin");
+ }
}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
index ac5ba238e7..08024a12bf 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
@@ -16,13 +16,9 @@
package com.rabbitmq.stream;
-import com.rabbitmq.stream.impl.Client;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
+import static org.assertj.core.api.Assertions.assertThat;
+import com.rabbitmq.stream.impl.Client;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,106 +28,146 @@ import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class StreamTest {
- String stream;
- TestUtils.ClientFactory cf;
-
- static Stream<Arguments> shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember() {
- return Stream.of(
- brokers("leader", metadata -> metadata.getLeader(), "leader", metadata -> metadata.getLeader()),
- brokers("leader", metadata -> metadata.getLeader(), "replica", metadata -> metadata.getReplicas().iterator().next()),
- brokers("replica", metadata -> metadata.getReplicas().iterator().next(), "leader", metadata -> metadata.getLeader()),
- brokers("replica", metadata -> new ArrayList<>(metadata.getReplicas()).get(0), "replica", metadata -> new ArrayList<>(metadata.getReplicas()).get(1))
- );
- }
-
- static Arguments brokers(String dp, Function<Client.StreamMetadata, Client.Broker> publisherBroker,
- String dc, Function<Client.StreamMetadata, Client.Broker> consumerBroker) {
- return Arguments.of(new FunctionWithToString<>(dp, publisherBroker), new FunctionWithToString<>(dc, consumerBroker));
- }
-
- @ParameterizedTest
- @MethodSource
- void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember(Function<Client.StreamMetadata, Client.Broker> publisherBroker,
- Function<Client.StreamMetadata, Client.Broker> consumerBroker) throws Exception {
-
- int messageCount = 10_000;
- Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
- Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
- assertThat(metadata).hasSize(1).containsKey(stream);
- Client.StreamMetadata streamMetadata = metadata.get(stream);
-
- CountDownLatch publishingLatch = new CountDownLatch(messageCount);
- Client publisher = cf.get(new Client.ClientParameters()
+ String stream;
+ TestUtils.ClientFactory cf;
+
+ static Stream<Arguments> shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember() {
+ return Stream.of(
+ brokers(
+ "leader", metadata -> metadata.getLeader(), "leader", metadata -> metadata.getLeader()),
+ brokers(
+ "leader",
+ metadata -> metadata.getLeader(),
+ "replica",
+ metadata -> metadata.getReplicas().iterator().next()),
+ brokers(
+ "replica",
+ metadata -> metadata.getReplicas().iterator().next(),
+ "leader",
+ metadata -> metadata.getLeader()),
+ brokers(
+ "replica",
+ metadata -> new ArrayList<>(metadata.getReplicas()).get(0),
+ "replica",
+ metadata -> new ArrayList<>(metadata.getReplicas()).get(1)));
+ }
+
+ static Arguments brokers(
+ String dp,
+ Function<Client.StreamMetadata, Client.Broker> publisherBroker,
+ String dc,
+ Function<Client.StreamMetadata, Client.Broker> consumerBroker) {
+ return Arguments.of(
+ new FunctionWithToString<>(dp, publisherBroker),
+ new FunctionWithToString<>(dc, consumerBroker));
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember(
+ Function<Client.StreamMetadata, Client.Broker> publisherBroker,
+ Function<Client.StreamMetadata, Client.Broker> consumerBroker)
+ throws Exception {
+
+ int messageCount = 10_000;
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
+ assertThat(metadata).hasSize(1).containsKey(stream);
+ Client.StreamMetadata streamMetadata = metadata.get(stream);
+
+ CountDownLatch publishingLatch = new CountDownLatch(messageCount);
+ Client publisher =
+ cf.get(
+ new Client.ClientParameters()
.port(publisherBroker.apply(streamMetadata).getPort())
- .publishConfirmListener((publisherId, publishingId) -> publishingLatch.countDown()));
-
- IntStream.range(0, messageCount).forEach(i -> publisher.publish(stream, (byte) 1, Collections.singletonList(
- publisher.messageBuilder().addData(("hello " + i).getBytes(StandardCharsets.UTF_8)).build())));
-
- assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue();
-
- CountDownLatch consumingLatch = new CountDownLatch(messageCount);
- Set<String> bodies = ConcurrentHashMap.newKeySet(messageCount);
- Client consumer = cf.get(new Client.ClientParameters()
+ .publishConfirmListener(
+ (publisherId, publishingId) -> publishingLatch.countDown()));
+
+ IntStream.range(0, messageCount)
+ .forEach(
+ i ->
+ publisher.publish(
+ stream,
+ (byte) 1,
+ Collections.singletonList(
+ publisher
+ .messageBuilder()
+ .addData(("hello " + i).getBytes(StandardCharsets.UTF_8))
+ .build())));
+
+ assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+ CountDownLatch consumingLatch = new CountDownLatch(messageCount);
+ Set<String> bodies = ConcurrentHashMap.newKeySet(messageCount);
+ Client consumer =
+ cf.get(
+ new Client.ClientParameters()
.port(consumerBroker.apply(streamMetadata).getPort())
- .chunkListener((client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 10))
- .messageListener((subscriptionId, offset, message) -> {
- bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8));
- consumingLatch.countDown();
- })
- );
-
- consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
-
- assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue();
- assertThat(bodies).hasSize(messageCount);
- IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i)));
- }
-
- @Test
- void metadataOnClusterShouldReturnLeaderAndReplicas() {
- Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
- Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
- assertThat(metadata).hasSize(1).containsKey(stream);
- Client.StreamMetadata streamMetadata = metadata.get(stream);
- assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
- assertThat(streamMetadata.getReplicas()).hasSize(2);
-
- BiConsumer<Client.Broker, Client.Broker> assertNodesAreDifferent = (node, anotherNode) -> {
- assertThat(node.getHost()).isEqualTo(anotherNode.getHost());
- assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort());
+ .chunkListener(
+ (client1, subscriptionId, offset, messageCount1, dataSize) ->
+ client1.credit(subscriptionId, 10))
+ .messageListener(
+ (subscriptionId, offset, message) -> {
+ bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8));
+ consumingLatch.countDown();
+ }));
+
+ consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
+
+ assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(bodies).hasSize(messageCount);
+ IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i)));
+ }
+
+ @Test
+ void metadataOnClusterShouldReturnLeaderAndReplicas() {
+ Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
+ Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
+ assertThat(metadata).hasSize(1).containsKey(stream);
+ Client.StreamMetadata streamMetadata = metadata.get(stream);
+ assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
+ assertThat(streamMetadata.getReplicas()).hasSize(2);
+
+ BiConsumer<Client.Broker, Client.Broker> assertNodesAreDifferent =
+ (node, anotherNode) -> {
+ assertThat(node.getHost()).isEqualTo(anotherNode.getHost());
+ assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort());
};
- streamMetadata.getReplicas().forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader()));
- List<Client.Broker> replicas = new ArrayList<>(streamMetadata.getReplicas());
- assertNodesAreDifferent.accept(replicas.get(0), replicas.get(1));
- }
+ streamMetadata
+ .getReplicas()
+ .forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader()));
+ List<Client.Broker> replicas = new ArrayList<>(streamMetadata.getReplicas());
+ assertNodesAreDifferent.accept(replicas.get(0), replicas.get(1));
+ }
- static class FunctionWithToString<T, R> implements Function<T, R> {
+ static class FunctionWithToString<T, R> implements Function<T, R> {
- final String toString;
- final Function<T, R> delegate;
+ final String toString;
+ final Function<T, R> delegate;
- FunctionWithToString(String toString, Function<T, R> delegate) {
- this.toString = toString;
- this.delegate = delegate;
- }
-
- @Override
- public R apply(T t) {
- return delegate.apply(t);
- }
+ FunctionWithToString(String toString, Function<T, R> delegate) {
+ this.toString = toString;
+ this.delegate = delegate;
+ }
- @Override
- public String toString() {
- return toString;
- }
+ @Override
+ public R apply(T t) {
+ return delegate.apply(t);
}
+ @Override
+ public String toString() {
+ return toString;
+ }
+ }
}
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
index da3eb28f02..c49a8d5832 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
@@ -16,162 +16,164 @@
package com.rabbitmq.stream;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
import com.rabbitmq.stream.impl.Client;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import org.junit.jupiter.api.extension.*;
-
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.fail;
+import org.junit.jupiter.api.extension.*;
public class TestUtils {
- static int streamPortNode1() {
- String port = System.getProperty("node1.stream.port", "5555");
- return Integer.valueOf(port);
- }
+ static int streamPortNode1() {
+ String port = System.getProperty("node1.stream.port", "5555");
+ return Integer.valueOf(port);
+ }
- static int streamPortNode2() {
- String port = System.getProperty("node2.stream.port", "5556");
- return Integer.valueOf(port);
- }
+ static int streamPortNode2() {
+ String port = System.getProperty("node2.stream.port", "5556");
+ return Integer.valueOf(port);
+ }
- static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException {
- if (condition.getAsBoolean()) {
- return;
- }
- int waitTime = 100;
- int waitedTime = 0;
- long timeoutInMs = duration.toMillis();
- while (waitedTime <= timeoutInMs) {
- Thread.sleep(waitTime);
- if (condition.getAsBoolean()) {
- return;
- }
- waitedTime += waitTime;
- }
- fail("Waited " + duration.getSeconds() + " second(s), condition never got true");
+ static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException {
+ if (condition.getAsBoolean()) {
+ return;
}
+ int waitTime = 100;
+ int waitedTime = 0;
+ long timeoutInMs = duration.toMillis();
+ while (waitedTime <= timeoutInMs) {
+ Thread.sleep(waitTime);
+ if (condition.getAsBoolean()) {
+ return;
+ }
+ waitedTime += waitTime;
+ }
+ fail("Waited " + duration.getSeconds() + " second(s), condition never got true");
+ }
- static class StreamTestInfrastructureExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback {
-
- private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class);
-
- private static ExtensionContext.Store store(ExtensionContext extensionContext) {
- return extensionContext.getRoot().getStore(NAMESPACE);
- }
-
- private static EventLoopGroup eventLoopGroup(ExtensionContext context) {
- return (EventLoopGroup) store(context).get("nettyEventLoopGroup");
- }
+ static class StreamTestInfrastructureExtension
+ implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback {
- @Override
- public void beforeAll(ExtensionContext context) {
- store(context).put("nettyEventLoopGroup", new NioEventLoopGroup());
- }
+ private static final ExtensionContext.Namespace NAMESPACE =
+ ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class);
- @Override
- public void beforeEach(ExtensionContext context) throws Exception {
- try {
- Field streamField = context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup");
- streamField.setAccessible(true);
- streamField.set(context.getTestInstance().get(), eventLoopGroup(context));
- } catch (NoSuchFieldException e) {
-
- }
- try {
- Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
- streamField.setAccessible(true);
- String stream = UUID.randomUUID().toString();
- streamField.set(context.getTestInstance().get(), stream);
- Client client = new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context))
- .port(streamPortNode1())
- );
- Client.Response response = client.create(stream);
- assertThat(response.isOk()).isTrue();
- client.close();
- store(context).put("testMethodStream", stream);
- } catch (NoSuchFieldException e) {
-
- }
-
- for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) {
- if (declaredField.getType().equals(ClientFactory.class)) {
- declaredField.setAccessible(true);
- ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context));
- declaredField.set(context.getTestInstance().get(), clientFactory);
- store(context).put("testClientFactory", clientFactory);
- break;
- }
- }
+ private static ExtensionContext.Store store(ExtensionContext extensionContext) {
+ return extensionContext.getRoot().getStore(NAMESPACE);
+ }
- }
+ private static EventLoopGroup eventLoopGroup(ExtensionContext context) {
+ return (EventLoopGroup) store(context).get("nettyEventLoopGroup");
+ }
- @Override
- public void afterEach(ExtensionContext context) throws Exception {
- try {
- Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
- streamField.setAccessible(true);
- String stream = (String) streamField.get(context.getTestInstance().get());
- Client client = new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context))
- .port(streamPortNode1())
- );
- Client.Response response = client.delete(stream);
- assertThat(response.isOk()).isTrue();
- client.close();
- store(context).remove("testMethodStream");
- } catch (NoSuchFieldException e) {
-
- }
-
- ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory");
- if (clientFactory != null) {
- clientFactory.close();
- }
- }
+ @Override
+ public void beforeAll(ExtensionContext context) {
+ store(context).put("nettyEventLoopGroup", new NioEventLoopGroup());
+ }
- @Override
- public void afterAll(ExtensionContext context) throws Exception {
- EventLoopGroup eventLoopGroup = eventLoopGroup(context);
- eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ try {
+ Field streamField =
+ context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup");
+ streamField.setAccessible(true);
+ streamField.set(context.getTestInstance().get(), eventLoopGroup(context));
+ } catch (NoSuchFieldException e) {
+
+ }
+ try {
+ Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
+ streamField.setAccessible(true);
+ String stream = UUID.randomUUID().toString();
+ streamField.set(context.getTestInstance().get(), stream);
+ Client client =
+ new Client(
+ new Client.ClientParameters()
+ .eventLoopGroup(eventLoopGroup(context))
+ .port(streamPortNode1()));
+ Client.Response response = client.create(stream);
+ assertThat(response.isOk()).isTrue();
+ client.close();
+ store(context).put("testMethodStream", stream);
+ } catch (NoSuchFieldException e) {
+
+ }
+
+ for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) {
+ if (declaredField.getType().equals(ClientFactory.class)) {
+ declaredField.setAccessible(true);
+ ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context));
+ declaredField.set(context.getTestInstance().get(), clientFactory);
+ store(context).put("testClientFactory", clientFactory);
+ break;
}
-
+ }
}
- static class ClientFactory {
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ try {
+ Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
+ streamField.setAccessible(true);
+ String stream = (String) streamField.get(context.getTestInstance().get());
+ Client client =
+ new Client(
+ new Client.ClientParameters()
+ .eventLoopGroup(eventLoopGroup(context))
+ .port(streamPortNode1()));
+ Client.Response response = client.delete(stream);
+ assertThat(response.isOk()).isTrue();
+ client.close();
+ store(context).remove("testMethodStream");
+ } catch (NoSuchFieldException e) {
+
+ }
+
+ ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory");
+ if (clientFactory != null) {
+ clientFactory.close();
+ }
+ }
- private final EventLoopGroup eventLoopGroup;
- private final Set<Client> clients = ConcurrentHashMap.newKeySet();
+ @Override
+ public void afterAll(ExtensionContext context) throws Exception {
+ EventLoopGroup eventLoopGroup = eventLoopGroup(context);
+ eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
+ }
+ }
+ static class ClientFactory {
- public ClientFactory(EventLoopGroup eventLoopGroup) {
- this.eventLoopGroup = eventLoopGroup;
- }
+ private final EventLoopGroup eventLoopGroup;
+ private final Set<Client> clients = ConcurrentHashMap.newKeySet();
- public Client get() {
- return get(new Client.ClientParameters());
- }
+ public ClientFactory(EventLoopGroup eventLoopGroup) {
+ this.eventLoopGroup = eventLoopGroup;
+ }
- public Client get(Client.ClientParameters parameters) {
- // don't set the port, it would override the caller's port setting
- Client client = new Client(parameters.eventLoopGroup(eventLoopGroup));
- clients.add(client);
- return client;
- }
+ public Client get() {
+ return get(new Client.ClientParameters());
+ }
- private void close() {
- for (Client c : clients) {
- c.close();
- }
- }
+ public Client get(Client.ClientParameters parameters) {
+ // don't set the port, it would override the caller's port setting
+ Client client = new Client(parameters.eventLoopGroup(eventLoopGroup));
+ clients.add(client);
+ return client;
}
+ private void close() {
+ for (Client c : clients) {
+ c.close();
+ }
+ }
+ }
}