summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-09-15 17:52:05 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-09-15 17:52:05 +0200
commitcf5e99cd2908636fa16cab4b1726f060dda81b49 (patch)
tree73d9f594fce54921684d3f07b959aa01300e899c
parentc25d89d67a77a2011bc4d77fee9ed62d4e6b91c9 (diff)
downloadrabbitmq-server-git-cf5e99cd2908636fa16cab4b1726f060dda81b49.tar.gz
Add offset tracking commands
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc39
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl2
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl46
3 files changed, 87 insertions, 0 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index c5613fe908..f3e089430b 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -132,6 +132,16 @@ doest not contain a correlation ID.
|15
|Yes
+|<<commitoffset>>
+|Client
+|16
+|No
+
+|<<queryoffset>>
+|Client
+|17
+|Yes
+
|<<create>>
|Client
|998
@@ -389,6 +399,35 @@ SaslAuthenticateResponse => Key Version CorrelationId ResponseCode PeerPropertie
Value => string
```
+=== CommitOffset
+
+```
+CommitOffset => Key Version Reference Stream Offset
+ Key => int16 // 16
+ Version => int16
+ Reference => string // max 256 characters
+ Stream => string
+ Offset => int64
+```
+
+=== QueryOffset
+
+```
+QueryOffsetRequest => Key Version CorrelationId Reference Stream
+ Key => int16 // 17
+ Version => int16
+ CorrelationId => int32
+ Reference => string // max 256 characters
+ Stream => string
+
+QueryOffsetResponse => Key Version CorrelationId Reference Stream
+ Key => int16 // 17
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
+ Offset => int64
+```
+
=== Create
```
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index 3612be127e..88cf0fadfb 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -14,6 +14,8 @@
-define(COMMAND_CLOSE, 13).
-define(COMMAND_HEARTBEAT, 14).
-define(COMMAND_PEER_PROPERTIES, 15).
+-define(COMMAND_COMMIT_OFFSET, 16).
+-define(COMMAND_QUERY_OFFSET, 17).
-define(COMMAND_CREATE_STREAM, 998).
-define(COMMAND_DELETE_STREAM, 999).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index ad7c0c409f..d41ff48a8c 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -707,6 +707,52 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection,
Transport:send(S, [<<FrameSize:32>>, Frame]),
{Connection, State, Rest}
end;
+handle_frame_post_auth(_Transport, #stream_connection{virtual_host = VirtualHost, user = User} = Connection,
+ State,
+ <<?COMMAND_COMMIT_OFFSET:16, ?VERSION_0:16, ReferenceSize:16, Reference:ReferenceSize/binary,
+ StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) ->
+
+ case check_write_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of
+ ok ->
+ case lookup_leader(Stream, Connection) of
+ cluster_not_found ->
+ rabbit_log:info("Could not find leader to commit offset on ~p~n", [Stream]),
+ %% FIXME commit offset is fire-and-forget, so no response even if error, change this?
+ {Connection, State, Rest};
+ {ClusterLeader, Connection1} ->
+ osiris:write_tracking(ClusterLeader, Reference, Offset),
+ {Connection1, State, Rest}
+ end;
+ error ->
+ %% FIXME commit offset is fire-and-forget, so no response even if error, change this?
+ rabbit_log:info("Not authorized to commit offset on ~p~n", [Stream]),
+ {Connection, State, Rest}
+ end;
+handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, user = User} = Connection,
+ State,
+ <<?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16, CorrelationId:32,
+ ReferenceSize:16, Reference:ReferenceSize/binary,
+ StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
+ FrameSize = ?RESPONSE_FRAME_SIZE + 8,
+ {ResponseCode, Offset} = case check_read_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of
+ ok ->
+ case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
+ {error, not_found} ->
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
+ {ok, LocalMemberPid} ->
+ {?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of
+ undefined ->
+ 0;
+ Offt ->
+ Offt
+ end}
+ end;
+ error ->
+ {?RESPONSE_CODE_ACCESS_REFUSED, 0}
+ end,
+ Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>,
+ <<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]),
+ {Connection, State, Rest};
handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, user = #user{username = Username} = User} = Connection,
State,
<<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary,