summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-24 18:18:14 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-24 18:18:14 +0100
commit9859714f0e497a8ed8317519a95095d0661e1a54 (patch)
treee46ff406d972c72d3230b677b9f0db7860b316e8
parent8f97ea400a8276432b9b11b5f07217db42efc909 (diff)
downloadrabbitmq-server-git-9859714f0e497a8ed8317519a95095d0661e1a54.tar.gz
Declare publisher in stream test
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl26
1 files changed, 20 insertions, 6 deletions
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index 4197b1de71..fdf8a3468b 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -114,8 +114,10 @@ test_server(Port) ->
test_authenticate(S),
Stream = <<"stream1">>,
test_create_stream(S, Stream),
+ PublisherId = 42,
+ test_declare_publisher(S, PublisherId, Stream),
Body = <<"hello">>,
- test_publish_confirm(S, Stream, Body),
+ test_publish_confirm(S, PublisherId, Stream, Body),
SubscriptionId = 42,
Rest = test_subscribe(S, SubscriptionId, Stream),
test_deliver(S, Rest, SubscriptionId, Body),
@@ -200,13 +202,25 @@ test_delete_stream(S, Stream) ->
ResponseFrameSize = 10,
{ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000).
-test_publish_confirm(S, Stream, Body) ->
+test_declare_publisher(S, PublisherId, Stream) ->
+ StreamSize = byte_size(Stream),
+ DeclarePublisherFrame = <<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, PublisherId:8,
+ 0:16, %% empty publisher reference
+ StreamSize:16, Stream:StreamSize/binary>>,
+ FrameSize = byte_size(DeclarePublisherFrame),
+ gen_tcp:send(S, <<FrameSize:32, DeclarePublisherFrame/binary>>),
+ Res = gen_tcp:recv(S, 0, 5000),
+ {ok, <<_Size:32, ?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res,
+ Rest.
+
+test_publish_confirm(S, PublisherId, Stream, Body) ->
BodySize = byte_size(Body),
StreamSize = byte_size(Stream),
- PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 42:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
+ PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary,
+ PublisherId:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
FrameSize = byte_size(PublishFrame),
gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
- {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 42:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).
+ {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, PublisherId:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).
test_subscribe(S, SubscriptionId, Stream) ->
StreamSize = byte_size(Stream),
@@ -221,9 +235,9 @@ test_subscribe(S, SubscriptionId, Stream) ->
test_deliver(S, Rest, SubscriptionId, Body) ->
BodySize = byte_size(Body),
Frame = read_frame(S, Rest),
- <<54:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8,
+ <<58:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8,
1:16, 1:32,
- _Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32,
+ _Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32, _TrailerLength:32,
0:1, BodySize:31/unsigned, Body/binary>> = Frame.
test_metadata_update_stream_deleted(S, Stream) ->