summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-27 11:07:10 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-27 11:07:10 +0100
commit67308be13c2cac736c9b9184f7e00d01510e924f (patch)
tree79d6bd1de8dcbec78d83c8d9acfc1ecb6f18198c
parentbfa541a0959f1fd9d9a4870fe4698c3e3c2c8098 (diff)
downloadrabbitmq-server-git-67308be13c2cac736c9b9184f7e00d01510e924f.tar.gz
Remove stream field from publish command
Only the publisher ID is necessary now the publisher declaration is mandatory before publishing.
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl53
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl7
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java16
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java2
4 files changed, 35 insertions, 43 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 425d2dbb23..3aadd03f52 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -780,44 +780,37 @@ handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers,
response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST),
{Connection0, State, Rest}
end;
-handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits,
- virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State,
- <<?COMMAND_PUBLISH:16, ?VERSION_0:16,
- StreamSize:16, Stream:StreamSize/binary,
- PublisherId:8/unsigned,
- MessageCount:32, Messages/binary>>, Rest) ->
- %% FIXME fail messages if publisher is unknown
- {StreamToPublishTo, PublisherRef} = case Publishers of
+handle_frame_post_auth(Transport, #stream_connection{
+ socket = S, credits = Credits,
+ virtual_host = VirtualHost, user = User, publishers = Publishers} = Connection, State,
+ <<?COMMAND_PUBLISH:16, ?VERSION_0:16,
+ PublisherId:8/unsigned,
+ MessageCount:32, Messages/binary>>, Rest) ->
+ case Publishers of
#{PublisherId := Publisher} ->
- #publisher{stream = St, reference = R} = Publisher,
- {St, R};
- _ ->
- {Stream, undefined}
- end,
- case rabbit_stream_utils:check_write_permitted(
- #resource{name = StreamToPublishTo, kind = queue, virtual_host = VirtualHost},
- User,
- #{}) of
- ok ->
- case lookup_leader(StreamToPublishTo, Connection) of
- cluster_not_found ->
+ #publisher{stream = Stream, reference = Reference, leader = Leader} = Publisher,
+ case rabbit_stream_utils:check_write_permitted(
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ rabbit_stream_utils:write_messages(Leader, Reference, PublisherId, Messages),
+ sub_credits(Credits, MessageCount),
+ {Connection, State, Rest};
+ error ->
FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
- Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, Messages),
+ Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages),
Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16,
PublisherId:8,
MessageCount:32, Details/binary>>]),
- {Connection, State, Rest};
- {ClusterLeader, Connection1} ->
- rabbit_stream_utils:write_messages(ClusterLeader, PublisherRef, PublisherId, Messages),
- sub_credits(Credits, MessageCount),
- {Connection1, State, Rest}
+ {Connection, State, Rest}
end;
- error ->
+ _ ->
FrameSize = 2 + 2 + 1 + 4 + (8 + 2) * MessageCount,
- Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_ACCESS_REFUSED, Messages),
+ Details = generate_publishing_error_details(<<>>, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST, Messages),
Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16,
- PublisherId:8,
- MessageCount:32, Details/binary>>]),
+ PublisherId:8,
+ MessageCount:32, Details/binary>>]),
{Connection, State, Rest}
end;
handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index fdf8a3468b..003a879718 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -117,7 +117,7 @@ test_server(Port) ->
PublisherId = 42,
test_declare_publisher(S, PublisherId, Stream),
Body = <<"hello">>,
- test_publish_confirm(S, PublisherId, Stream, Body),
+ test_publish_confirm(S, PublisherId, Body),
SubscriptionId = 42,
Rest = test_subscribe(S, SubscriptionId, Stream),
test_deliver(S, Rest, SubscriptionId, Body),
@@ -213,10 +213,9 @@ test_declare_publisher(S, PublisherId, Stream) ->
{ok, <<_Size:32, ?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res,
Rest.
-test_publish_confirm(S, PublisherId, Stream, Body) ->
+test_publish_confirm(S, PublisherId, Body) ->
BodySize = byte_size(Body),
- StreamSize = byte_size(Stream),
- PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary,
+ PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16,
PublisherId:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
FrameSize = byte_size(PublishFrame),
gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
index c7a390f00d..fcb7b26ad1 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java
@@ -79,8 +79,8 @@ public class FailureTest {
(publisherId, publishingId) -> confirmLatch.get().countDown()));
String message = "all nodes available";
messages.add(message);
+ publisher.declarePublisher((byte) 1, null, stream);
publisher.publish(
- stream,
(byte) 1,
Collections.singletonList(
publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
@@ -109,8 +109,9 @@ public class FailureTest {
confirmLatch.set(new CountDownLatch(1));
message = "2 nodes available";
messages.add(message);
+
+ publisher.declarePublisher((byte) 1, null, stream);
publisher.publish(
- stream,
(byte) 1,
Collections.singletonList(
publisher
@@ -135,7 +136,6 @@ public class FailureTest {
message = "all nodes are back";
messages.add(message);
publisher.publish(
- stream,
(byte) 1,
Collections.singletonList(
publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
@@ -233,6 +233,7 @@ public class FailureTest {
generation.incrementAndGet();
published.clear();
+ newPublisher.declarePublisher((byte) 1, null, stream);
publisher.set(newPublisher);
connected.set(true);
@@ -249,6 +250,7 @@ public class FailureTest {
.shutdownListener(shutdownListener)
.publishConfirmListener(publishConfirmListener));
+ client.declarePublisher((byte) 1, null, stream);
publisher.set(client);
AtomicBoolean keepPublishing = new AtomicBoolean(true);
@@ -270,10 +272,7 @@ public class FailureTest {
.build();
try {
long publishingId =
- publisher
- .get()
- .publish(stream, (byte) 1, Collections.singletonList(message))
- .get(0);
+ publisher.get().publish((byte) 1, Collections.singletonList(message)).get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
@@ -389,6 +388,7 @@ public class FailureTest {
.port(streamMetadata.getLeader().getPort())
.publishConfirmListener(publishConfirmListener));
+ publisher.declarePublisher((byte) 1, null, stream);
AtomicLong generation = new AtomicLong(0);
AtomicLong sequence = new AtomicLong(0);
AtomicBoolean keepPublishing = new AtomicBoolean(true);
@@ -408,7 +408,7 @@ public class FailureTest {
.build();
try {
long publishingId =
- publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
+ publisher.publish((byte) 1, Collections.singletonList(message)).get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
index 08024a12bf..1af7513d28 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java
@@ -92,11 +92,11 @@ public class StreamTest {
.publishConfirmListener(
(publisherId, publishingId) -> publishingLatch.countDown()));
+ publisher.declarePublisher((byte) 1, null, stream);
IntStream.range(0, messageCount)
.forEach(
i ->
publisher.publish(
- stream,
(byte) 1,
Collections.singletonList(
publisher