diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-24 18:18:14 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-24 18:18:14 +0100 |
commit | 9859714f0e497a8ed8317519a95095d0661e1a54 (patch) | |
tree | e46ff406d972c72d3230b677b9f0db7860b316e8 | |
parent | 8f97ea400a8276432b9b11b5f07217db42efc909 (diff) | |
download | rabbitmq-server-git-9859714f0e497a8ed8317519a95095d0661e1a54.tar.gz |
Declare publisher in stream test
-rw-r--r-- | deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 4197b1de71..fdf8a3468b 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -114,8 +114,10 @@ test_server(Port) -> test_authenticate(S), Stream = <<"stream1">>, test_create_stream(S, Stream), + PublisherId = 42, + test_declare_publisher(S, PublisherId, Stream), Body = <<"hello">>, - test_publish_confirm(S, Stream, Body), + test_publish_confirm(S, PublisherId, Stream, Body), SubscriptionId = 42, Rest = test_subscribe(S, SubscriptionId, Stream), test_deliver(S, Rest, SubscriptionId, Body), @@ -200,13 +202,25 @@ test_delete_stream(S, Stream) -> ResponseFrameSize = 10, {ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000). -test_publish_confirm(S, Stream, Body) -> +test_declare_publisher(S, PublisherId, Stream) -> + StreamSize = byte_size(Stream), + DeclarePublisherFrame = <<?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, PublisherId:8, + 0:16, %% empty publisher reference + StreamSize:16, Stream:StreamSize/binary>>, + FrameSize = byte_size(DeclarePublisherFrame), + gen_tcp:send(S, <<FrameSize:32, DeclarePublisherFrame/binary>>), + Res = gen_tcp:recv(S, 0, 5000), + {ok, <<_Size:32, ?COMMAND_DECLARE_PUBLISHER:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, Rest/binary>>} = Res, + Rest. + +test_publish_confirm(S, PublisherId, Stream, Body) -> BodySize = byte_size(Body), StreamSize = byte_size(Stream), - PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 42:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>, + PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, + PublisherId:8, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>, FrameSize = byte_size(PublishFrame), gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>), - {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 42:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000). + {ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, PublisherId:8, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000). test_subscribe(S, SubscriptionId, Stream) -> StreamSize = byte_size(Stream), @@ -221,9 +235,9 @@ test_subscribe(S, SubscriptionId, Stream) -> test_deliver(S, Rest, SubscriptionId, Body) -> BodySize = byte_size(Body), Frame = read_frame(S, Rest), - <<54:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8, + <<58:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, 0:4/unsigned, 0:8, 1:16, 1:32, - _Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32, + _Timestamp:64, _Epoch:64, 0:64, _Crc:32, _DataLength:32, _TrailerLength:32, 0:1, BodySize:31/unsigned, Body/binary>> = Frame. test_metadata_update_stream_deleted(S, Stream) -> |