summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2022-08-10 09:59:00 +0200
committerGitHub <noreply@github.com>2022-08-10 09:59:00 +0200
commit238b7cac53581dc6a854cb38ce107c2a577b2a27 (patch)
treeaf17ec48da16b9edf3d9b8a4bcc03d35b026e7d6
parentf73ea407b4538620aaea87a95b90ac57a3e7a90c (diff)
parent8b6f382fbb142a85e9e30a8ace6cc4abb0c49b7f (diff)
downloadrabbitmq-server-git-238b7cac53581dc6a854cb38ce107c2a577b2a27.tar.gz
Merge pull request #5480 from rabbitmq/mergify/bp/v3.11.x/pr-5479
Streams: adapt tests to the latest Java stream client listener interface (backport #5479)
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java6
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java2
2 files changed, 4 insertions, 4 deletions
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
index 55ccea1f6b..28d16bae09 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
@@ -164,7 +164,7 @@ public class FailureTest {
new Client.ClientParameters()
.port(TestUtils.streamPortNode1())
.messageListener(
- (subscriptionId, offset, chunkTimestamp, msg) -> {
+ (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
consumeLatch.countDown();
}));
@@ -345,7 +345,7 @@ public class FailureTest {
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.messageListener(
- (subscriptionId, offset, chunkTimestamp, message) -> {
+ (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
if (consumed.size() == confirmed.size()) {
@@ -447,7 +447,7 @@ public class FailureTest {
Set<Long> generations = ConcurrentHashMap.newKeySet();
Set<Long> consumedIds = ConcurrentHashMap.newKeySet();
Client.MessageListener messageListener =
- (subscriptionId, offset, chunkTimestamp, message) -> {
+ (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
consumedIds.add(message.getProperties().getMessageIdAsLong());
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
index e4d06c9025..ba3ddf7464 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
@@ -119,7 +119,7 @@ public class StreamTest {
(client1, subscriptionId, offset, messageCount1, dataSize) ->
client1.credit(subscriptionId, 10))
.messageListener(
- (subscriptionId, offset, chunkTimestamp, message) -> {
+ (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8));
consumingLatch.countDown();
}));