summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-02-26 11:03:36 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-02-26 11:03:36 +0100
commit768a61792ef253667181442ef56ce7a88e7c75e6 (patch)
treeae0c90e95df4c14dfa61f312f792ff9ff08d7ce1
parent3430906ad8c1109b5a9739b058ac957d4a9a8b3b (diff)
downloadrabbitmq-server-git-768a61792ef253667181442ef56ce7a88e7c75e6.tar.gz
Use MSB in keys to distinguish requests/responses
In stream protocol.
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc15
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl3
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl142
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl80
4 files changed, 168 insertions, 72 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 4ed907ed33..dba0499caa 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -48,6 +48,15 @@ does not contain a correlation ID.
Some responses may carry additional information than just the response code, this is specified in the command definition.
+Keys are int16, but the actual value is defined on the last 15 bits, the most significant bit being
+used to make the difference between a request (0) and a response (1). Example for `subscribe`
+(key is 6):
+
+```
+0b00000000 00000110 => subscribe request
+0b10000000 00000110 => subscribe response
+```
+
== Response Codes
.Stream Protocol Response Codes
@@ -559,7 +568,9 @@ Heartbeat => Key Version
Version => int16
```
-=== Route (experimental)
+=== Route
+
+_Experimental_
```
RouteQuery => Key Version CorrelationId RoutingKey SuperStream
@@ -578,6 +589,8 @@ RouteResponse => Key Version CorrelationId Stream
=== Partitions (experimental)
+_Experimental_
+
```
PartitionsQuery => Key Version CorrelationId SuperStream
Key => int16 // 24
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index 6be20bb95f..802b2b823f 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -24,6 +24,9 @@
-define(COMMAND_ROUTE, 23).
-define(COMMAND_PARTITIONS, 24).
+-define(REQUEST, 0).
+-define(RESPONSE, 1).
+
-define(VERSION_0, 0).
-define(RESPONSE_CODE_OK, 0).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index df5ccd0736..013d57bec0 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -294,7 +294,8 @@ listen_loop_pre_auth(Transport,
case ConnectionStep of
authenticated ->
TuneFrame =
- <<?COMMAND_TUNE:16,
+ <<?REQUEST:1,
+ ?COMMAND_TUNE:15,
?VERSION_0:16,
FrameMax:32,
Heartbeat:32>>,
@@ -448,7 +449,8 @@ listen_loop_post_auth(Transport,
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
Transport:send(S,
[<<FrameSize:32,
- ?COMMAND_METADATA_UPDATE:16,
+ ?REQUEST:1,
+ ?COMMAND_METADATA_UPDATE:15,
?VERSION_0:16,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
@@ -483,7 +485,8 @@ listen_loop_post_auth(Transport,
%% in practice, this should be necessary only for very large chunks and for very small frame size limits
Transport:send(S,
[<<FrameSize:32,
- ?COMMAND_PUBLISH_CONFIRM:16,
+ ?REQUEST:1,
+ ?COMMAND_PUBLISH_CONFIRM:15,
?VERSION_0:16>>,
<<CurrentPublisherId:8>>,
<<Count:32>>,
@@ -501,7 +504,9 @@ listen_loop_post_auth(Transport,
{FirstPublisherId, <<>>, 0}, CorrelationList),
FrameSize = 2 + 2 + 1 + 4 + LastCount * 8,
Transport:send(S,
- [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16,
+ [<<FrameSize:32,
+ ?REQUEST:1,
+ ?COMMAND_PUBLISH_CONFIRM:15,
?VERSION_0:16>>,
<<LastPublisherId:8>>,
<<LastCount:32>>,
@@ -547,7 +552,9 @@ listen_loop_post_auth(Transport,
PublishingIdCount = length(CorrelationList),
FrameSize = 2 + 2 + 1 + 4 + PublishingIdCount * 8,
Transport:send(S,
- [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16,
+ [<<FrameSize:32,
+ ?REQUEST:1,
+ ?COMMAND_PUBLISH_CONFIRM:15,
?VERSION_0:16>>,
<<PublisherId:8>>,
<<PublishingIdCount:32>>,
@@ -635,7 +642,7 @@ listen_loop_post_auth(Transport,
State1,
Configuration);
heartbeat_send ->
- Frame = <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
+ Frame = <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
case catch frame(Transport, Connection, Frame) of
ok ->
listen_loop_post_auth(Transport,
@@ -787,7 +794,8 @@ handle_inbound_data(Transport,
CloseReason = <<"frame too large">>,
CloseReasonLength = byte_size(CloseReason),
CloseFrame =
- <<?COMMAND_CLOSE:16,
+ <<?REQUEST:1,
+ ?COMMAND_CLOSE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_FRAME_TOO_LARGE:16,
@@ -841,7 +849,8 @@ generate_publishing_error_details(Acc, Code,
handle_frame_pre_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
- <<?COMMAND_PEER_PROPERTIES:16,
+ <<?REQUEST:1,
+ ?COMMAND_PEER_PROPERTIES:15,
?VERSION_0:16,
CorrelationId:32,
ClientPropertiesCount:32,
@@ -891,7 +900,8 @@ handle_frame_pre_auth(Transport,
<<>>, ServerProperties),
Frame =
- <<?COMMAND_PEER_PROPERTIES:16,
+ <<?RESPONSE:1,
+ ?COMMAND_PEER_PROPERTIES:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16,
@@ -907,7 +917,9 @@ handle_frame_pre_auth(Transport,
handle_frame_pre_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
- <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16,
+ <<?REQUEST:1,
+ ?COMMAND_SASL_HANDSHAKE:15,
+ ?VERSION_0:16,
CorrelationId:32>>,
Rest) ->
Mechanisms = rabbit_stream_utils:auth_mechanisms(S),
@@ -919,7 +931,8 @@ handle_frame_pre_auth(Transport,
<<>>, Mechanisms),
MechanismsCount = length(Mechanisms),
Frame =
- <<?COMMAND_SASL_HANDSHAKE:16,
+ <<?RESPONSE:1,
+ ?COMMAND_SASL_HANDSHAKE:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16,
@@ -935,7 +948,8 @@ handle_frame_pre_auth(Transport,
host = Host} =
Connection0,
State,
- <<?COMMAND_SASL_AUTHENTICATE:16,
+ <<?REQUEST:1,
+ ?COMMAND_SASL_AUTHENTICATE:15,
?VERSION_0:16,
CorrelationId:32,
MechanismLength:16,
@@ -1031,7 +1045,8 @@ handle_frame_pre_auth(Transport,
end
end,
Frame =
- <<?COMMAND_SASL_AUTHENTICATE:16,
+ <<?RESPONSE:1,
+ ?COMMAND_SASL_AUTHENTICATE:15,
?VERSION_0:16,
CorrelationId:32,
FrameFragment/binary>>,
@@ -1039,7 +1054,8 @@ handle_frame_pre_auth(Transport,
{C2, Rest};
{error, _} ->
Frame =
- <<?COMMAND_SASL_AUTHENTICATE:16,
+ <<?RESPONSE:1,
+ ?COMMAND_SASL_AUTHENTICATE:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>,
@@ -1054,7 +1070,8 @@ handle_frame_pre_auth(_Transport,
name = ConnectionName} =
Connection,
State,
- <<?COMMAND_TUNE:16,
+ <<?RESPONSE:1,
+ ?COMMAND_TUNE:15,
?VERSION_0:16,
FrameMax:32,
Heartbeat:32>>,
@@ -1088,7 +1105,8 @@ handle_frame_pre_auth(_Transport,
handle_frame_pre_auth(Transport,
#stream_connection{user = User, socket = S} = Connection,
State,
- <<?COMMAND_OPEN:16,
+ <<?REQUEST:1,
+ ?COMMAND_OPEN:15,
?VERSION_0:16,
CorrelationId:32,
VirtualHostLength:16,
@@ -1101,7 +1119,8 @@ handle_frame_pre_auth(Transport,
VirtualHost,
{socket, S},
#{}),
- F = <<?COMMAND_OPEN:16,
+ F = <<?RESPONSE:1,
+ ?COMMAND_OPEN:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16>>,
@@ -1111,7 +1130,8 @@ handle_frame_pre_auth(Transport,
F}
catch
exit:_ ->
- Fr = <<?COMMAND_OPEN:16,
+ Fr = <<?RESPONSE:1,
+ ?COMMAND_OPEN:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_VHOST_ACCESS_FAILURE:16>>,
@@ -1124,7 +1144,7 @@ handle_frame_pre_auth(Transport,
handle_frame_pre_auth(_Transport,
Connection,
State,
- <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
+ <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
Rest) ->
rabbit_log:info("Received heartbeat frame pre auth~n"),
{Connection, State, Rest};
@@ -1172,7 +1192,8 @@ handle_frame_post_auth(Transport,
publisher_to_ids = RefIds0} =
Connection0,
State,
- <<?COMMAND_DECLARE_PUBLISHER:16,
+ <<?REQUEST:1,
+ ?COMMAND_DECLARE_PUBLISHER:15,
?VERSION_0:16,
CorrelationId:32,
PublisherId:8,
@@ -1261,7 +1282,8 @@ handle_frame_post_auth(Transport,
publishers = Publishers} =
Connection,
State,
- <<?COMMAND_PUBLISH:16,
+ <<?REQUEST:1,
+ ?COMMAND_PUBLISH:15,
?VERSION_0:16,
PublisherId:8/unsigned,
MessageCount:32,
@@ -1299,7 +1321,8 @@ handle_frame_post_auth(Transport,
Messages),
Transport:send(S,
[<<FrameSize:32,
- ?COMMAND_PUBLISH_ERROR:16,
+ ?REQUEST:1,
+ ?COMMAND_PUBLISH_ERROR:15,
?VERSION_0:16,
PublisherId:8,
MessageCount:32,
@@ -1315,7 +1338,8 @@ handle_frame_post_auth(Transport,
Messages),
Transport:send(S,
[<<FrameSize:32,
- ?COMMAND_PUBLISH_ERROR:16,
+ ?REQUEST:1,
+ ?COMMAND_PUBLISH_ERROR:15,
?VERSION_0:16,
PublisherId:8,
MessageCount:32,
@@ -1328,7 +1352,8 @@ handle_frame_post_auth(Transport,
user = User} =
Connection,
State,
- <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16,
+ <<?REQUEST:1,
+ ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15,
?VERSION_0:16,
CorrelationId:32,
ReferenceSize:16,
@@ -1364,7 +1389,9 @@ handle_frame_post_auth(Transport,
{?RESPONSE_CODE_ACCESS_REFUSED, 0}
end,
Transport:send(S,
- [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16,
+ [<<FrameSize:32,
+ ?RESPONSE:1,
+ ?COMMAND_QUERY_PUBLISHER_SEQUENCE:15,
?VERSION_0:16>>,
<<CorrelationId:32>>,
<<ResponseCode:16>>,
@@ -1375,7 +1402,8 @@ handle_frame_post_auth(Transport,
publisher_to_ids = PubToIds} =
Connection0,
State,
- <<?COMMAND_DELETE_PUBLISHER:16,
+ <<?REQUEST:1,
+ ?COMMAND_DELETE_PUBLISHER:15,
?VERSION_0:16,
CorrelationId:32,
PublisherId:8>>,
@@ -1418,7 +1446,8 @@ handle_frame_post_auth(Transport,
send_file_oct = SendFileOct} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
- <<?COMMAND_SUBSCRIBE:16,
+ <<?REQUEST:1,
+ ?COMMAND_SUBSCRIBE:15,
?VERSION_0:16,
CorrelationId:32,
SubscriptionId:8/unsigned,
@@ -1574,7 +1603,8 @@ handle_frame_post_auth(Transport,
send_file_oct = SendFileOct} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
- <<?COMMAND_CREDIT:16,
+ <<?REQUEST:1,
+ ?COMMAND_CREDIT:15,
?VERSION_0:16,
SubscriptionId:8/unsigned,
Credit:16/signed>>,
@@ -1599,7 +1629,8 @@ handle_frame_post_auth(Transport,
rabbit_log:warning("Giving credit to unknown subscription: ~p~n",
[SubscriptionId]),
Frame =
- <<?COMMAND_CREDIT:16,
+ <<?RESPONSE:1,
+ ?COMMAND_CREDIT:15,
?VERSION_0:16,
?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST:16,
SubscriptionId:8>>,
@@ -1612,7 +1643,8 @@ handle_frame_post_auth(_Transport,
user = User} =
Connection,
State,
- <<?COMMAND_COMMIT_OFFSET:16,
+ <<?REQUEST:1,
+ ?COMMAND_COMMIT_OFFSET:15,
?VERSION_0:16,
_CorrelationId:32,
ReferenceSize:16,
@@ -1651,7 +1683,8 @@ handle_frame_post_auth(Transport,
user = User} =
Connection,
State,
- <<?COMMAND_QUERY_OFFSET:16,
+ <<?REQUEST:1,
+ ?COMMAND_QUERY_OFFSET:15,
?VERSION_0:16,
CorrelationId:32,
ReferenceSize:16,
@@ -1686,7 +1719,10 @@ handle_frame_post_auth(Transport,
{?RESPONSE_CODE_ACCESS_REFUSED, 0}
end,
Transport:send(S,
- [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>,
+ [<<FrameSize:32,
+ ?RESPONSE:1,
+ ?COMMAND_QUERY_OFFSET:15,
+ ?VERSION_0:16>>,
<<CorrelationId:32>>,
<<ResponseCode:16>>,
<<Offset:64>>]),
@@ -1696,7 +1732,8 @@ handle_frame_post_auth(Transport,
StreamSubscriptions} =
Connection,
#stream_connection_state{consumers = Consumers} = State,
- <<?COMMAND_UNSUBSCRIBE:16,
+ <<?REQUEST:1,
+ ?COMMAND_UNSUBSCRIBE:15,
?VERSION_0:16,
CorrelationId:32,
SubscriptionId:8/unsigned>>,
@@ -1748,7 +1785,8 @@ handle_frame_post_auth(Transport,
User} =
Connection,
State,
- <<?COMMAND_CREATE_STREAM:16,
+ <<?REQUEST:1,
+ ?COMMAND_CREATE_STREAM:15,
?VERSION_0:16,
CorrelationId:32,
StreamSize:16,
@@ -1831,7 +1869,8 @@ handle_frame_post_auth(Transport,
User} =
Connection,
State,
- <<?COMMAND_DELETE_STREAM:16,
+ <<?REQUEST:1,
+ ?COMMAND_DELETE_STREAM:15,
?VERSION_0:16,
CorrelationId:32,
StreamSize:16,
@@ -1862,7 +1901,8 @@ handle_frame_post_auth(Transport,
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
Transport:send(S,
[<<FrameSize:32,
- ?COMMAND_METADATA_UPDATE:16,
+ ?REQUEST:1,
+ ?COMMAND_METADATA_UPDATE:15,
?VERSION_0:16,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
@@ -1893,7 +1933,8 @@ handle_frame_post_auth(Transport,
virtual_host = VirtualHost} =
Connection,
State,
- <<?COMMAND_METADATA:16,
+ <<?REQUEST:1,
+ ?COMMAND_METADATA:15,
?VERSION_0:16,
CorrelationId:32,
StreamCount:32,
@@ -2018,7 +2059,8 @@ handle_frame_post_auth(Transport,
end,
<<StreamCount:32>>, Streams),
Frame =
- <<?COMMAND_METADATA:16,
+ <<?RESPONSE:1,
+ ?COMMAND_METADATA:15,
?VERSION_0:16,
CorrelationId:32,
BrokersBin/binary,
@@ -2101,7 +2143,8 @@ handle_frame_post_auth(Transport,
handle_frame_post_auth(Transport,
Connection,
State,
- <<?COMMAND_CLOSE:16,
+ <<?REQUEST:1,
+ ?COMMAND_CLOSE:15,
?VERSION_0:16,
CorrelationId:32,
ClosingCode:16,
@@ -2111,7 +2154,8 @@ handle_frame_post_auth(Transport,
rabbit_log:info("Received close command ~p ~p~n",
[ClosingCode, ClosingReason]),
Frame =
- <<?COMMAND_CLOSE:16,
+ <<?RESPONSE:1,
+ ?COMMAND_CLOSE:15,
?VERSION_0:16,
CorrelationId:32,
?RESPONSE_CODE_OK:16>>,
@@ -2121,7 +2165,7 @@ handle_frame_post_auth(Transport,
handle_frame_post_auth(_Transport,
Connection,
State,
- <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
+ <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
Rest) ->
rabbit_log:info("Received heartbeat frame post auth~n"),
{Connection, State, Rest};
@@ -2131,7 +2175,8 @@ handle_frame_post_auth(Transport, Connection, State, Frame, Rest) ->
CloseReason = <<"unknown frame">>,
CloseReasonLength = byte_size(CloseReason),
CloseFrame =
- <<?COMMAND_CLOSE:16,
+ <<?REQUEST:1,
+ ?COMMAND_CLOSE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_UNKNOWN_FRAME:16,
@@ -2168,7 +2213,8 @@ notify_connection_closed(#stream_connection{name = Name,
handle_frame_post_close(_Transport,
Connection,
State,
- <<?COMMAND_CLOSE:16,
+ <<?RESPONSE:1,
+ ?COMMAND_CLOSE:15,
?VERSION_0:16,
_CorrelationId:32,
_ResponseCode:16>>,
@@ -2179,7 +2225,7 @@ handle_frame_post_close(_Transport,
handle_frame_post_close(_Transport,
Connection,
State,
- <<?COMMAND_HEARTBEAT:16, ?VERSION_0:16>>,
+ <<?REQUEST:1, ?COMMAND_HEARTBEAT:15, ?VERSION_0:16>>,
Rest) ->
rabbit_log:info("Received heartbeat frame post close~n"),
{Connection, State, Rest};
@@ -2380,7 +2426,10 @@ response(Transport,
CorrelationId,
ResponseCode) ->
Transport:send(S,
- [<<?RESPONSE_FRAME_SIZE:32, CommandId:16, ?VERSION_0:16>>,
+ [<<?RESPONSE_FRAME_SIZE:32,
+ ?RESPONSE:1,
+ CommandId:15,
+ ?VERSION_0:16>>,
<<CorrelationId:32>>, <<ResponseCode:16>>]).
subscription_exists(StreamSubscriptions, SubscriptionId) ->
@@ -2399,7 +2448,8 @@ send_file_callback(Transport,
FrameSize = 2 + 2 + 1 + Size,
FrameBeginning =
<<FrameSize:32,
- ?COMMAND_DELIVER:16,
+ ?REQUEST:1,
+ ?COMMAND_DELIVER:15,
?VERSION_0:16,
SubscriptionId:8/unsigned>>,
Transport:send(S, FrameBeginning),
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index 46903ecc1c..a3aa1c3ba6 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -183,13 +183,18 @@ test_server(Port) ->
test_peer_properties(S) ->
PeerPropertiesFrame =
- <<?COMMAND_PEER_PROPERTIES:16, ?VERSION_0:16, 1:32, 0:32>>,
+ <<?REQUEST:1,
+ ?COMMAND_PEER_PROPERTIES:15,
+ ?VERSION_0:16,
+ 1:32,
+ 0:32>>,
PeerPropertiesFrameSize = byte_size(PeerPropertiesFrame),
gen_tcp:send(S,
<<PeerPropertiesFrameSize:32, PeerPropertiesFrame/binary>>),
{ok,
<<_Size:32,
- ?COMMAND_PEER_PROPERTIES:16,
+ ?RESPONSE:1,
+ ?COMMAND_PEER_PROPERTIES:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@@ -198,7 +203,7 @@ test_peer_properties(S) ->
test_authenticate(S) ->
SaslHandshakeFrame =
- <<?COMMAND_SASL_HANDSHAKE:16, ?VERSION_0:16, 1:32>>,
+ <<?REQUEST:1, ?COMMAND_SASL_HANDSHAKE:15, ?VERSION_0:16, 1:32>>,
SaslHandshakeFrameSize = byte_size(SaslHandshakeFrame),
gen_tcp:send(S,
<<SaslHandshakeFrameSize:32, SaslHandshakeFrame/binary>>),
@@ -209,7 +214,8 @@ test_authenticate(S) ->
ok =
case SaslAvailable of
<<31:32,
- ?COMMAND_SASL_HANDSHAKE:16,
+ ?RESPONSE:1,
+ ?COMMAND_SASL_HANDSHAKE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@@ -220,7 +226,8 @@ test_authenticate(S) ->
AmqPlain:8/binary>> ->
ok;
<<31:32,
- ?COMMAND_SASL_HANDSHAKE:16,
+ ?RESPONSE:1,
+ ?COMMAND_SASL_HANDSHAKE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@@ -241,7 +248,8 @@ test_authenticate(S) ->
PlainSaslSize = byte_size(PlainSasl),
SaslAuthenticateFrame =
- <<?COMMAND_SASL_AUTHENTICATE:16,
+ <<?REQUEST:1,
+ ?COMMAND_SASL_AUTHENTICATE:15,
?VERSION_0:16,
2:32,
5:16,
@@ -256,7 +264,8 @@ test_authenticate(S) ->
SaslAuthenticateFrame/binary>>),
{ok,
<<10:32,
- ?COMMAND_SASL_AUTHENTICATE:16,
+ ?RESPONSE:1,
+ ?COMMAND_SASL_AUTHENTICATE:15,
?VERSION_0:16,
2:32,
?RESPONSE_CODE_OK:16,
@@ -265,7 +274,8 @@ test_authenticate(S) ->
TuneExpected =
<<12:32,
- ?COMMAND_TUNE:16,
+ ?REQUEST:1,
+ ?COMMAND_TUNE:15,
?VERSION_0:16,
?DEFAULT_FRAME_MAX:32,
?DEFAULT_HEARTBEAT:32>>,
@@ -277,14 +287,19 @@ test_authenticate(S) ->
end,
TuneFrame =
- <<?COMMAND_TUNE:16, ?VERSION_0:16, ?DEFAULT_FRAME_MAX:32, 0:32>>,
+ <<?RESPONSE:1,
+ ?COMMAND_TUNE:15,
+ ?VERSION_0:16,
+ ?DEFAULT_FRAME_MAX:32,
+ 0:32>>,
TuneFrameSize = byte_size(TuneFrame),
gen_tcp:send(S, <<TuneFrameSize:32, TuneFrame/binary>>),
VirtualHost = <<"/">>,
VirtualHostLength = byte_size(VirtualHost),
OpenFrame =
- <<?COMMAND_OPEN:16,
+ <<?REQUEST:1,
+ ?COMMAND_OPEN:15,
?VERSION_0:16,
3:32,
VirtualHostLength:16,
@@ -293,7 +308,8 @@ test_authenticate(S) ->
gen_tcp:send(S, <<OpenFrameSize:32, OpenFrame/binary>>),
{ok,
<<10:32,
- ?COMMAND_OPEN:16,
+ ?RESPONSE:1,
+ ?COMMAND_OPEN:15,
?VERSION_0:16,
3:32,
?RESPONSE_CODE_OK:16>>} =
@@ -302,7 +318,8 @@ test_authenticate(S) ->
test_create_stream(S, Stream) ->
StreamSize = byte_size(Stream),
CreateStreamFrame =
- <<?COMMAND_CREATE_STREAM:16,
+ <<?REQUEST:1,
+ ?COMMAND_CREATE_STREAM:15,
?VERSION_0:16,
1:32,
StreamSize:16,
@@ -312,7 +329,8 @@ test_create_stream(S, Stream) ->
gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>),
{ok,
<<_Size:32,
- ?COMMAND_CREATE_STREAM:16,
+ ?RESPONSE:1,
+ ?COMMAND_CREATE_STREAM:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16>>} =
@@ -321,7 +339,8 @@ test_create_stream(S, Stream) ->
test_delete_stream(S, Stream) ->
StreamSize = byte_size(Stream),
DeleteStreamFrame =
- <<?COMMAND_DELETE_STREAM:16,
+ <<?REQUEST:1,
+ ?COMMAND_DELETE_STREAM:15,
?VERSION_0:16,
1:32,
StreamSize:16,
@@ -331,7 +350,8 @@ test_delete_stream(S, Stream) ->
ResponseFrameSize = 10,
{ok,
<<ResponseFrameSize:32,
- ?COMMAND_DELETE_STREAM:16,
+ ?RESPONSE:1,
+ ?COMMAND_DELETE_STREAM:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16>>} =
@@ -340,7 +360,8 @@ test_delete_stream(S, Stream) ->
test_declare_publisher(S, PublisherId, Stream) ->
StreamSize = byte_size(Stream),
DeclarePublisherFrame =
- <<?COMMAND_DECLARE_PUBLISHER:16,
+ <<?REQUEST:1,
+ ?COMMAND_DECLARE_PUBLISHER:15,
?VERSION_0:16,
1:32,
PublisherId:8,
@@ -352,7 +373,8 @@ test_declare_publisher(S, PublisherId, Stream) ->
Res = gen_tcp:recv(S, 0, 5000),
{ok,
<<_Size:32,
- ?COMMAND_DECLARE_PUBLISHER:16,
+ ?RESPONSE:1,
+ ?COMMAND_DECLARE_PUBLISHER:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@@ -363,7 +385,8 @@ test_declare_publisher(S, PublisherId, Stream) ->
test_publish_confirm(S, PublisherId, Body) ->
BodySize = byte_size(Body),
PublishFrame =
- <<?COMMAND_PUBLISH:16,
+ <<?REQUEST:1,
+ ?COMMAND_PUBLISH:15,
?VERSION_0:16,
PublisherId:8,
1:32,
@@ -374,7 +397,8 @@ test_publish_confirm(S, PublisherId, Body) ->
gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
{ok,
<<_Size:32,
- ?COMMAND_PUBLISH_CONFIRM:16,
+ ?REQUEST:1,
+ ?COMMAND_PUBLISH_CONFIRM:15,
?VERSION_0:16,
PublisherId:8,
1:32,
@@ -384,7 +408,8 @@ test_publish_confirm(S, PublisherId, Body) ->
test_subscribe(S, SubscriptionId, Stream) ->
StreamSize = byte_size(Stream),
SubscribeFrame =
- <<?COMMAND_SUBSCRIBE:16,
+ <<?REQUEST:1,
+ ?COMMAND_SUBSCRIBE:15,
?VERSION_0:16,
1:32,
SubscriptionId:8,
@@ -398,7 +423,8 @@ test_subscribe(S, SubscriptionId, Stream) ->
Res = gen_tcp:recv(S, 0, 5000),
{ok,
<<_Size:32,
- ?COMMAND_SUBSCRIBE:16,
+ ?RESPONSE:1,
+ ?COMMAND_SUBSCRIBE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@@ -410,7 +436,8 @@ test_deliver(S, Rest, SubscriptionId, Body) ->
BodySize = byte_size(Body),
Frame = read_frame(S, Rest),
<<58:32,
- ?COMMAND_DELIVER:16,
+ ?REQUEST:1,
+ ?COMMAND_DELIVER:15,
?VERSION_0:16,
SubscriptionId:8,
5:4/unsigned,
@@ -434,7 +461,8 @@ test_metadata_update_stream_deleted(S, Stream) ->
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
{ok,
<<FrameSize:32,
- ?COMMAND_METADATA_UPDATE:16,
+ ?REQUEST:1,
+ ?COMMAND_METADATA_UPDATE:15,
?VERSION_0:16,
?RESPONSE_CODE_STREAM_NOT_AVAILABLE:16,
StreamSize:16,
@@ -445,7 +473,8 @@ test_close(S) ->
CloseReason = <<"OK">>,
CloseReasonSize = byte_size(CloseReason),
CloseFrame =
- <<?COMMAND_CLOSE:16,
+ <<?REQUEST:1,
+ ?COMMAND_CLOSE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16,
@@ -455,7 +484,8 @@ test_close(S) ->
gen_tcp:send(S, <<CloseFrameSize:32, CloseFrame/binary>>),
{ok,
<<10:32,
- ?COMMAND_CLOSE:16,
+ ?RESPONSE:1,
+ ?COMMAND_CLOSE:15,
?VERSION_0:16,
1:32,
?RESPONSE_CODE_OK:16>>} =