author     Arnaud Cogoluègnes <acogoluegnes@gmail.com>   2020-09-02 15:13:37 +0200
committer  Arnaud Cogoluègnes <acogoluegnes@gmail.com>   2020-09-02 15:13:37 +0200
commit     21d9aea96cec88c8bb9d62c5d40433b9675c62f4 (patch)
tree       84a5177fa6b29bfd35cbe91dd272024a2de22879
parent     b2f30a55ba87c14c12b3c2a407f7a5ac2594bd8c (diff)
download   rabbitmq-server-git-21d9aea96cec88c8bb9d62c5d40433b9675c62f4.tar.gz
Add publisher ID
-rw-r--r--  deps/rabbitmq_stream/docs/PROTOCOL.adoc                                                                    3
-rw-r--r--  deps/rabbitmq_stream/src/rabbit_stream_reader.erl                                                         51
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl                                                          4
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java     16
-rw-r--r--  deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java       4
5 files changed, 49 insertions(+), 29 deletions(-)
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index f61680c47f..c5613fe908 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -150,6 +150,7 @@ Publish => Key Version Stream PublishedMessages
Key => int16 // 0
Version => int16
Stream => string // the name of the stream
+ PublisherId => int8
PublishedMessages => [PublishedMessage]
PublishedMessage => PublishingId Message
Message => bytes
@@ -161,6 +162,7 @@ Publish => Key Version Stream PublishedMessages
PublishConfirm => Key Version PublishingIds
Key => int16 // 1
Version => int16
+ PublisherId => int8
PublishingIds => [int64] // to correlate with the messages sent
```
@@ -236,6 +238,7 @@ Unsubscribe => Key Version CorrelationId SubscriptionId
PublishError => Key Version [PublishingError]
Key => int16 // 6
Version => int16
+ PublisherId => int8
PublishingError => PublishingId Code
PublishingId => int64
Code => int16 // code to identify the problem
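The three definitions above give the Publish, PublishConfirm and PublishError frames a PublisherId byte right after the fixed Key/Version fields (for Publish, after the stream name). As a minimal sketch, assuming the ?COMMAND_PUBLISH and ?VERSION_0 macro values from the plugin's header and an illustrative single-message payload, a client could encode the updated Publish frame like this:

```
-module(publish_frame_example).
-export([publish_frame/4]).

%% Key/version values assumed to match the plugin's header
%% (?COMMAND_PUBLISH => 0 per PROTOCOL.adoc, ?VERSION_0 assumed to be 0).
-define(COMMAND_PUBLISH, 0).
-define(VERSION_0, 0).

%% Encode a size-prefixed Publish frame carrying one message for PublisherId.
publish_frame(Stream, PublisherId, PublishingId, Body) ->
    StreamSize = byte_size(Stream),
    BodySize = byte_size(Body),
    Frame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16,
              StreamSize:16, Stream:StreamSize/binary,
              PublisherId:8,                 %% new field introduced by this commit
              1:32,                          %% MessageCount: a single message here
              PublishingId:64,
              BodySize:32, Body:BodySize/binary>>,
    <<(byte_size(Frame)):32, Frame/binary>>.
```

For example, publish_frame(<<"stream1">>, 42, 1, <<"hello">>) yields a size-prefixed frame whose confirmation should come back carrying the same publisher id, which is what the updated rabbit_stream_SUITE test below checks with the value 42.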
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 401ea6c193..ad7c0c409f 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -221,13 +221,27 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
{Connection, State}
end,
listen_loop_post_auth(Transport, Connection1, State1, Configuration);
- {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationIdList}}} ->
- CorrelationIdBinaries = [<<CorrelationId:64>> || CorrelationId <- CorrelationIdList],
- CorrelationIdCount = length(CorrelationIdList),
- FrameSize = 2 + 2 + 4 + CorrelationIdCount * 8,
- %% FIXME enforce max frame size
- %% in practice, this should be necessary only for very large chunks and for very small frame size limits
- Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>, <<CorrelationIdCount:32>>, CorrelationIdBinaries]),
+ {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationList}}} ->
+ {FirstPublisherId, _FirstPublishingId} = lists:nth(1, CorrelationList),
+ {LastPublisherId, LastPublishingIds, LastCount} = lists:foldl(fun({PublisherId, PublishingId}, {CurrentPublisherId, PublishingIds, Count}) ->
+ case PublisherId of
+ CurrentPublisherId ->
+ {CurrentPublisherId, [PublishingIds, <<PublishingId:64>>], Count + 1};
+ OtherPublisherId ->
+ FrameSize = 2 + 2 + 1 + 4 + Count * 8,
+ %% FIXME enforce max frame size
+ %% in practice, this should be necessary only for very large chunks and for very small frame size limits
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>,
+ <<CurrentPublisherId:8>>,
+ <<Count:32>>, PublishingIds]),
+ {OtherPublisherId, <<PublishingId:64>>, 1}
+ end
+ end, {FirstPublisherId, <<>>, 0}, CorrelationList),
+ FrameSize = 2 + 2 + 1 + 4 + LastCount * 8,
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>,
+ <<LastPublisherId:8>>,
+ <<LastCount:32>>, LastPublishingIds]),
+ CorrelationIdCount = length(CorrelationList),
add_credits(Credits, CorrelationIdCount),
State1 = case Blocked of
true ->
@@ -364,16 +378,16 @@ handle_inbound_data(Transport, Connection, #stream_connection_state{data = Lefto
%% see osiris_replica:parse_chunk/3
handle_inbound_data(Transport, Connection, State1, <<Leftover/binary, Data/binary>>, HandleFrameFun).
-write_messages(_ClusterLeader, <<>>) ->
+write_messages(_ClusterLeader, _PublisherId, <<>>) ->
ok;
-write_messages(ClusterLeader, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
+write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
% FIXME handle write error
- ok = osiris:write(ClusterLeader, PublishingId, Message),
- write_messages(ClusterLeader, Rest);
-write_messages(ClusterLeader, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
+ ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, Message),
+ write_messages(ClusterLeader, PublisherId, Rest);
+write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
% FIXME handle write error
- ok = osiris:write(ClusterLeader, PublishingId, {batch, MessageCount, CompressionType, Batch}),
- write_messages(ClusterLeader, Rest).
+ ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}),
+ write_messages(ClusterLeader, PublisherId, Rest).
generate_publishing_error_details(Acc, _Code, <<>>) ->
Acc;
@@ -543,25 +557,28 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi
virtual_host = VirtualHost, user = User} = Connection, State,
<<?COMMAND_PUBLISH:16, ?VERSION_0:16,
StreamSize:16, Stream:StreamSize/binary,
+ PublisherId:8/unsigned,
MessageCount:32, Messages/binary>>, Rest) ->
case check_write_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of
ok ->
case lookup_leader(Stream, Connection) of
cluster_not_found ->
- FrameSize = 2 + 2 + 4 + (8 + 2) * MessageCount,
+ FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, Messages),
Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16,
+ PublisherId:8,
MessageCount:32, Details/binary>>]),
{Connection, State, Rest};
{ClusterLeader, Connection1} ->
- write_messages(ClusterLeader, Messages),
+ write_messages(ClusterLeader, PublisherId, Messages),
sub_credits(Credits, MessageCount),
{Connection1, State, Rest}
end;
error ->
- FrameSize = 2 + 2 + 4 + (8 + 2) * MessageCount,
+ FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages),
Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16,
+ PublisherId:8,
MessageCount:32, Details/binary>>]),
{Connection, State, Rest}
end;
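With the osiris_written handler above, confirmations are no longer flushed as one flat list: the fold batches consecutive publishing ids belonging to the same publisher into a single PUBLISH_CONFIRM frame and starts a new frame whenever the publisher id changes, so each confirm frame now carries the publisher id right after the version field. As a minimal client-side sketch, assuming the ?COMMAND_PUBLISH_CONFIRM and ?VERSION_0 macro values from the plugin's header, such a frame could be decoded like this:

```
-module(publish_confirm_example).
-export([parse/1]).

%% Key/version values assumed to match the plugin's header
%% (?COMMAND_PUBLISH_CONFIRM => 1 per PROTOCOL.adoc, ?VERSION_0 assumed to be 0).
-define(COMMAND_PUBLISH_CONFIRM, 1).
-define(VERSION_0, 0).

%% Decode a size-prefixed PublishConfirm frame into {PublisherId, [PublishingId]}.
parse(<<FrameSize:32, Frame:FrameSize/binary>>) ->
    <<?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16,
      PublisherId:8,                         %% new field introduced by this commit
      Count:32, Ids/binary>> = Frame,
    PublishingIds = [Id || <<Id:64>> <= Ids],
    Count = length(PublishingIds),           %% the id array length matches Count
    {PublisherId, PublishingIds}.
```

The publishing ids inside one frame keep the order in which osiris reported them, so a client can correlate them back to its outstanding messages per publisher.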
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index b4f209876e..62a581349d 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -199,10 +199,10 @@ test_delete_stream(S, Stream) ->
test_publish_confirm(S, Stream, Body) ->
BodySize = byte_size(Body),
StreamSize = byte_size(Stream),
- PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
+ PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 42:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
FrameSize = byte_size(PublishFrame),
gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
- {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).
+ {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 42:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).
test_subscribe(S, SubscriptionId, Stream) ->
StreamSize = byte_size(Stream),
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
index 7b3dcab7aa..5c43b9d0f6 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
@@ -76,10 +76,10 @@ public class FailureTest {
Client publisher = cf.get(new Client.ClientParameters()
.port(replica.getPort())
.metadataListener((stream, code) -> metadataLatch.countDown())
- .publishConfirmListener(publishingId -> confirmLatch.get().countDown()));
+ .publishConfirmListener((publisherId, publishingId) -> confirmLatch.get().countDown()));
String message = "all nodes available";
messages.add(message);
- publisher.publish(stream,
+ publisher.publish(stream, (byte) 1,
Collections.singletonList(publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
confirmLatch.set(null);
@@ -104,7 +104,7 @@ public class FailureTest {
confirmLatch.set(new CountDownLatch(1));
message = "2 nodes available";
messages.add(message);
- publisher.publish(stream, Collections.singletonList(publisher.messageBuilder()
+ publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder()
.addData(message.getBytes(StandardCharsets.UTF_8)).build()));
assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
confirmLatch.set(null);
@@ -121,7 +121,7 @@ public class FailureTest {
confirmLatch.set(new CountDownLatch(1));
message = "all nodes are back";
messages.add(message);
- publisher.publish(stream, Collections.singletonList(publisher.messageBuilder()
+ publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder()
.addData(message.getBytes(StandardCharsets.UTF_8)).build()));
assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
confirmLatch.set(null);
@@ -157,7 +157,7 @@ public class FailureTest {
Map<Long, Message> published = new ConcurrentHashMap<>();
Set<Message> confirmed = ConcurrentHashMap.newKeySet();
- Client.PublishConfirmListener publishConfirmListener = publishingId -> {
+ Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> {
Message confirmedMessage;
int attempts = 0;
while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
@@ -226,7 +226,7 @@ public class FailureTest {
.messageBuilder().applicationProperties().entry("generation", generation.get())
.messageBuilder().build();
try {
- long publishingId = publisher.get().publish(stream, Collections.singletonList(message)).get(0);
+ long publishingId = publisher.get().publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
@@ -321,7 +321,7 @@ public class FailureTest {
Map<Long, Message> published = new ConcurrentHashMap<>();
Set<Message> confirmed = ConcurrentHashMap.newKeySet();
Set<Long> confirmedIds = ConcurrentHashMap.newKeySet();
- Client.PublishConfirmListener publishConfirmListener = publishingId -> {
+ Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> {
Message confirmedMessage;
int attempts = 0;
while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
@@ -349,7 +349,7 @@ public class FailureTest {
.messageBuilder().applicationProperties().entry("generation", generation.get())
.messageBuilder().build();
try {
- long publishingId = publisher.publish(stream, Collections.singletonList(message)).get(0);
+ long publishingId = publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
index b98c9017d7..ac5ba238e7 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
@@ -69,9 +69,9 @@ public class StreamTest {
CountDownLatch publishingLatch = new CountDownLatch(messageCount);
Client publisher = cf.get(new Client.ClientParameters()
.port(publisherBroker.apply(streamMetadata).getPort())
- .publishConfirmListener(publishingId -> publishingLatch.countDown()));
+ .publishConfirmListener((publisherId, publishingId) -> publishingLatch.countDown()));
- IntStream.range(0, messageCount).forEach(i -> publisher.publish(stream, Collections.singletonList(
+ IntStream.range(0, messageCount).forEach(i -> publisher.publish(stream, (byte) 1, Collections.singletonList(
publisher.messageBuilder().addData(("hello " + i).getBytes(StandardCharsets.UTF_8)).build())));
assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue();