diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-27 11:07:10 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-27 11:07:10 +0100 |
commit | 67308be13c2cac736c9b9184f7e00d01510e924f (patch) | |
tree | 79d6bd1de8dcbec78d83c8d9acfc1ecb6f18198c | |
parent | bfa541a0959f1fd9d9a4870fe4698c3e3c2c8098 (diff) | |
download | rabbitmq-server-git-67308be13c2cac736c9b9184f7e00d01510e924f.tar.gz |
Remove stream field from publish command
Only the publisher ID is necessary now the publisher declaration is
mandatory before publishing.
4 files changed, 35 insertions, 43 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 425d2dbb23..3aadd03f52 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -780,44 +780,37 @@ handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers, response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST), {Connection0, State, Rest} end; -handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits, - virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State, - <<?COMMAND_PUBLISH:16, ?VERSION_0:16, - StreamSize:16, Stream:StreamSize/binary, - PublisherId:8/unsigned, - MessageCount:32, Messages/binary>>, Rest) -> - %% FIXME fail messages if publisher is unknown - {StreamToPublishTo, PublisherRef} = case Publishers of +handle_frame_post_auth(Transport, #stream_connection{ + socket = S, credits = Credits, + virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State, + <<?COMMAND_PUBLISH:16, ?VERSION_0:16, + PublisherId:8/unsigned, + MessageCount:32, Messages/binary>>, Rest) -> + case Publishers of #{PublisherId := Publisher} -> - #publisher{stream = St, reference = R} = Publisher, - {St, R}; - _ -> - {Stream, undefined} - end, - case rabbit_stream_utils:check_write_permitted( - #resource{name = StreamToPublishTo, kind = queue, virtual_host = VirtualHost}, - User, - #{}) of - ok -> - case lookup_leader(StreamToPublishTo, Connection) of - cluster_not_found -> + #publisher{stream = Stream, reference = Reference, leader = Leader} = Publisher, + case rabbit_stream_utils:check_write_permitted( + #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of + ok -> + rabbit_stream_utils:write_messages(Leader, Reference, PublisherId, Messages), + sub_credits(Credits, MessageCount), + {Connection, State, Rest}; + error -> FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount, - Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, Messages), + Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages), Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16, PublisherId:8, MessageCount:32, Details/binary>>]), - {Connection, State, Rest}; - {ClusterLeader, Connection1} -> - rabbit_stream_utils:write_messages(ClusterLeader, PublisherRef, PublisherId, Messages), - sub_credits(Credits, MessageCount), - {Connection1, State, Rest} + {Connection, State, Rest} end; - error -> + _ -> FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount, - Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages), + Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, Messages), Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16, - PublisherId:8, - MessageCount:32, Details/binary>>]), + PublisherId:8, + MessageCount:32, Details/binary>>]), {Connection, State, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = Socket, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index fdf8a3468b..003a879718 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -117,7 +117,7 @@ test_server(Port) -> PublisherId = 42, test_declare_publisher(S, PublisherId, Stream), Body = <<"hello">>, - test_publish_confirm(S, PublisherId, Stream, Body), + test_publish_confirm(S, PublisherId, Body), SubscriptionId = 42, Rest = test_subscribe(S, SubscriptionId, Stream), test_deliver(S, Rest, SubscriptionId, Body), @@ -213,10 +213,9 @@ test_declare_publisher(S, PublisherId, Stream) -> {ok, <<_Size:32, ?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res, Rest. -test_publish_confirm(S, PublisherId, Stream, Body) -> +test_publish_confirm(S, PublisherId, Body) -> BodySize = byte_size(Body), - StreamSize = byte_size(Stream), - PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, + PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, PublisherId:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>, FrameSize = byte_size(PublishFrame), gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index c7a390f00d..fcb7b26ad1 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -79,8 +79,8 @@ public class FailureTest { (publisherId, publishingId) -> confirmLatch.get().countDown())); String message = "all nodes available"; messages.add(message); + publisher.declarePublisher((byte) 1, null, stream); publisher.publish( - stream, (byte) 1, Collections.singletonList( publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); @@ -109,8 +109,9 @@ public class FailureTest { confirmLatch.set(new CountDownLatch(1)); message = "2 nodes available"; messages.add(message); + + publisher.declarePublisher((byte) 1, null, stream); publisher.publish( - stream, (byte) 1, Collections.singletonList( publisher @@ -135,7 +136,6 @@ public class FailureTest { message = "all nodes are back"; messages.add(message); publisher.publish( - stream, (byte) 1, Collections.singletonList( publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); @@ -233,6 +233,7 @@ public class FailureTest { generation.incrementAndGet(); published.clear(); + newPublisher.declarePublisher((byte) 1, null, stream); publisher.set(newPublisher); connected.set(true); @@ -249,6 +250,7 @@ public class FailureTest { .shutdownListener(shutdownListener) .publishConfirmListener(publishConfirmListener)); + client.declarePublisher((byte) 1, null, stream); publisher.set(client); AtomicBoolean keepPublishing = new AtomicBoolean(true); @@ -270,10 +272,7 @@ public class FailureTest { .build(); try { long publishingId = - publisher - .get() - .publish(stream, (byte) 1, Collections.singletonList(message)) - .get(0); + publisher.get().publish((byte) 1, Collections.singletonList(message)).get(0); published.put(publishingId, message); } catch (Exception e) { // keep going @@ -389,6 +388,7 @@ public class FailureTest { .port(streamMetadata.getLeader().getPort()) .publishConfirmListener(publishConfirmListener)); + publisher.declarePublisher((byte) 1, null, stream); AtomicLong generation = new AtomicLong(0); AtomicLong sequence = new AtomicLong(0); AtomicBoolean keepPublishing = new AtomicBoolean(true); @@ -408,7 +408,7 @@ public class FailureTest { .build(); try { long publishingId = - publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0); + publisher.publish((byte) 1, Collections.singletonList(message)).get(0); published.put(publishingId, message); } catch (Exception e) { // keep going diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java index 08024a12bf..1af7513d28 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java @@ -92,11 +92,11 @@ public class StreamTest { .publishConfirmListener( (publisherId, publishingId) -> publishingLatch.countDown())); + publisher.declarePublisher((byte) 1, null, stream); IntStream.range(0, messageCount) .forEach( i -> publisher.publish( - stream, (byte) 1, Collections.singletonList( publisher |