diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-09-29 12:05:29 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-09-29 12:05:29 +0200 |
commit | b8bdb5ae7b633ed0c897af67337a27dcde838784 (patch) | |
tree | 8ceb40380d91501dd2d066334def7e68654bf72c | |
parent | 03a11e07713dd8b9ce9a3609bd3fa1a1d7df3327 (diff) | |
download | rabbitmq-server-git-b8bdb5ae7b633ed0c897af67337a27dcde838784.tar.gz |
Bump dependencies in Java test suite
And apply Google Java Format with Maven Spotless plugin.
5 files changed, 785 insertions, 655 deletions
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml index 7bc9e659ed..aa27c29baf 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml @@ -27,13 +27,14 @@ <properties> <stream-client.version>0.1.0-SNAPSHOT</stream-client.version> - <proton-j.version>0.33.5</proton-j.version> - <junit.jupiter.version>5.6.2</junit.jupiter.version> - <assertj.version>3.16.1</assertj.version> - <mockito.version>3.3.3</mockito.version> + <proton-j.version>0.33.6</proton-j.version> + <junit.jupiter.version>5.7.0</junit.jupiter.version> + <assertj.version>3.17.2</assertj.version> + <mockito.version>3.5.11</mockito.version> <logback.version>1.2.3</logback.version> <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version> <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version> + <spotless.version>2.2.0</spotless.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> @@ -110,6 +111,20 @@ <version>${maven-surefire-plugin.version}</version> </plugin> + <plugin> + <groupId>com.diffplug.spotless</groupId> + <artifactId>spotless-maven-plugin</artifactId> + <version>${spotless.version}</version> + <configuration> + <java> + <googleJavaFormat> + <version>1.9</version> + <style>GOOGLE</style> + </googleJavaFormat> + </java> + </configuration> + </plugin> + </plugins> </build> diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index ecabe0ee5c..c7a390f00d 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -16,12 +16,11 @@ package com.rabbitmq.stream; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + import com.rabbitmq.stream.codec.WrapperMessageBuilder; import com.rabbitmq.stream.impl.Client; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; @@ -29,88 +28,100 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class FailureTest { - TestUtils.ClientFactory cf; - String stream; - ExecutorService executorService; + TestUtils.ClientFactory cf; + String stream; + ExecutorService executorService; - static void wait(Duration duration) { - try { - Thread.sleep(duration.toMillis()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + static void wait(Duration duration) { + try { + Thread.sleep(duration.toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } - @AfterEach - void tearDown() { - if (executorService != null) { - executorService.shutdownNow(); - } + @AfterEach + void tearDown() { + if (executorService != null) { + executorService.shutdownNow(); } - - @Test - void leaderFailureWhenPublisherConnectedToReplica() throws Exception { - Set<String> messages = new HashSet<>(); - Client client = cf.get(new Client.ClientParameters() - .port(TestUtils.streamPortNode1()) - ); - Map<String, Client.StreamMetadata> metadata = client.metadata(stream); - Client.StreamMetadata streamMetadata = metadata.get(stream); - assertThat(streamMetadata).isNotNull(); - - assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); - assertThat(streamMetadata.getReplicas()).isNotEmpty(); - Client.Broker replica = streamMetadata.getReplicas().get(0); - assertThat(replica.getPort()).isNotEqualTo(TestUtils.streamPortNode1()); - - AtomicReference<CountDownLatch> confirmLatch = new AtomicReference<>(new CountDownLatch(1)); - - CountDownLatch metadataLatch = new CountDownLatch(1); - Client publisher = cf.get(new Client.ClientParameters() + } + + @Test + void leaderFailureWhenPublisherConnectedToReplica() throws Exception { + Set<String> messages = new HashSet<>(); + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map<String, Client.StreamMetadata> metadata = client.metadata(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata).isNotNull(); + + assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); + assertThat(streamMetadata.getReplicas()).isNotEmpty(); + Client.Broker replica = streamMetadata.getReplicas().get(0); + assertThat(replica.getPort()).isNotEqualTo(TestUtils.streamPortNode1()); + + AtomicReference<CountDownLatch> confirmLatch = new AtomicReference<>(new CountDownLatch(1)); + + CountDownLatch metadataLatch = new CountDownLatch(1); + Client publisher = + cf.get( + new Client.ClientParameters() .port(replica.getPort()) .metadataListener((stream, code) -> metadataLatch.countDown()) - .publishConfirmListener((publisherId, publishingId) -> confirmLatch.get().countDown())); - String message = "all nodes available"; - messages.add(message); - publisher.publish(stream, (byte) 1, - Collections.singletonList(publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); - assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); - confirmLatch.set(null); - - try { - Host.rabbitmqctl("stop_app"); - try { - cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); - fail("Node app stopped, connecting should not be possible"); - } catch (Exception e) { - // OK - } - - assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue(); - - // wait until there's a new leader - TestUtils.waitAtMost(Duration.ofSeconds(10), () -> { - Client.StreamMetadata m = publisher.metadata(stream).get(stream); - return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1(); - }); - - confirmLatch.set(new CountDownLatch(1)); - message = "2 nodes available"; - messages.add(message); - publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder() - .addData(message.getBytes(StandardCharsets.UTF_8)).build())); - assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); - confirmLatch.set(null); - } finally { - Host.rabbitmqctl("start_app"); - } + .publishConfirmListener( + (publisherId, publishingId) -> confirmLatch.get().countDown())); + String message = "all nodes available"; + messages.add(message); + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); + assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); + confirmLatch.set(null); + + try { + Host.rabbitmqctl("stop_app"); + try { + cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + fail("Node app stopped, connecting should not be possible"); + } catch (Exception e) { + // OK + } + + assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + // wait until there's a new leader + TestUtils.waitAtMost( + Duration.ofSeconds(10), + () -> { + Client.StreamMetadata m = publisher.metadata(stream).get(stream); + return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1(); + }); + + confirmLatch.set(new CountDownLatch(1)); + message = "2 nodes available"; + messages.add(message); + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher + .messageBuilder() + .addData(message.getBytes(StandardCharsets.UTF_8)) + .build())); + assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); + confirmLatch.set(null); + } finally { + Host.rabbitmqctl("start_app"); + } // wait until all the replicas are there TestUtils.waitAtMost( @@ -120,350 +131,411 @@ public class FailureTest { return m.getReplicas().size() == 2; }); - confirmLatch.set(new CountDownLatch(1)); - message = "all nodes are back"; - messages.add(message); - publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder() - .addData(message.getBytes(StandardCharsets.UTF_8)).build())); - assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); - confirmLatch.set(null); - - CountDownLatch consumeLatch = new CountDownLatch(2); - Set<String> bodies = ConcurrentHashMap.newKeySet(); - Client consumer = cf.get(new Client.ClientParameters() + confirmLatch.set(new CountDownLatch(1)); + message = "all nodes are back"; + messages.add(message); + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); + assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); + confirmLatch.set(null); + + CountDownLatch consumeLatch = new CountDownLatch(2); + Set<String> bodies = ConcurrentHashMap.newKeySet(); + Client consumer = + cf.get( + new Client.ClientParameters() .port(TestUtils.streamPortNode1()) - .messageListener((subscriptionId, offset, msg) -> { - bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8)); - consumeLatch.countDown(); - })); - - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { - Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - return response.isOk(); - }); - assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(bodies).hasSize(3).contains("all nodes available", "2 nodes available", "all nodes are back"); - } + .messageListener( + (subscriptionId, offset, msg) -> { + bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8)); + consumeLatch.countDown(); + })); - @Test - void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception { - executorService = Executors.newCachedThreadPool(); - Client client = cf.get(new Client.ClientParameters() - .port(TestUtils.streamPortNode1()) - ); - Map<String, Client.StreamMetadata> metadata = client.metadata(stream); - Client.StreamMetadata streamMetadata = metadata.get(stream); - assertThat(streamMetadata).isNotNull(); - - assertThat(streamMetadata.getLeader()).isNotNull(); - assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); - - Map<Long, Message> published = new ConcurrentHashMap<>(); - Set<Message> confirmed = ConcurrentHashMap.newKeySet(); - - Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> { - Message confirmedMessage; - int attempts = 0; - while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { - wait(Duration.ofMillis(5)); - attempts++; - } - confirmed.add(confirmedMessage); + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.Response response = + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + return response.isOk(); + }); + assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(bodies) + .hasSize(3) + .contains("all nodes available", "2 nodes available", "all nodes are back"); + } + + @Test + void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception { + executorService = Executors.newCachedThreadPool(); + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map<String, Client.StreamMetadata> metadata = client.metadata(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata).isNotNull(); + + assertThat(streamMetadata.getLeader()).isNotNull(); + assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); + + Map<Long, Message> published = new ConcurrentHashMap<>(); + Set<Message> confirmed = ConcurrentHashMap.newKeySet(); + + Client.PublishConfirmListener publishConfirmListener = + (publisherId, publishingId) -> { + Message confirmedMessage; + int attempts = 0; + while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { + wait(Duration.ofMillis(5)); + attempts++; + } + confirmed.add(confirmedMessage); }; - AtomicLong generation = new AtomicLong(0); - AtomicLong sequence = new AtomicLong(0); - AtomicBoolean connected = new AtomicBoolean(true); - AtomicReference<Client> publisher = new AtomicReference<>(); - CountDownLatch reconnectionLatch = new CountDownLatch(1); - AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>(); - Client.ShutdownListener shutdownListener = shutdownContext -> { - if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) { - // avoid long-running task in the IO thread - executorService.submit(() -> { - connected.set(false); - - Client locator = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2())); - // wait until there's a new leader - try { - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { - Client.StreamMetadata m = locator.metadata(stream).get(stream); - return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1(); + AtomicLong generation = new AtomicLong(0); + AtomicLong sequence = new AtomicLong(0); + AtomicBoolean connected = new AtomicBoolean(true); + AtomicReference<Client> publisher = new AtomicReference<>(); + CountDownLatch reconnectionLatch = new CountDownLatch(1); + AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>(); + Client.ShutdownListener shutdownListener = + shutdownContext -> { + if (shutdownContext.getShutdownReason() + == Client.ShutdownContext.ShutdownReason.UNKNOWN) { + // avoid long-running task in the IO thread + executorService.submit( + () -> { + connected.set(false); + + Client locator = + cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2())); + // wait until there's a new leader + try { + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = locator.metadata(stream).get(stream); + return m.getLeader() != null + && m.getLeader().getPort() != TestUtils.streamPortNode1(); }); - } catch (Throwable e) { - reconnectionLatch.countDown(); - return; - } - - int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort(); - Client newPublisher = cf.get(new Client.ClientParameters() - .port(newLeaderPort) - .shutdownListener(shutdownListenerReference.get()) - .publishConfirmListener(publishConfirmListener) - ); - - generation.incrementAndGet(); - published.clear(); - publisher.set(newPublisher); - connected.set(true); - + } catch (Throwable e) { reconnectionLatch.countDown(); + return; + } + + int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort(); + Client newPublisher = + cf.get( + new Client.ClientParameters() + .port(newLeaderPort) + .shutdownListener(shutdownListenerReference.get()) + .publishConfirmListener(publishConfirmListener)); + + generation.incrementAndGet(); + published.clear(); + publisher.set(newPublisher); + connected.set(true); + + reconnectionLatch.countDown(); }); - } + } }; - shutdownListenerReference.set(shutdownListener); + shutdownListenerReference.set(shutdownListener); - client = cf.get(new Client.ClientParameters() + client = + cf.get( + new Client.ClientParameters() .port(streamMetadata.getLeader().getPort()) .shutdownListener(shutdownListener) .publishConfirmListener(publishConfirmListener)); - publisher.set(client); - - AtomicBoolean keepPublishing = new AtomicBoolean(true); - - executorService.submit(() -> { - while (keepPublishing.get()) { - if (connected.get()) { - Message message = publisher.get().messageBuilder() - .properties().messageId(sequence.getAndIncrement()) - .messageBuilder().applicationProperties().entry("generation", generation.get()) - .messageBuilder().build(); - try { - long publishingId = publisher.get().publish(stream, (byte) 1, Collections.singletonList(message)).get(0); - published.put(publishingId, message); - } catch (Exception e) { - // keep going - } - wait(Duration.ofMillis(10)); - } else { - wait(Duration.ofSeconds(1)); - } + publisher.set(client); + + AtomicBoolean keepPublishing = new AtomicBoolean(true); + + executorService.submit( + () -> { + while (keepPublishing.get()) { + if (connected.get()) { + Message message = + publisher + .get() + .messageBuilder() + .properties() + .messageId(sequence.getAndIncrement()) + .messageBuilder() + .applicationProperties() + .entry("generation", generation.get()) + .messageBuilder() + .build(); + try { + long publishingId = + publisher + .get() + .publish(stream, (byte) 1, Collections.singletonList(message)) + .get(0); + published.put(publishingId, message); + } catch (Exception e) { + // keep going + } + wait(Duration.ofMillis(10)); + } else { + wait(Duration.ofSeconds(1)); } + } }); - // let's publish for a bit of time - Thread.sleep(2000); + // let's publish for a bit of time + Thread.sleep(2000); - assertThat(confirmed).isNotEmpty(); - int confirmedCount = confirmed.size(); + assertThat(confirmed).isNotEmpty(); + int confirmedCount = confirmed.size(); - try { - Host.rabbitmqctl("stop_app"); + try { + Host.rabbitmqctl("stop_app"); - assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue(); - // let's publish for a bit of time - Thread.sleep(2000); + // let's publish for a bit of time + Thread.sleep(2000); - } finally { - Host.rabbitmqctl("start_app"); - } - assertThat(confirmed).hasSizeGreaterThan(confirmedCount); - confirmedCount = confirmed.size(); + } finally { + Host.rabbitmqctl("start_app"); + } + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + confirmedCount = confirmed.size(); - Client metadataClient = cf.get(new Client.ClientParameters() - .port(TestUtils.streamPortNode2()) - ); - // wait until all the replicas are there - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { - Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); - return m.getReplicas().size() == 2; + Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2())); + // wait until all the replicas are there + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + return m.getReplicas().size() == 2; }); - // let's publish for a bit of time - Thread.sleep(2000); + // let's publish for a bit of time + Thread.sleep(2000); - assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); - keepPublishing.set(false); + keepPublishing.set(false); - Queue<Message> consumed = new ConcurrentLinkedQueue<>(); - Set<Long> generations = ConcurrentHashMap.newKeySet(); - CountDownLatch consumedLatch = new CountDownLatch(1); - Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); - Client consumer = cf.get(new Client.ClientParameters() + Queue<Message> consumed = new ConcurrentLinkedQueue<>(); + Set<Long> generations = ConcurrentHashMap.newKeySet(); + CountDownLatch consumedLatch = new CountDownLatch(1); + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + Client consumer = + cf.get( + new Client.ClientParameters() .port(m.getReplicas().get(0).getPort()) - .chunkListener((client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1)) - .messageListener((subscriptionId, offset, message) -> { - consumed.add(message); - generations.add((Long) message.getApplicationProperties().get("generation")); - if (consumed.size() == confirmed.size()) { + .chunkListener( + (client1, subscriptionId, offset, messageCount, dataSize) -> + client1.credit(subscriptionId, 1)) + .messageListener( + (subscriptionId, offset, message) -> { + consumed.add(message); + generations.add((Long) message.getApplicationProperties().get("generation")); + if (consumed.size() == confirmed.size()) { consumedLatch.countDown(); - } - }) - ); - - Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - assertThat(response.isOk()).isTrue(); - - assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(generations).hasSize(2).contains(0L, 1L); - assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); - long lastMessageId = -1; - for (Message message : consumed) { - long messageId = message.getProperties().getMessageIdAsLong(); - assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId); - lastMessageId = messageId; - } - assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get()); + } + })); + + Client.Response response = + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + assertThat(response.isOk()).isTrue(); + + assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(generations).hasSize(2).contains(0L, 1L); + assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); + long lastMessageId = -1; + for (Message message : consumed) { + long messageId = message.getProperties().getMessageIdAsLong(); + assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId); + lastMessageId = messageId; } - - @Test - void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { - executorService = Executors.newCachedThreadPool(); - Client metadataClient = cf.get(new Client.ClientParameters() - .port(TestUtils.streamPortNode1()) - ); - Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream); - Client.StreamMetadata streamMetadata = metadata.get(stream); - assertThat(streamMetadata).isNotNull(); - - assertThat(streamMetadata.getLeader()).isNotNull(); - assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); - - Map<Long, Message> published = new ConcurrentHashMap<>(); - Set<Message> confirmed = ConcurrentHashMap.newKeySet(); - Set<Long> confirmedIds = ConcurrentHashMap.newKeySet(); - Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> { - Message confirmedMessage; - int attempts = 0; - while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { - wait(Duration.ofMillis(5)); - attempts++; - } - confirmed.add(confirmedMessage); - confirmedIds.add(confirmedMessage.getProperties().getMessageIdAsLong()); + assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get()); + } + + @Test + void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { + executorService = Executors.newCachedThreadPool(); + Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata).isNotNull(); + + assertThat(streamMetadata.getLeader()).isNotNull(); + assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); + + Map<Long, Message> published = new ConcurrentHashMap<>(); + Set<Message> confirmed = ConcurrentHashMap.newKeySet(); + Set<Long> confirmedIds = ConcurrentHashMap.newKeySet(); + Client.PublishConfirmListener publishConfirmListener = + (publisherId, publishingId) -> { + Message confirmedMessage; + int attempts = 0; + while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { + wait(Duration.ofMillis(5)); + attempts++; + } + confirmed.add(confirmedMessage); + confirmedIds.add(confirmedMessage.getProperties().getMessageIdAsLong()); }; - Client publisher = cf.get(new Client.ClientParameters() + Client publisher = + cf.get( + new Client.ClientParameters() .port(streamMetadata.getLeader().getPort()) - .publishConfirmListener(publishConfirmListener) - ); - - AtomicLong generation = new AtomicLong(0); - AtomicLong sequence = new AtomicLong(0); - AtomicBoolean keepPublishing = new AtomicBoolean(true); - CountDownLatch publishingLatch = new CountDownLatch(1); - - executorService.submit(() -> { - while (keepPublishing.get()) { - Message message = new WrapperMessageBuilder() - .properties().messageId(sequence.getAndIncrement()) - .messageBuilder().applicationProperties().entry("generation", generation.get()) - .messageBuilder().build(); - try { - long publishingId = publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0); - published.put(publishingId, message); - } catch (Exception e) { - // keep going - } - wait(Duration.ofMillis(10)); + .publishConfirmListener(publishConfirmListener)); + + AtomicLong generation = new AtomicLong(0); + AtomicLong sequence = new AtomicLong(0); + AtomicBoolean keepPublishing = new AtomicBoolean(true); + CountDownLatch publishingLatch = new CountDownLatch(1); + + executorService.submit( + () -> { + while (keepPublishing.get()) { + Message message = + new WrapperMessageBuilder() + .properties() + .messageId(sequence.getAndIncrement()) + .messageBuilder() + .applicationProperties() + .entry("generation", generation.get()) + .messageBuilder() + .build(); + try { + long publishingId = + publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0); + published.put(publishingId, message); + } catch (Exception e) { + // keep going } - publishingLatch.countDown(); + wait(Duration.ofMillis(10)); + } + publishingLatch.countDown(); }); - Queue<Message> consumed = new ConcurrentLinkedQueue<>(); - - Client.Broker replica = streamMetadata.getReplicas().stream() - .filter(broker -> broker.getPort() == TestUtils.streamPortNode2()) - .findFirst() - .orElseThrow(() -> new NoSuchElementException()); - - AtomicLong lastProcessedOffset = new AtomicLong(-1); - Set<Long> generations = ConcurrentHashMap.newKeySet(); - Set<Long> consumedIds = ConcurrentHashMap.newKeySet(); - Client.MessageListener messageListener = (subscriptionId, offset, message) -> { - consumed.add(message); - generations.add((Long) message.getApplicationProperties().get("generation")); - consumedIds.add(message.getProperties().getMessageIdAsLong()); - lastProcessedOffset.set(offset); + Queue<Message> consumed = new ConcurrentLinkedQueue<>(); + + Client.Broker replica = + streamMetadata.getReplicas().stream() + .filter(broker -> broker.getPort() == TestUtils.streamPortNode2()) + .findFirst() + .orElseThrow(() -> new NoSuchElementException()); + + AtomicLong lastProcessedOffset = new AtomicLong(-1); + Set<Long> generations = ConcurrentHashMap.newKeySet(); + Set<Long> consumedIds = ConcurrentHashMap.newKeySet(); + Client.MessageListener messageListener = + (subscriptionId, offset, message) -> { + consumed.add(message); + generations.add((Long) message.getApplicationProperties().get("generation")); + consumedIds.add(message.getProperties().getMessageIdAsLong()); + lastProcessedOffset.set(offset); }; - CountDownLatch reconnectionLatch = new CountDownLatch(1); - AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>(); - Client.ShutdownListener shutdownListener = shutdownContext -> { - if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) { - // avoid long-running task in the IO thread - executorService.submit(() -> { - Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); - int newReplicaPort = m.getReplicas().get(0).getPort(); - - Client newConsumer = cf.get(new Client.ClientParameters() - .port(newReplicaPort) - .shutdownListener(shutdownListenerReference.get()) - .chunkListener((client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1)) - .messageListener(messageListener) - ); - - newConsumer.subscribe((byte) 1, stream, OffsetSpecification.offset(lastProcessedOffset.get() + 1), 10); - - generation.incrementAndGet(); - reconnectionLatch.countDown(); + CountDownLatch reconnectionLatch = new CountDownLatch(1); + AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>(); + Client.ShutdownListener shutdownListener = + shutdownContext -> { + if (shutdownContext.getShutdownReason() + == Client.ShutdownContext.ShutdownReason.UNKNOWN) { + // avoid long-running task in the IO thread + executorService.submit( + () -> { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + int newReplicaPort = m.getReplicas().get(0).getPort(); + + Client newConsumer = + cf.get( + new Client.ClientParameters() + .port(newReplicaPort) + .shutdownListener(shutdownListenerReference.get()) + .chunkListener( + (client1, subscriptionId, offset, messageCount, dataSize) -> + client1.credit(subscriptionId, 1)) + .messageListener(messageListener)); + + newConsumer.subscribe( + (byte) 1, + stream, + OffsetSpecification.offset(lastProcessedOffset.get() + 1), + 10); + + generation.incrementAndGet(); + reconnectionLatch.countDown(); }); - } + } }; - shutdownListenerReference.set(shutdownListener); + shutdownListenerReference.set(shutdownListener); - Client consumer = cf.get(new Client.ClientParameters() + Client consumer = + cf.get( + new Client.ClientParameters() .port(replica.getPort()) .shutdownListener(shutdownListener) - .chunkListener((client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1)) - .messageListener(messageListener) - ); + .chunkListener( + (client1, subscriptionId, offset, messageCount, dataSize) -> + client1.credit(subscriptionId, 1)) + .messageListener(messageListener)); - Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - assertThat(response.isOk()).isTrue(); + Client.Response response = + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + assertThat(response.isOk()).isTrue(); - // let's publish for a bit of time - Thread.sleep(2000); + // let's publish for a bit of time + Thread.sleep(2000); - assertThat(confirmed).isNotEmpty(); - assertThat(consumed).isNotEmpty(); - int confirmedCount = confirmed.size(); + assertThat(confirmed).isNotEmpty(); + assertThat(consumed).isNotEmpty(); + int confirmedCount = confirmed.size(); - try { - Host.rabbitmqctl("stop_app", Host.node2name()); + try { + Host.rabbitmqctl("stop_app", Host.node2name()); - assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue(); - // let's publish for a bit of time - Thread.sleep(2000); + // let's publish for a bit of time + Thread.sleep(2000); - } finally { - Host.rabbitmqctl("start_app", Host.node2name()); - } - assertThat(confirmed).hasSizeGreaterThan(confirmedCount); - confirmedCount = confirmed.size(); + } finally { + Host.rabbitmqctl("start_app", Host.node2name()); + } + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + confirmedCount = confirmed.size(); - // wait until all the replicas are there - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { - Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); - return m.getReplicas().size() == 2; + // wait until all the replicas are there + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + return m.getReplicas().size() == 2; }); - // let's publish for a bit of time - Thread.sleep(2000); - - assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + // let's publish for a bit of time + Thread.sleep(2000); - keepPublishing.set(false); + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); - assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue(); + keepPublishing.set(false); - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size()); + assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(generations).hasSize(2).contains(0L, 1L); - assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); - long lastMessageId = -1; - for (Message message : consumed) { - long messageId = message.getProperties().getMessageIdAsLong(); - assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId); - lastMessageId = messageId; - } - assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get()); + TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size()); - confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId)); + assertThat(generations).hasSize(2).contains(0L, 1L); + assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); + long lastMessageId = -1; + for (Message message : consumed) { + long messageId = message.getProperties().getMessageIdAsLong(); + assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId); + lastMessageId = messageId; } + assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get()); + confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId)); + } } diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java index 1c89f5d165..0134038a8b 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java @@ -25,88 +25,93 @@ import java.net.UnknownHostException; public class Host { - private static String capture(InputStream is) - throws IOException { - BufferedReader br = new BufferedReader(new InputStreamReader(is)); - String line; - StringBuilder buff = new StringBuilder(); - while ((line = br.readLine()) != null) { - buff.append(line).append("\n"); - } - return buff.toString(); + private static String capture(InputStream is) throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(is)); + String line; + StringBuilder buff = new StringBuilder(); + while ((line = br.readLine()) != null) { + buff.append(line).append("\n"); } + return buff.toString(); + } - private static Process executeCommand(String command) throws IOException { - Process pr = executeCommandProcess(command); + private static Process executeCommand(String command) throws IOException { + Process pr = executeCommandProcess(command); - int ev = waitForExitValue(pr); - if (ev != 0) { - String stdout = capture(pr.getInputStream()); - String stderr = capture(pr.getErrorStream()); - throw new IOException("unexpected command exit value: " + ev + - "\ncommand: " + command + "\n" + - "\nstdout:\n" + stdout + - "\nstderr:\n" + stderr + "\n"); - } - return pr; + int ev = waitForExitValue(pr); + if (ev != 0) { + String stdout = capture(pr.getInputStream()); + String stderr = capture(pr.getErrorStream()); + throw new IOException( + "unexpected command exit value: " + + ev + + "\ncommand: " + + command + + "\n" + + "\nstdout:\n" + + stdout + + "\nstderr:\n" + + stderr + + "\n"); } + return pr; + } - private static int waitForExitValue(Process pr) { - while (true) { - try { - pr.waitFor(); - break; - } catch (InterruptedException ignored) { - } - } - return pr.exitValue(); + private static int waitForExitValue(Process pr) { + while (true) { + try { + pr.waitFor(); + break; + } catch (InterruptedException ignored) { + } } + return pr.exitValue(); + } - private static Process executeCommandProcess(String command) throws IOException { - String[] finalCommand; - if (System.getProperty("os.name").toLowerCase().contains("windows")) { - finalCommand = new String[4]; - finalCommand[0] = "C:\\winnt\\system32\\cmd.exe"; - finalCommand[1] = "/y"; - finalCommand[2] = "/c"; - finalCommand[3] = command; - } else { - finalCommand = new String[3]; - finalCommand[0] = "/bin/sh"; - finalCommand[1] = "-c"; - finalCommand[2] = command; - } - return Runtime.getRuntime().exec(finalCommand); + private static Process executeCommandProcess(String command) throws IOException { + String[] finalCommand; + if (System.getProperty("os.name").toLowerCase().contains("windows")) { + finalCommand = new String[4]; + finalCommand[0] = "C:\\winnt\\system32\\cmd.exe"; + finalCommand[1] = "/y"; + finalCommand[2] = "/c"; + finalCommand[3] = command; + } else { + finalCommand = new String[3]; + finalCommand[0] = "/bin/sh"; + finalCommand[1] = "-c"; + finalCommand[2] = command; } + return Runtime.getRuntime().exec(finalCommand); + } - public static Process rabbitmqctl(String command) throws IOException { - return rabbitmqctl(command, node1name()); - } - - public static Process rabbitmqctl(String command, String nodename) throws IOException { - return executeCommand(rabbitmqctlCommand() + - " -n '" + nodename + "'" + - " " + command); - } + public static Process rabbitmqctl(String command) throws IOException { + return rabbitmqctl(command, node1name()); + } - public static String node1name() { - try { - return System.getProperty("node1.name", "rabbit-1@" + InetAddress.getLocalHost().getHostName()); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } - } + public static Process rabbitmqctl(String command, String nodename) throws IOException { + return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command); + } - public static String node2name() { - try { - return System.getProperty("node2.name", "rabbit-2@" + InetAddress.getLocalHost().getHostName()); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } + public static String node1name() { + try { + return System.getProperty( + "node1.name", "rabbit-1@" + InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new RuntimeException(e); } + } - static String rabbitmqctlCommand() { - return System.getProperty("rabbitmqctl.bin"); + public static String node2name() { + try { + return System.getProperty( + "node2.name", "rabbit-2@" + InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new RuntimeException(e); } + } + static String rabbitmqctlCommand() { + return System.getProperty("rabbitmqctl.bin"); + } } diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java index ac5ba238e7..08024a12bf 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java @@ -16,13 +16,9 @@ package com.rabbitmq.stream; -import com.rabbitmq.stream.impl.Client; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; +import static org.assertj.core.api.Assertions.assertThat; +import com.rabbitmq.stream.impl.Client; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -32,106 +28,146 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamTest { - String stream; - TestUtils.ClientFactory cf; - - static Stream<Arguments> shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember() { - return Stream.of( - brokers("leader", metadata -> metadata.getLeader(), "leader", metadata -> metadata.getLeader()), - brokers("leader", metadata -> metadata.getLeader(), "replica", metadata -> metadata.getReplicas().iterator().next()), - brokers("replica", metadata -> metadata.getReplicas().iterator().next(), "leader", metadata -> metadata.getLeader()), - brokers("replica", metadata -> new ArrayList<>(metadata.getReplicas()).get(0), "replica", metadata -> new ArrayList<>(metadata.getReplicas()).get(1)) - ); - } - - static Arguments brokers(String dp, Function<Client.StreamMetadata, Client.Broker> publisherBroker, - String dc, Function<Client.StreamMetadata, Client.Broker> consumerBroker) { - return Arguments.of(new FunctionWithToString<>(dp, publisherBroker), new FunctionWithToString<>(dc, consumerBroker)); - } - - @ParameterizedTest - @MethodSource - void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember(Function<Client.StreamMetadata, Client.Broker> publisherBroker, - Function<Client.StreamMetadata, Client.Broker> consumerBroker) throws Exception { - - int messageCount = 10_000; - Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); - Map<String, Client.StreamMetadata> metadata = client.metadata(stream); - assertThat(metadata).hasSize(1).containsKey(stream); - Client.StreamMetadata streamMetadata = metadata.get(stream); - - CountDownLatch publishingLatch = new CountDownLatch(messageCount); - Client publisher = cf.get(new Client.ClientParameters() + String stream; + TestUtils.ClientFactory cf; + + static Stream<Arguments> shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember() { + return Stream.of( + brokers( + "leader", metadata -> metadata.getLeader(), "leader", metadata -> metadata.getLeader()), + brokers( + "leader", + metadata -> metadata.getLeader(), + "replica", + metadata -> metadata.getReplicas().iterator().next()), + brokers( + "replica", + metadata -> metadata.getReplicas().iterator().next(), + "leader", + metadata -> metadata.getLeader()), + brokers( + "replica", + metadata -> new ArrayList<>(metadata.getReplicas()).get(0), + "replica", + metadata -> new ArrayList<>(metadata.getReplicas()).get(1))); + } + + static Arguments brokers( + String dp, + Function<Client.StreamMetadata, Client.Broker> publisherBroker, + String dc, + Function<Client.StreamMetadata, Client.Broker> consumerBroker) { + return Arguments.of( + new FunctionWithToString<>(dp, publisherBroker), + new FunctionWithToString<>(dc, consumerBroker)); + } + + @ParameterizedTest + @MethodSource + void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember( + Function<Client.StreamMetadata, Client.Broker> publisherBroker, + Function<Client.StreamMetadata, Client.Broker> consumerBroker) + throws Exception { + + int messageCount = 10_000; + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map<String, Client.StreamMetadata> metadata = client.metadata(stream); + assertThat(metadata).hasSize(1).containsKey(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + + CountDownLatch publishingLatch = new CountDownLatch(messageCount); + Client publisher = + cf.get( + new Client.ClientParameters() .port(publisherBroker.apply(streamMetadata).getPort()) - .publishConfirmListener((publisherId, publishingId) -> publishingLatch.countDown())); - - IntStream.range(0, messageCount).forEach(i -> publisher.publish(stream, (byte) 1, Collections.singletonList( - publisher.messageBuilder().addData(("hello " + i).getBytes(StandardCharsets.UTF_8)).build()))); - - assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue(); - - CountDownLatch consumingLatch = new CountDownLatch(messageCount); - Set<String> bodies = ConcurrentHashMap.newKeySet(messageCount); - Client consumer = cf.get(new Client.ClientParameters() + .publishConfirmListener( + (publisherId, publishingId) -> publishingLatch.countDown())); + + IntStream.range(0, messageCount) + .forEach( + i -> + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher + .messageBuilder() + .addData(("hello " + i).getBytes(StandardCharsets.UTF_8)) + .build()))); + + assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + CountDownLatch consumingLatch = new CountDownLatch(messageCount); + Set<String> bodies = ConcurrentHashMap.newKeySet(messageCount); + Client consumer = + cf.get( + new Client.ClientParameters() .port(consumerBroker.apply(streamMetadata).getPort()) - .chunkListener((client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 10)) - .messageListener((subscriptionId, offset, message) -> { - bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8)); - consumingLatch.countDown(); - }) - ); - - consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - - assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(bodies).hasSize(messageCount); - IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i))); - } - - @Test - void metadataOnClusterShouldReturnLeaderAndReplicas() { - Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); - Map<String, Client.StreamMetadata> metadata = client.metadata(stream); - assertThat(metadata).hasSize(1).containsKey(stream); - Client.StreamMetadata streamMetadata = metadata.get(stream); - assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); - assertThat(streamMetadata.getReplicas()).hasSize(2); - - BiConsumer<Client.Broker, Client.Broker> assertNodesAreDifferent = (node, anotherNode) -> { - assertThat(node.getHost()).isEqualTo(anotherNode.getHost()); - assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort()); + .chunkListener( + (client1, subscriptionId, offset, messageCount1, dataSize) -> + client1.credit(subscriptionId, 10)) + .messageListener( + (subscriptionId, offset, message) -> { + bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8)); + consumingLatch.countDown(); + })); + + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + + assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(bodies).hasSize(messageCount); + IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i))); + } + + @Test + void metadataOnClusterShouldReturnLeaderAndReplicas() { + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map<String, Client.StreamMetadata> metadata = client.metadata(stream); + assertThat(metadata).hasSize(1).containsKey(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(streamMetadata.getReplicas()).hasSize(2); + + BiConsumer<Client.Broker, Client.Broker> assertNodesAreDifferent = + (node, anotherNode) -> { + assertThat(node.getHost()).isEqualTo(anotherNode.getHost()); + assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort()); }; - streamMetadata.getReplicas().forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader())); - List<Client.Broker> replicas = new ArrayList<>(streamMetadata.getReplicas()); - assertNodesAreDifferent.accept(replicas.get(0), replicas.get(1)); - } + streamMetadata + .getReplicas() + .forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader())); + List<Client.Broker> replicas = new ArrayList<>(streamMetadata.getReplicas()); + assertNodesAreDifferent.accept(replicas.get(0), replicas.get(1)); + } - static class FunctionWithToString<T, R> implements Function<T, R> { + static class FunctionWithToString<T, R> implements Function<T, R> { - final String toString; - final Function<T, R> delegate; + final String toString; + final Function<T, R> delegate; - FunctionWithToString(String toString, Function<T, R> delegate) { - this.toString = toString; - this.delegate = delegate; - } - - @Override - public R apply(T t) { - return delegate.apply(t); - } + FunctionWithToString(String toString, Function<T, R> delegate) { + this.toString = toString; + this.delegate = delegate; + } - @Override - public String toString() { - return toString; - } + @Override + public R apply(T t) { + return delegate.apply(t); } + @Override + public String toString() { + return toString; + } + } } diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index da3eb28f02..c49a8d5832 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -16,162 +16,164 @@ package com.rabbitmq.stream; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + import com.rabbitmq.stream.impl.Client; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import org.junit.jupiter.api.extension.*; - import java.lang.reflect.Field; import java.time.Duration; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BooleanSupplier; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.fail; +import org.junit.jupiter.api.extension.*; public class TestUtils { - static int streamPortNode1() { - String port = System.getProperty("node1.stream.port", "5555"); - return Integer.valueOf(port); - } + static int streamPortNode1() { + String port = System.getProperty("node1.stream.port", "5555"); + return Integer.valueOf(port); + } - static int streamPortNode2() { - String port = System.getProperty("node2.stream.port", "5556"); - return Integer.valueOf(port); - } + static int streamPortNode2() { + String port = System.getProperty("node2.stream.port", "5556"); + return Integer.valueOf(port); + } - static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException { - if (condition.getAsBoolean()) { - return; - } - int waitTime = 100; - int waitedTime = 0; - long timeoutInMs = duration.toMillis(); - while (waitedTime <= timeoutInMs) { - Thread.sleep(waitTime); - if (condition.getAsBoolean()) { - return; - } - waitedTime += waitTime; - } - fail("Waited " + duration.getSeconds() + " second(s), condition never got true"); + static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException { + if (condition.getAsBoolean()) { + return; } + int waitTime = 100; + int waitedTime = 0; + long timeoutInMs = duration.toMillis(); + while (waitedTime <= timeoutInMs) { + Thread.sleep(waitTime); + if (condition.getAsBoolean()) { + return; + } + waitedTime += waitTime; + } + fail("Waited " + duration.getSeconds() + " second(s), condition never got true"); + } - static class StreamTestInfrastructureExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { - - private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class); - - private static ExtensionContext.Store store(ExtensionContext extensionContext) { - return extensionContext.getRoot().getStore(NAMESPACE); - } - - private static EventLoopGroup eventLoopGroup(ExtensionContext context) { - return (EventLoopGroup) store(context).get("nettyEventLoopGroup"); - } + static class StreamTestInfrastructureExtension + implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { - @Override - public void beforeAll(ExtensionContext context) { - store(context).put("nettyEventLoopGroup", new NioEventLoopGroup()); - } + private static final ExtensionContext.Namespace NAMESPACE = + ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class); - @Override - public void beforeEach(ExtensionContext context) throws Exception { - try { - Field streamField = context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup"); - streamField.setAccessible(true); - streamField.set(context.getTestInstance().get(), eventLoopGroup(context)); - } catch (NoSuchFieldException e) { - - } - try { - Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); - streamField.setAccessible(true); - String stream = UUID.randomUUID().toString(); - streamField.set(context.getTestInstance().get(), stream); - Client client = new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context)) - .port(streamPortNode1()) - ); - Client.Response response = client.create(stream); - assertThat(response.isOk()).isTrue(); - client.close(); - store(context).put("testMethodStream", stream); - } catch (NoSuchFieldException e) { - - } - - for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) { - if (declaredField.getType().equals(ClientFactory.class)) { - declaredField.setAccessible(true); - ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context)); - declaredField.set(context.getTestInstance().get(), clientFactory); - store(context).put("testClientFactory", clientFactory); - break; - } - } + private static ExtensionContext.Store store(ExtensionContext extensionContext) { + return extensionContext.getRoot().getStore(NAMESPACE); + } - } + private static EventLoopGroup eventLoopGroup(ExtensionContext context) { + return (EventLoopGroup) store(context).get("nettyEventLoopGroup"); + } - @Override - public void afterEach(ExtensionContext context) throws Exception { - try { - Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); - streamField.setAccessible(true); - String stream = (String) streamField.get(context.getTestInstance().get()); - Client client = new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context)) - .port(streamPortNode1()) - ); - Client.Response response = client.delete(stream); - assertThat(response.isOk()).isTrue(); - client.close(); - store(context).remove("testMethodStream"); - } catch (NoSuchFieldException e) { - - } - - ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory"); - if (clientFactory != null) { - clientFactory.close(); - } - } + @Override + public void beforeAll(ExtensionContext context) { + store(context).put("nettyEventLoopGroup", new NioEventLoopGroup()); + } - @Override - public void afterAll(ExtensionContext context) throws Exception { - EventLoopGroup eventLoopGroup = eventLoopGroup(context); - eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS); + @Override + public void beforeEach(ExtensionContext context) throws Exception { + try { + Field streamField = + context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup"); + streamField.setAccessible(true); + streamField.set(context.getTestInstance().get(), eventLoopGroup(context)); + } catch (NoSuchFieldException e) { + + } + try { + Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); + streamField.setAccessible(true); + String stream = UUID.randomUUID().toString(); + streamField.set(context.getTestInstance().get(), stream); + Client client = + new Client( + new Client.ClientParameters() + .eventLoopGroup(eventLoopGroup(context)) + .port(streamPortNode1())); + Client.Response response = client.create(stream); + assertThat(response.isOk()).isTrue(); + client.close(); + store(context).put("testMethodStream", stream); + } catch (NoSuchFieldException e) { + + } + + for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) { + if (declaredField.getType().equals(ClientFactory.class)) { + declaredField.setAccessible(true); + ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context)); + declaredField.set(context.getTestInstance().get(), clientFactory); + store(context).put("testClientFactory", clientFactory); + break; } - + } } - static class ClientFactory { + @Override + public void afterEach(ExtensionContext context) throws Exception { + try { + Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); + streamField.setAccessible(true); + String stream = (String) streamField.get(context.getTestInstance().get()); + Client client = + new Client( + new Client.ClientParameters() + .eventLoopGroup(eventLoopGroup(context)) + .port(streamPortNode1())); + Client.Response response = client.delete(stream); + assertThat(response.isOk()).isTrue(); + client.close(); + store(context).remove("testMethodStream"); + } catch (NoSuchFieldException e) { + + } + + ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory"); + if (clientFactory != null) { + clientFactory.close(); + } + } - private final EventLoopGroup eventLoopGroup; - private final Set<Client> clients = ConcurrentHashMap.newKeySet(); + @Override + public void afterAll(ExtensionContext context) throws Exception { + EventLoopGroup eventLoopGroup = eventLoopGroup(context); + eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS); + } + } + static class ClientFactory { - public ClientFactory(EventLoopGroup eventLoopGroup) { - this.eventLoopGroup = eventLoopGroup; - } + private final EventLoopGroup eventLoopGroup; + private final Set<Client> clients = ConcurrentHashMap.newKeySet(); - public Client get() { - return get(new Client.ClientParameters()); - } + public ClientFactory(EventLoopGroup eventLoopGroup) { + this.eventLoopGroup = eventLoopGroup; + } - public Client get(Client.ClientParameters parameters) { - // don't set the port, it would override the caller's port setting - Client client = new Client(parameters.eventLoopGroup(eventLoopGroup)); - clients.add(client); - return client; - } + public Client get() { + return get(new Client.ClientParameters()); + } - private void close() { - for (Client c : clients) { - c.close(); - } - } + public Client get(Client.ClientParameters parameters) { + // don't set the port, it would override the caller's port setting + Client client = new Client(parameters.eventLoopGroup(eventLoopGroup)); + clients.add(client); + return client; } + private void close() { + for (Client c : clients) { + c.close(); + } + } + } } |