diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2022-08-10 09:46:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-10 09:46:02 +0200 |
commit | 424e4170ba2508eee5065b1bced81c3d22a200bc (patch) | |
tree | 068a1ae2ed0fcc847adef7699f2365e3cf32e08f | |
parent | d666555b4014240393b6015c39a9ebcf2d27c26e (diff) | |
parent | 68969faf948ff15c88b8ef43c91385f94261a16c (diff) | |
download | rabbitmq-server-git-424e4170ba2508eee5065b1bced81c3d22a200bc.tar.gz |
Merge pull request #5479 from rabbitmq/mk-rabbitmq-stream-java-test-suite-interface-change
Streams: adapt tests to the latest Java stream client listener interface
2 files changed, 4 insertions, 4 deletions
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index 55ccea1f6b..28d16bae09 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -164,7 +164,7 @@ public class FailureTest { new Client.ClientParameters() .port(TestUtils.streamPortNode1()) .messageListener( - (subscriptionId, offset, chunkTimestamp, msg) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8)); consumeLatch.countDown(); })); @@ -345,7 +345,7 @@ public class FailureTest { (client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { consumed.add(message); generations.add((Long) message.getApplicationProperties().get("generation")); if (consumed.size() == confirmed.size()) { @@ -447,7 +447,7 @@ public class FailureTest { Set<Long> generations = ConcurrentHashMap.newKeySet(); Set<Long> consumedIds = ConcurrentHashMap.newKeySet(); Client.MessageListener messageListener = - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { consumed.add(message); generations.add((Long) message.getApplicationProperties().get("generation")); consumedIds.add(message.getProperties().getMessageIdAsLong()); diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java index e4d06c9025..ba3ddf7464 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java @@ -119,7 +119,7 @@ public class StreamTest { (client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 10)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8)); consumingLatch.countDown(); })); |