diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-26 10:02:13 +0100 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2020-11-30 13:29:17 +0300 |
commit | 3c6446fd6f3e32f6567b0baf16542a94aff03018 (patch) | |
tree | 24443517b03e699e5eb0eb8cbedb435fabb7941e | |
parent | 7a0ccecd7c0a714778f1fdc71f5cbc2c97b4fcdf (diff) | |
download | rabbitmq-server-git-3c6446fd6f3e32f6567b0baf16542a94aff03018.tar.gz |
Add query publisher sequence
-rw-r--r-- | deps/rabbitmq_stream/docs/PROTOCOL.adoc | 29 | ||||
-rw-r--r-- | deps/rabbitmq_stream/include/rabbit_stream.hrl | 1 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 28 |
3 files changed, 55 insertions, 3 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 709763b57e..025a359d17 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -152,6 +152,11 @@ doest not contain a correlation ID. |19 |Yes +|<<querypublishersequence>> +|Client +|20 +|Yes + |<<create>> |Client |998 @@ -436,18 +441,18 @@ QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset Version => int16 CorrelationId => int32 ResponseCode => int16 - Offset => int64 + Offset => uint64 ``` === DeclarePublisher ``` -DeclarePublisherRequest => Key Version CorrelationId PublisherId [Reference] Stream +DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream Key => int16 // 18 Version => int16 CorrelationId => int32 PublisherId => uint8 - Reference => string // max 256 characters + PublisherReference => string // max 256 characters Stream => string DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId @@ -473,6 +478,24 @@ DeletePublisherResponse => Key Version CorrelationId ResponseCode ResponseCode => int16 ``` +=== QueryPublisherSequence + +``` +QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream + Key => int16 // 20 + Version => int16 + CorrelationId => int32 + PublisherReference => string // max 256 characters + Stream => string + +QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence + Key => int16 // 20 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 + Sequence => uint64 +``` + === Create ``` diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index 4ba630a0fe..6bb7e9504c 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -18,6 +18,7 @@ -define(COMMAND_QUERY_OFFSET, 17). -define(COMMAND_DECLARE_PUBLISHER, 18). -define(COMMAND_DELETE_PUBLISHER, 19). +-define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 20). -define(COMMAND_CREATE_STREAM, 998). -define(COMMAND_DELETE_STREAM, 999). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 338a03f92d..2a09eb1fd7 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -990,6 +990,34 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]), {Connection, State, Rest}; +handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, user = User} = Connection, + State, + <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, ?VERSION_0:16, CorrelationId:32, + ReferenceSize:16, Reference:ReferenceSize/binary, + StreamSize:16, Stream:StreamSize/binary>>, Rest) -> + FrameSize = ?RESPONSE_FRAME_SIZE + 8, + {ResponseCode, Sequence} = case rabbit_stream_utils:check_read_permitted( + #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of + ok -> + case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of + {error, not_found} -> + {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; + {ok, LocalMemberPid} -> + {?RESPONSE_CODE_OK, case osiris:fetch_writer_seq(LocalMemberPid, Reference) of + undefined -> + 0; + Offt -> + Offt + end} + end; + error -> + {?RESPONSE_CODE_ACCESS_REFUSED, 0} + end, + Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, ?VERSION_0:16>>, + <<CorrelationId:32>>, <<ResponseCode:16>>, <<Sequence:64>>]), + {Connection, State, Rest}; handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, user = #user{username = Username} = User} = Connection, State, <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary, |