summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-26 10:02:13 +0100
committerMichael Klishin <michael@clojurewerkz.org>2020-11-30 13:29:17 +0300
commit3c6446fd6f3e32f6567b0baf16542a94aff03018 (patch)
tree24443517b03e699e5eb0eb8cbedb435fabb7941e
parent7a0ccecd7c0a714778f1fdc71f5cbc2c97b4fcdf (diff)
downloadrabbitmq-server-git-3c6446fd6f3e32f6567b0baf16542a94aff03018.tar.gz
Add query publisher sequence
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc29
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl1
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl28
3 files changed, 55 insertions, 3 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 709763b57e..025a359d17 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -152,6 +152,11 @@ doest not contain a correlation ID.
|19
|Yes
+|<<querypublishersequence>>
+|Client
+|20
+|Yes
+
|<<create>>
|Client
|998
@@ -436,18 +441,18 @@ QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
Version => int16
CorrelationId => int32
ResponseCode => int16
- Offset => int64
+ Offset => uint64
```
=== DeclarePublisher
```
-DeclarePublisherRequest => Key Version CorrelationId PublisherId [Reference] Stream
+DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
Key => int16 // 18
Version => int16
CorrelationId => int32
PublisherId => uint8
- Reference => string // max 256 characters
+ PublisherReference => string // max 256 characters
Stream => string
DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId
@@ -473,6 +478,24 @@ DeletePublisherResponse => Key Version CorrelationId ResponseCode
ResponseCode => int16
```
+=== QueryPublisherSequence
+
+```
+QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
+ Key => int16 // 20
+ Version => int16
+ CorrelationId => int32
+ PublisherReference => string // max 256 characters
+ Stream => string
+
+QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
+ Key => int16 // 20
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
+ Sequence => uint64
+```
+
=== Create
```
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index 4ba630a0fe..6bb7e9504c 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -18,6 +18,7 @@
-define(COMMAND_QUERY_OFFSET, 17).
-define(COMMAND_DECLARE_PUBLISHER, 18).
-define(COMMAND_DELETE_PUBLISHER, 19).
+-define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 20).
-define(COMMAND_CREATE_STREAM, 998).
-define(COMMAND_DELETE_STREAM, 999).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 338a03f92d..2a09eb1fd7 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -990,6 +990,34 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>,
<<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]),
{Connection, State, Rest};
+handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, user = User} = Connection,
+ State,
+ <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, ?VERSION_0:16, CorrelationId:32,
+ ReferenceSize:16, Reference:ReferenceSize/binary,
+ StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
+ FrameSize = ?RESPONSE_FRAME_SIZE + 8,
+ {ResponseCode, Sequence} = case rabbit_stream_utils:check_read_permitted(
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
+ {error, not_found} ->
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
+ {ok, LocalMemberPid} ->
+ {?RESPONSE_CODE_OK, case osiris:fetch_writer_seq(LocalMemberPid, Reference) of
+ undefined ->
+ 0;
+ Offt ->
+ Offt
+ end}
+ end;
+ error ->
+ {?RESPONSE_CODE_ACCESS_REFUSED, 0}
+ end,
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, ?VERSION_0:16>>,
+ <<CorrelationId:32>>, <<ResponseCode:16>>, <<Sequence:64>>]),
+ {Connection, State, Rest};
handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, user = #user{username = Username} = User} = Connection,
State,
<<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary,