diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-02-26 11:03:36 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-02-26 11:03:36 +0100 |
commit | 768a61792ef253667181442ef56ce7a88e7c75e6 (patch) | |
tree | ae0c90e95df4c14dfa61f312f792ff9ff08d7ce1 | |
parent | 3430906ad8c1109b5a9739b058ac957d4a9a8b3b (diff) | |
download | rabbitmq-server-git-768a61792ef253667181442ef56ce7a88e7c75e6.tar.gz |
Use MSB in keys to distinguish requests/responses
In stream protocol.
-rw-r--r-- | deps/rabbitmq_stream/docs/PROTOCOL.adoc | 15 | ||||
-rw-r--r-- | deps/rabbitmq_stream/include/rabbit_stream.hrl | 3 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 142 | ||||
-rw-r--r-- | deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl | 80 |
4 files changed, 168 insertions, 72 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 4ed907ed33..dba0499caa 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -48,6 +48,15 @@ does not contain a correlation ID. Some responses may carry additional information than just the response code, this is specified in the command definition. +Keys are int16, but the actual value is defined on the last 15 bits, the most significant bit being +used to make the difference between a request (0) and a response (1). Example for `subscribe` +(key is 6): + +``` +0b00000000 00000110 => subscribe request +0b10000000 00000110 => subscribe response +``` + == Response Codes .Stream Protocol Response Codes @@ -559,7 +568,9 @@ Heartbeat => Key Version Version => int16 ``` -=== Route (experimental) +=== Route + +_Experimental_ ``` RouteQuery => Key Version CorrelationId RoutingKey SuperStream @@ -578,6 +589,8 @@ RouteResponse => Key Version CorrelationId Stream === Partitions (experimental) +_Experimental_ + ``` PartitionsQuery => Key Version CorrelationId SuperStream Key => int16 // 24 diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index 6be20bb95f..802b2b823f 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -24,6 +24,9 @@ -define(COMMAND_ROUTE, 23). -define(COMMAND_PARTITIONS, 24). +-define(REQUEST, 0). +-define(RESPONSE, 1). + -define(VERSION_0, 0). -define(RESPONSE_CODE_OK, 0). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index df5ccd0736..013d57bec0 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -294,7 +294,8 @@ listen_loop_pre_auth(Transport, case ConnectionStep of authenticated -> TuneFrame = - <<?COMMAND_TUNE:16, + <<?REQUEST:1, + ?COMMAND_TUNE:15, ?VERSION_0:16, FrameMax:32, Heartbeat:32>>, @@ -448,7 +449,8 @@ listen_loop_post_auth(Transport, FrameSize = 2 + 2 + 2 + 2 + StreamSize, Transport:send(S, [<<FrameSize:32, - ?COMMAND_METADATA_UPDATE:16, + ?REQUEST:1, + ?COMMAND_METADATA_UPDATE:15, ?VERSION_0:16, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, @@ -483,7 +485,8 @@ listen_loop_post_auth(Transport, %% in practice, this should be necessary only for very large chunks and for very small frame size limits Transport:send(S, [<<FrameSize:32, - ?COMMAND_PUBLISH_CONFIRM:16, + ?REQUEST:1, + ?COMMAND_PUBLISH_CONFIRM:15, ?VERSION_0:16>>, <<CurrentPublisherId:8>>, <<Count:32>>, @@ -501,7 +504,9 @@ listen_loop_post_auth(Transport, {FirstPublisherId, <<>>, 0}, CorrelationList), FrameSize = 2 + 2 + 1 + 4 + LastCount * 8, Transport:send(S, - [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, + [<<FrameSize:32, + ?REQUEST:1, + ?COMMAND_PUBLISH_CONFIRM:15, ?VERSION_0:16>>, <<LastPublisherId:8>>, <<LastCount:32>>, @@ -547,7 +552,9 @@ listen_loop_post_auth(Transport, PublishingIdCount = length(CorrelationList), FrameSize = 2 + 2 + 1 + 4 + PublishingIdCount * 8, Transport:send(S, - [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, + [<<FrameSize:32, + ?REQUEST:1, + ?COMMAND_PUBLISH_CONFIRM:15, ?VERSION_0:16>>, <<PublisherId:8>>, <<PublishingIdCount:32>>, @@ -635,7 +642,7 @@ listen_loop_post_auth(Transport, State1, Configuration); heartbeat_send -> - Frame = <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, + Frame = <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>, case catch frame(Transport, Connection, Frame) of ok -> listen_loop_post_auth(Transport, @@ -787,7 +794,8 @@ handle_inbound_data(Transport, CloseReason = <<"frame too large">>, CloseReasonLength = byte_size(CloseReason), CloseFrame = - <<?COMMAND_CLOSE:16, + <<?REQUEST:1, + ?COMMAND_CLOSE:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_FRAME_TOO_LARGE:16, @@ -841,7 +849,8 @@ generate_publishing_error_details(Acc, Code, handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, State, - <<?COMMAND_PEER_PROPERTIES:16, + <<?REQUEST:1, + ?COMMAND_PEER_PROPERTIES:15, ?VERSION_0:16, CorrelationId:32, ClientPropertiesCount:32, @@ -891,7 +900,8 @@ handle_frame_pre_auth(Transport, <<>>, ServerProperties), Frame = - <<?COMMAND_PEER_PROPERTIES:16, + <<?RESPONSE:1, + ?COMMAND_PEER_PROPERTIES:15, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16, @@ -907,7 +917,9 @@ handle_frame_pre_auth(Transport, handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, State, - <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, + <<?REQUEST:1, + ?COMMAND_SASL_HANDSHAKE:15, + ?VERSION_0:16, CorrelationId:32>>, Rest) -> Mechanisms = rabbit_stream_utils:auth_mechanisms(S), @@ -919,7 +931,8 @@ handle_frame_pre_auth(Transport, <<>>, Mechanisms), MechanismsCount = length(Mechanisms), Frame = - <<?COMMAND_SASL_HANDSHAKE:16, + <<?RESPONSE:1, + ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16, @@ -935,7 +948,8 @@ handle_frame_pre_auth(Transport, host = Host} = Connection0, State, - <<?COMMAND_SASL_AUTHENTICATE:16, + <<?REQUEST:1, + ?COMMAND_SASL_AUTHENTICATE:15, ?VERSION_0:16, CorrelationId:32, MechanismLength:16, @@ -1031,7 +1045,8 @@ handle_frame_pre_auth(Transport, end end, Frame = - <<?COMMAND_SASL_AUTHENTICATE:16, + <<?RESPONSE:1, + ?COMMAND_SASL_AUTHENTICATE:15, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>, @@ -1039,7 +1054,8 @@ handle_frame_pre_auth(Transport, {C2, Rest}; {error, _} -> Frame = - <<?COMMAND_SASL_AUTHENTICATE:16, + <<?RESPONSE:1, + ?COMMAND_SASL_AUTHENTICATE:15, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>, @@ -1054,7 +1070,8 @@ handle_frame_pre_auth(_Transport, name = ConnectionName} = Connection, State, - <<?COMMAND_TUNE:16, + <<?RESPONSE:1, + ?COMMAND_TUNE:15, ?VERSION_0:16, FrameMax:32, Heartbeat:32>>, @@ -1088,7 +1105,8 @@ handle_frame_pre_auth(_Transport, handle_frame_pre_auth(Transport, #stream_connection{user = User, socket = S} = Connection, State, - <<?COMMAND_OPEN:16, + <<?REQUEST:1, + ?COMMAND_OPEN:15, ?VERSION_0:16, CorrelationId:32, VirtualHostLength:16, @@ -1101,7 +1119,8 @@ handle_frame_pre_auth(Transport, VirtualHost, {socket, S}, #{}), - F = <<?COMMAND_OPEN:16, + F = <<?RESPONSE:1, + ?COMMAND_OPEN:15, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16>>, @@ -1111,7 +1130,8 @@ handle_frame_pre_auth(Transport, F} catch exit:_ -> - Fr = <<?COMMAND_OPEN:16, + Fr = <<?RESPONSE:1, + ?COMMAND_OPEN:15, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_VHOST_ACCESS_FAILURE:16>>, @@ -1124,7 +1144,7 @@ handle_frame_pre_auth(Transport, handle_frame_pre_auth(_Transport, Connection, State, - <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, + <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>, Rest) -> rabbit_log:info("Received heartbeat frame pre auth~n"), {Connection, State, Rest}; @@ -1172,7 +1192,8 @@ handle_frame_post_auth(Transport, publisher_to_ids = RefIds0} = Connection0, State, - <<?COMMAND_DECLARE_PUBLISHER:16, + <<?REQUEST:1, + ?COMMAND_DECLARE_PUBLISHER:15, ?VERSION_0:16, CorrelationId:32, PublisherId:8, @@ -1261,7 +1282,8 @@ handle_frame_post_auth(Transport, publishers = Publishers} = Connection, State, - <<?COMMAND_PUBLISH:16, + <<?REQUEST:1, + ?COMMAND_PUBLISH:15, ?VERSION_0:16, PublisherId:8/unsigned, MessageCount:32, @@ -1299,7 +1321,8 @@ handle_frame_post_auth(Transport, Messages), Transport:send(S, [<<FrameSize:32, - ?COMMAND_PUBLISH_ERROR:16, + ?REQUEST:1, + ?COMMAND_PUBLISH_ERROR:15, ?VERSION_0:16, PublisherId:8, MessageCount:32, @@ -1315,7 +1338,8 @@ handle_frame_post_auth(Transport, Messages), Transport:send(S, [<<FrameSize:32, - ?COMMAND_PUBLISH_ERROR:16, + ?REQUEST:1, + ?COMMAND_PUBLISH_ERROR:15, ?VERSION_0:16, PublisherId:8, MessageCount:32, @@ -1328,7 +1352,8 @@ handle_frame_post_auth(Transport, user = User} = Connection, State, - <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, + <<?REQUEST:1, + ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15, ?VERSION_0:16, CorrelationId:32, ReferenceSize:16, @@ -1364,7 +1389,9 @@ handle_frame_post_auth(Transport, {?RESPONSE_CODE_ACCESS_REFUSED, 0} end, Transport:send(S, - [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, + [<<FrameSize:32, + ?RESPONSE:1, + ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>, @@ -1375,7 +1402,8 @@ handle_frame_post_auth(Transport, publisher_to_ids = PubToIds} = Connection0, State, - <<?COMMAND_DELETE_PUBLISHER:16, + <<?REQUEST:1, + ?COMMAND_DELETE_PUBLISHER:15, ?VERSION_0:16, CorrelationId:32, PublisherId:8>>, @@ -1418,7 +1446,8 @@ handle_frame_post_auth(Transport, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, - <<?COMMAND_SUBSCRIBE:16, + <<?REQUEST:1, + ?COMMAND_SUBSCRIBE:15, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, @@ -1574,7 +1603,8 @@ handle_frame_post_auth(Transport, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, - <<?COMMAND_CREDIT:16, + <<?REQUEST:1, + ?COMMAND_CREDIT:15, ?VERSION_0:16, SubscriptionId:8/unsigned, Credit:16/signed>>, @@ -1599,7 +1629,8 @@ handle_frame_post_auth(Transport, rabbit_log:warning("Giving credit to unknown subscription: ~p~n", [SubscriptionId]), Frame = - <<?COMMAND_CREDIT:16, + <<?RESPONSE:1, + ?COMMAND_CREDIT:15, ?VERSION_0:16, ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST:16, SubscriptionId:8>>, @@ -1612,7 +1643,8 @@ handle_frame_post_auth(_Transport, user = User} = Connection, State, - <<?COMMAND_COMMIT_OFFSET:16, + <<?REQUEST:1, + ?COMMAND_COMMIT_OFFSET:15, ?VERSION_0:16, _CorrelationId:32, ReferenceSize:16, @@ -1651,7 +1683,8 @@ handle_frame_post_auth(Transport, user = User} = Connection, State, - <<?COMMAND_QUERY_OFFSET:16, + <<?REQUEST:1, + ?COMMAND_QUERY_OFFSET:15, ?VERSION_0:16, CorrelationId:32, ReferenceSize:16, @@ -1686,7 +1719,10 @@ handle_frame_post_auth(Transport, {?RESPONSE_CODE_ACCESS_REFUSED, 0} end, Transport:send(S, - [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>, + [<<FrameSize:32, + ?RESPONSE:1, + ?COMMAND_QUERY_OFFSET:15, + ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]), @@ -1696,7 +1732,8 @@ handle_frame_post_auth(Transport, StreamSubscriptions} = Connection, #stream_connection_state{consumers = Consumers} = State, - <<?COMMAND_UNSUBSCRIBE:16, + <<?REQUEST:1, + ?COMMAND_UNSUBSCRIBE:15, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned>>, @@ -1748,7 +1785,8 @@ handle_frame_post_auth(Transport, User} = Connection, State, - <<?COMMAND_CREATE_STREAM:16, + <<?REQUEST:1, + ?COMMAND_CREATE_STREAM:15, ?VERSION_0:16, CorrelationId:32, StreamSize:16, @@ -1831,7 +1869,8 @@ handle_frame_post_auth(Transport, User} = Connection, State, - <<?COMMAND_DELETE_STREAM:16, + <<?REQUEST:1, + ?COMMAND_DELETE_STREAM:15, ?VERSION_0:16, CorrelationId:32, StreamSize:16, @@ -1862,7 +1901,8 @@ handle_frame_post_auth(Transport, FrameSize = 2 + 2 + 2 + 2 + StreamSize, Transport:send(S, [<<FrameSize:32, - ?COMMAND_METADATA_UPDATE:16, + ?REQUEST:1, + ?COMMAND_METADATA_UPDATE:15, ?VERSION_0:16, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, @@ -1893,7 +1933,8 @@ handle_frame_post_auth(Transport, virtual_host = VirtualHost} = Connection, State, - <<?COMMAND_METADATA:16, + <<?REQUEST:1, + ?COMMAND_METADATA:15, ?VERSION_0:16, CorrelationId:32, StreamCount:32, @@ -2018,7 +2059,8 @@ handle_frame_post_auth(Transport, end, <<StreamCount:32>>, Streams), Frame = - <<?COMMAND_METADATA:16, + <<?RESPONSE:1, + ?COMMAND_METADATA:15, ?VERSION_0:16, CorrelationId:32, BrokersBin/binary, @@ -2101,7 +2143,8 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(Transport, Connection, State, - <<?COMMAND_CLOSE:16, + <<?REQUEST:1, + ?COMMAND_CLOSE:15, ?VERSION_0:16, CorrelationId:32, ClosingCode:16, @@ -2111,7 +2154,8 @@ handle_frame_post_auth(Transport, rabbit_log:info("Received close command ~p ~p~n", [ClosingCode, ClosingReason]), Frame = - <<?COMMAND_CLOSE:16, + <<?RESPONSE:1, + ?COMMAND_CLOSE:15, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_CODE_OK:16>>, @@ -2121,7 +2165,7 @@ handle_frame_post_auth(Transport, handle_frame_post_auth(_Transport, Connection, State, - <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, + <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>, Rest) -> rabbit_log:info("Received heartbeat frame post auth~n"), {Connection, State, Rest}; @@ -2131,7 +2175,8 @@ handle_frame_post_auth(Transport, Connection, State, Frame, Rest) -> CloseReason = <<"unknown frame">>, CloseReasonLength = byte_size(CloseReason), CloseFrame = - <<?COMMAND_CLOSE:16, + <<?REQUEST:1, + ?COMMAND_CLOSE:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_UNKNOWN_FRAME:16, @@ -2168,7 +2213,8 @@ notify_connection_closed(#stream_connection{name = Name, handle_frame_post_close(_Transport, Connection, State, - <<?COMMAND_CLOSE:16, + <<?RESPONSE:1, + ?COMMAND_CLOSE:15, ?VERSION_0:16, _CorrelationId:32, _ResponseCode:16>>, @@ -2179,7 +2225,7 @@ handle_frame_post_close(_Transport, handle_frame_post_close(_Transport, Connection, State, - <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>, + <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>, Rest) -> rabbit_log:info("Received heartbeat frame post close~n"), {Connection, State, Rest}; @@ -2380,7 +2426,10 @@ response(Transport, CorrelationId, ResponseCode) -> Transport:send(S, - [<<?RESPONSE_FRAME_SIZE:32, CommandId:16, ?VERSION_0:16>>, + [<<?RESPONSE_FRAME_SIZE:32, + ?RESPONSE:1, + CommandId:15, + ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>]). subscription_exists(StreamSubscriptions, SubscriptionId) -> @@ -2399,7 +2448,8 @@ send_file_callback(Transport, FrameSize = 2 + 2 + 1 + Size, FrameBeginning = <<FrameSize:32, - ?COMMAND_DELIVER:16, + ?REQUEST:1, + ?COMMAND_DELIVER:15, ?VERSION_0:16, SubscriptionId:8/unsigned>>, Transport:send(S, FrameBeginning), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 46903ecc1c..a3aa1c3ba6 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -183,13 +183,18 @@ test_server(Port) -> test_peer_properties(S) -> PeerPropertiesFrame = - <<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, 0:32>>, + <<?REQUEST:1, + ?COMMAND_PEER_PROPERTIES:15, + ?VERSION_0:16, + 1:32, + 0:32>>, PeerPropertiesFrameSize = byte_size(PeerPropertiesFrame), gen_tcp:send(S, <<PeerPropertiesFrameSize:32, PeerPropertiesFrame/binary>>), {ok, <<_Size:32, - ?COMMAND_PEER_PROPERTIES:16, + ?RESPONSE:1, + ?COMMAND_PEER_PROPERTIES:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, @@ -198,7 +203,7 @@ test_peer_properties(S) -> test_authenticate(S) -> SaslHandshakeFrame = - <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32>>, + <<?REQUEST:1, ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_0:16, 1:32>>, SaslHandshakeFrameSize = byte_size(SaslHandshakeFrame), gen_tcp:send(S, <<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>), @@ -209,7 +214,8 @@ test_authenticate(S) -> ok = case SaslAvailable of <<31:32, - ?COMMAND_SASL_HANDSHAKE:16, + ?RESPONSE:1, + ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, @@ -220,7 +226,8 @@ test_authenticate(S) -> AmqPlain:8/binary>> -> ok; <<31:32, - ?COMMAND_SASL_HANDSHAKE:16, + ?RESPONSE:1, + ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, @@ -241,7 +248,8 @@ test_authenticate(S) -> PlainSaslSize = byte_size(PlainSasl), SaslAuthenticateFrame = - <<?COMMAND_SASL_AUTHENTICATE:16, + <<?REQUEST:1, + ?COMMAND_SASL_AUTHENTICATE:15, ?VERSION_0:16, 2:32, 5:16, @@ -256,7 +264,8 @@ test_authenticate(S) -> SaslAuthenticateFrame/binary>>), {ok, <<10:32, - ?COMMAND_SASL_AUTHENTICATE:16, + ?RESPONSE:1, + ?COMMAND_SASL_AUTHENTICATE:15, ?VERSION_0:16, 2:32, ?RESPONSE_CODE_OK:16, @@ -265,7 +274,8 @@ test_authenticate(S) -> TuneExpected = <<12:32, - ?COMMAND_TUNE:16, + ?REQUEST:1, + ?COMMAND_TUNE:15, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, ?DEFAULT_HEARTBEAT:32>>, @@ -277,14 +287,19 @@ test_authenticate(S) -> end, TuneFrame = - <<?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, 0:32>>, + <<?RESPONSE:1, + ?COMMAND_TUNE:15, + ?VERSION_0:16, + ?DEFAULT_FRAME_MAX:32, + 0:32>>, TuneFrameSize = byte_size(TuneFrame), gen_tcp:send(S, <<TuneFrameSize:32, TuneFrame/binary>>), VirtualHost = <<"/">>, VirtualHostLength = byte_size(VirtualHost), OpenFrame = - <<?COMMAND_OPEN:16, + <<?REQUEST:1, + ?COMMAND_OPEN:15, ?VERSION_0:16, 3:32, VirtualHostLength:16, @@ -293,7 +308,8 @@ test_authenticate(S) -> gen_tcp:send(S, <<OpenFrameSize:32, OpenFrame/binary>>), {ok, <<10:32, - ?COMMAND_OPEN:16, + ?RESPONSE:1, + ?COMMAND_OPEN:15, ?VERSION_0:16, 3:32, ?RESPONSE_CODE_OK:16>>} = @@ -302,7 +318,8 @@ test_authenticate(S) -> test_create_stream(S, Stream) -> StreamSize = byte_size(Stream), CreateStreamFrame = - <<?COMMAND_CREATE_STREAM:16, + <<?REQUEST:1, + ?COMMAND_CREATE_STREAM:15, ?VERSION_0:16, 1:32, StreamSize:16, @@ -312,7 +329,8 @@ test_create_stream(S, Stream) -> gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>), {ok, <<_Size:32, - ?COMMAND_CREATE_STREAM:16, + ?RESPONSE:1, + ?COMMAND_CREATE_STREAM:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = @@ -321,7 +339,8 @@ test_create_stream(S, Stream) -> test_delete_stream(S, Stream) -> StreamSize = byte_size(Stream), DeleteStreamFrame = - <<?COMMAND_DELETE_STREAM:16, + <<?REQUEST:1, + ?COMMAND_DELETE_STREAM:15, ?VERSION_0:16, 1:32, StreamSize:16, @@ -331,7 +350,8 @@ test_delete_stream(S, Stream) -> ResponseFrameSize = 10, {ok, <<ResponseFrameSize:32, - ?COMMAND_DELETE_STREAM:16, + ?RESPONSE:1, + ?COMMAND_DELETE_STREAM:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = @@ -340,7 +360,8 @@ test_delete_stream(S, Stream) -> test_declare_publisher(S, PublisherId, Stream) -> StreamSize = byte_size(Stream), DeclarePublisherFrame = - <<?COMMAND_DECLARE_PUBLISHER:16, + <<?REQUEST:1, + ?COMMAND_DECLARE_PUBLISHER:15, ?VERSION_0:16, 1:32, PublisherId:8, @@ -352,7 +373,8 @@ test_declare_publisher(S, PublisherId, Stream) -> Res = gen_tcp:recv(S, 0, 5000), {ok, <<_Size:32, - ?COMMAND_DECLARE_PUBLISHER:16, + ?RESPONSE:1, + ?COMMAND_DECLARE_PUBLISHER:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, @@ -363,7 +385,8 @@ test_declare_publisher(S, PublisherId, Stream) -> test_publish_confirm(S, PublisherId, Body) -> BodySize = byte_size(Body), PublishFrame = - <<?COMMAND_PUBLISH:16, + <<?REQUEST:1, + ?COMMAND_PUBLISH:15, ?VERSION_0:16, PublisherId:8, 1:32, @@ -374,7 +397,8 @@ test_publish_confirm(S, PublisherId, Body) -> gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>), {ok, <<_Size:32, - ?COMMAND_PUBLISH_CONFIRM:16, + ?REQUEST:1, + ?COMMAND_PUBLISH_CONFIRM:15, ?VERSION_0:16, PublisherId:8, 1:32, @@ -384,7 +408,8 @@ test_publish_confirm(S, PublisherId, Body) -> test_subscribe(S, SubscriptionId, Stream) -> StreamSize = byte_size(Stream), SubscribeFrame = - <<?COMMAND_SUBSCRIBE:16, + <<?REQUEST:1, + ?COMMAND_SUBSCRIBE:15, ?VERSION_0:16, 1:32, SubscriptionId:8, @@ -398,7 +423,8 @@ test_subscribe(S, SubscriptionId, Stream) -> Res = gen_tcp:recv(S, 0, 5000), {ok, <<_Size:32, - ?COMMAND_SUBSCRIBE:16, + ?RESPONSE:1, + ?COMMAND_SUBSCRIBE:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, @@ -410,7 +436,8 @@ test_deliver(S, Rest, SubscriptionId, Body) -> BodySize = byte_size(Body), Frame = read_frame(S, Rest), <<58:32, - ?COMMAND_DELIVER:16, + ?REQUEST:1, + ?COMMAND_DELIVER:15, ?VERSION_0:16, SubscriptionId:8, 5:4/unsigned, @@ -434,7 +461,8 @@ test_metadata_update_stream_deleted(S, Stream) -> FrameSize = 2 + 2 + 2 + 2 + StreamSize, {ok, <<FrameSize:32, - ?COMMAND_METADATA_UPDATE:16, + ?REQUEST:1, + ?COMMAND_METADATA_UPDATE:15, ?VERSION_0:16, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16, StreamSize:16, @@ -445,7 +473,8 @@ test_close(S) -> CloseReason = <<"OK">>, CloseReasonSize = byte_size(CloseReason), CloseFrame = - <<?COMMAND_CLOSE:16, + <<?REQUEST:1, + ?COMMAND_CLOSE:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16, @@ -455,7 +484,8 @@ test_close(S) -> gen_tcp:send(S, <<CloseFrameSize:32, CloseFrame/binary>>), {ok, <<10:32, - ?COMMAND_CLOSE:16, + ?RESPONSE:1, + ?COMMAND_CLOSE:15, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = |