summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-02 14:16:22 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-02 14:16:22 +0200
commit5e80aa9f3992e298c5e7da63d5f114d9b22eafd5 (patch)
tree7b96e6a35af68eeb720fbc866f422e4b0dac0733
parentb8bdb5ae7b633ed0c897af67337a27dcde838784 (diff)
downloadrabbitmq-server-git-5e80aa9f3992e298c5e7da63d5f114d9b22eafd5.tar.gz
Add correlation ID field in commit offset frame
Not used yet, but can avoid a command version change if the command should have confirm response.
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc1
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl3
2 files changed, 3 insertions, 1 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 130f85590a..14e149664c 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -406,6 +406,7 @@ SaslAuthenticateResponse => Key Version CorrelationId ResponseCode PeerPropertie
CommitOffset => Key Version Reference Stream Offset
Key => int16 // 16
Version => int16
+ CorrelationId => int32 // not used yet
Reference => string // max 256 characters
SubscriptionId => uint8
Offset => int64
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index fd3f651ba8..6f285894db 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -712,7 +712,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection,
end;
handle_frame_post_auth(_Transport, #stream_connection{virtual_host = VirtualHost, user = User} = Connection,
State,
- <<?COMMAND_COMMIT_OFFSET:16, ?VERSION_0:16, ReferenceSize:16, Reference:ReferenceSize/binary,
+ <<?COMMAND_COMMIT_OFFSET:16, ?VERSION_0:16, _CorrelationId:32,
+ ReferenceSize:16, Reference:ReferenceSize/binary,
StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) ->
case check_write_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of