diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-09-02 15:13:37 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-09-02 15:13:37 +0200 |
commit | 21d9aea96cec88c8bb9d62c5d40433b9675c62f4 (patch) | |
tree | 84a5177fa6b29bfd35cbe91dd272024a2de22879 | |
parent | b2f30a55ba87c14c12b3c2a407f7a5ac2594bd8c (diff) | |
download | rabbitmq-server-git-21d9aea96cec88c8bb9d62c5d40433b9675c62f4.tar.gz |
Add publisher ID
5 files changed, 49 insertions, 29 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index f61680c47f..c5613fe908 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -150,6 +150,7 @@ Publish => Key Version Stream PublishedMessages Key => int16 // 0 Version => int16 Stream => string // the name of the stream + PublisherId => int8 PublishedMessages => [PublishedMessage] PublishedMessage => PublishingId Message Message => bytes @@ -161,6 +162,7 @@ Publish => Key Version Stream PublishedMessages PublishConfirm => Key Version PublishingIds Key => int16 // 1 Version => int16 + PublisherId => int8 PublishingIds => [int64] // to correlate with the messages sent ``` @@ -236,6 +238,7 @@ Unsubscribe => Key Version CorrelationId SubscriptionId PublishError => Key Version [PublishingError] Key => int16 // 6 Version => int16 + PublisherId => int8 PublishingError => PublishingId Code PublishingId => int64 Code => int16 // code to identify the problem diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 401ea6c193..ad7c0c409f 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -221,13 +221,27 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, {Connection, State} end, listen_loop_post_auth(Transport, Connection1, State1, Configuration); - {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationIdList}}} -> - CorrelationIdBinaries = [<<CorrelationId:64>> || CorrelationId <- CorrelationIdList], - CorrelationIdCount = length(CorrelationIdList), - FrameSize = 2 + 2 + 4 + CorrelationIdCount * 8, - %% FIXME enforce max frame size - %% in practice, this should be necessary only for very large chunks and for very small frame size limits - Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>, <<CorrelationIdCount:32>>, CorrelationIdBinaries]), + {'$gen_cast', {queue_event, _QueueResource, {osiris_written, _QueueResource, CorrelationList}}} -> + {FirstPublisherId, _FirstPublishingId} = lists:nth(1, CorrelationList), + {LastPublisherId, LastPublishingIds, LastCount} = lists:foldl(fun({PublisherId, PublishingId}, {CurrentPublisherId, PublishingIds, Count}) -> + case PublisherId of + CurrentPublisherId -> + {CurrentPublisherId, [PublishingIds, <<PublishingId:64>>], Count + 1}; + OtherPublisherId -> + FrameSize = 2 + 2 + 1 + 4 + Count * 8, + %% FIXME enforce max frame size + %% in practice, this should be necessary only for very large chunks and for very small frame size limits + Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>, + <<CurrentPublisherId:8>>, + <<Count:32>>, PublishingIds]), + {OtherPublisherId, <<PublishingId:64>>, 1} + end + end, {FirstPublisherId, <<>>, 0}, CorrelationList), + FrameSize = 2 + 2 + 1 + 4 + LastCount * 8, + Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>, + <<LastPublisherId:8>>, + <<LastCount:32>>, LastPublishingIds]), + CorrelationIdCount = length(CorrelationList), add_credits(Credits, CorrelationIdCount), State1 = case Blocked of true -> @@ -364,16 +378,16 @@ handle_inbound_data(Transport, Connection, #stream_connection_state{data = Lefto %% see osiris_replica:parse_chunk/3 handle_inbound_data(Transport, Connection, State1, <<Leftover/binary, Data/binary>>, HandleFrameFun). -write_messages(_ClusterLeader, <<>>) -> +write_messages(_ClusterLeader, _PublisherId, <<>>) -> ok; -write_messages(ClusterLeader, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> +write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> % FIXME handle write error - ok = osiris:write(ClusterLeader, PublishingId, Message), - write_messages(ClusterLeader, Rest); -write_messages(ClusterLeader, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) -> + ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, Message), + write_messages(ClusterLeader, PublisherId, Rest); +write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) -> % FIXME handle write error - ok = osiris:write(ClusterLeader, PublishingId, {batch, MessageCount, CompressionType, Batch}), - write_messages(ClusterLeader, Rest). + ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}), + write_messages(ClusterLeader, PublisherId, Rest). generate_publishing_error_details(Acc, _Code, <<>>) -> Acc; @@ -543,25 +557,28 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi virtual_host = VirtualHost, user = User} = Connection, State, <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, + PublisherId:8/unsigned, MessageCount:32, Messages/binary>>, Rest) -> case check_write_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of ok -> case lookup_leader(Stream, Connection) of cluster_not_found -> - FrameSize = 2 + 2 + 4 + (8 + 2) * MessageCount, + FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount, Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, Messages), Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16, + PublisherId:8, MessageCount:32, Details/binary>>]), {Connection, State, Rest}; {ClusterLeader, Connection1} -> - write_messages(ClusterLeader, Messages), + write_messages(ClusterLeader, PublisherId, Messages), sub_credits(Credits, MessageCount), {Connection1, State, Rest} end; error -> - FrameSize = 2 + 2 + 4 + (8 + 2) * MessageCount, + FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount, Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages), Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16, + PublisherId:8, MessageCount:32, Details/binary>>]), {Connection, State, Rest} end; diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index b4f209876e..62a581349d 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -199,10 +199,10 @@ test_delete_stream(S, Stream) -> test_publish_confirm(S, Stream, Body) -> BodySize = byte_size(Body), StreamSize = byte_size(Stream), - PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>, + PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 42:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>, FrameSize = byte_size(PublishFrame), gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>), - {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000). + {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 42:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000). test_subscribe(S, SubscriptionId, Stream) -> StreamSize = byte_size(Stream), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index 7b3dcab7aa..5c43b9d0f6 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -76,10 +76,10 @@ public class FailureTest { Client publisher = cf.get(new Client.ClientParameters() .port(replica.getPort()) .metadataListener((stream, code) -> metadataLatch.countDown()) - .publishConfirmListener(publishingId -> confirmLatch.get().countDown())); + .publishConfirmListener((publisherId, publishingId) -> confirmLatch.get().countDown())); String message = "all nodes available"; messages.add(message); - publisher.publish(stream, + publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); confirmLatch.set(null); @@ -104,7 +104,7 @@ public class FailureTest { confirmLatch.set(new CountDownLatch(1)); message = "2 nodes available"; messages.add(message); - publisher.publish(stream, Collections.singletonList(publisher.messageBuilder() + publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder() .addData(message.getBytes(StandardCharsets.UTF_8)).build())); assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); confirmLatch.set(null); @@ -121,7 +121,7 @@ public class FailureTest { confirmLatch.set(new CountDownLatch(1)); message = "all nodes are back"; messages.add(message); - publisher.publish(stream, Collections.singletonList(publisher.messageBuilder() + publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder() .addData(message.getBytes(StandardCharsets.UTF_8)).build())); assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); confirmLatch.set(null); @@ -157,7 +157,7 @@ public class FailureTest { Map<Long, Message> published = new ConcurrentHashMap<>(); Set<Message> confirmed = ConcurrentHashMap.newKeySet(); - Client.PublishConfirmListener publishConfirmListener = publishingId -> { + Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> { Message confirmedMessage; int attempts = 0; while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { @@ -226,7 +226,7 @@ public class FailureTest { .messageBuilder().applicationProperties().entry("generation", generation.get()) .messageBuilder().build(); try { - long publishingId = publisher.get().publish(stream, Collections.singletonList(message)).get(0); + long publishingId = publisher.get().publish(stream, (byte) 1, Collections.singletonList(message)).get(0); published.put(publishingId, message); } catch (Exception e) { // keep going @@ -321,7 +321,7 @@ public class FailureTest { Map<Long, Message> published = new ConcurrentHashMap<>(); Set<Message> confirmed = ConcurrentHashMap.newKeySet(); Set<Long> confirmedIds = ConcurrentHashMap.newKeySet(); - Client.PublishConfirmListener publishConfirmListener = publishingId -> { + Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> { Message confirmedMessage; int attempts = 0; while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { @@ -349,7 +349,7 @@ public class FailureTest { .messageBuilder().applicationProperties().entry("generation", generation.get()) .messageBuilder().build(); try { - long publishingId = publisher.publish(stream, Collections.singletonList(message)).get(0); + long publishingId = publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0); published.put(publishingId, message); } catch (Exception e) { // keep going diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java index b98c9017d7..ac5ba238e7 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java @@ -69,9 +69,9 @@ public class StreamTest { CountDownLatch publishingLatch = new CountDownLatch(messageCount); Client publisher = cf.get(new Client.ClientParameters() .port(publisherBroker.apply(streamMetadata).getPort()) - .publishConfirmListener(publishingId -> publishingLatch.countDown())); + .publishConfirmListener((publisherId, publishingId) -> publishingLatch.countDown())); - IntStream.range(0, messageCount).forEach(i -> publisher.publish(stream, Collections.singletonList( + IntStream.range(0, messageCount).forEach(i -> publisher.publish(stream, (byte) 1, Collections.singletonList( publisher.messageBuilder().addData(("hello " + i).getBytes(StandardCharsets.UTF_8)).build()))); assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue(); |