diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-09-15 17:52:05 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-09-15 17:52:05 +0200 |
commit | cf5e99cd2908636fa16cab4b1726f060dda81b49 (patch) | |
tree | 73d9f594fce54921684d3f07b959aa01300e899c | |
parent | c25d89d67a77a2011bc4d77fee9ed62d4e6b91c9 (diff) | |
download | rabbitmq-server-git-cf5e99cd2908636fa16cab4b1726f060dda81b49.tar.gz |
Add offset tracking commands
-rw-r--r-- | deps/rabbitmq_stream/docs/PROTOCOL.adoc | 39 | ||||
-rw-r--r-- | deps/rabbitmq_stream/include/rabbit_stream.hrl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 46 |
3 files changed, 87 insertions, 0 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index c5613fe908..f3e089430b 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -132,6 +132,16 @@ doest not contain a correlation ID. |15 |Yes +|<<commitoffset>> +|Client +|16 +|No + +|<<queryoffset>> +|Client +|17 +|Yes + |<<create>> |Client |998 @@ -389,6 +399,35 @@ SaslAuthenticateResponse => Key Version CorrelationId ResponseCode PeerPropertie Value => string ``` +=== CommitOffset + +``` +CommitOffset => Key Version Reference Stream Offset + Key => int16 // 16 + Version => int16 + Reference => string // max 256 characters + Stream => string + Offset => int64 +``` + +=== QueryOffset + +``` +QueryOffsetRequest => Key Version CorrelationId Reference Stream + Key => int16 // 17 + Version => int16 + CorrelationId => int32 + Reference => string // max 256 characters + Stream => string + +QueryOffsetResponse => Key Version CorrelationId Reference Stream + Key => int16 // 17 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 + Offset => int64 +``` + === Create ``` diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index 3612be127e..88cf0fadfb 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -14,6 +14,8 @@ -define(COMMAND_CLOSE, 13). -define(COMMAND_HEARTBEAT, 14). -define(COMMAND_PEER_PROPERTIES, 15). +-define(COMMAND_COMMIT_OFFSET, 16). +-define(COMMAND_QUERY_OFFSET, 17). -define(COMMAND_CREATE_STREAM, 998). -define(COMMAND_DELETE_STREAM, 999). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index ad7c0c409f..d41ff48a8c 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -707,6 +707,52 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, Transport:send(S, [<<FrameSize:32>>, Frame]), {Connection, State, Rest} end; +handle_frame_post_auth(_Transport, #stream_connection{virtual_host = VirtualHost, user = User} = Connection, + State, + <<?COMMAND_COMMIT_OFFSET:16, ?VERSION_0:16, ReferenceSize:16, Reference:ReferenceSize/binary, + StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) -> + + case check_write_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of + ok -> + case lookup_leader(Stream, Connection) of + cluster_not_found -> + rabbit_log:info("Could not find leader to commit offset on ~p~n", [Stream]), + %% FIXME commit offset is fire-and-forget, so no response even if error, change this? + {Connection, State, Rest}; + {ClusterLeader, Connection1} -> + osiris:write_tracking(ClusterLeader, Reference, Offset), + {Connection1, State, Rest} + end; + error -> + %% FIXME commit offset is fire-and-forget, so no response even if error, change this? + rabbit_log:info("Not authorized to commit offset on ~p~n", [Stream]), + {Connection, State, Rest} + end; +handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = VirtualHost, user = User} = Connection, + State, + <<?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16, CorrelationId:32, + ReferenceSize:16, Reference:ReferenceSize/binary, + StreamSize:16, Stream:StreamSize/binary>>, Rest) -> + FrameSize = ?RESPONSE_FRAME_SIZE + 8, + {ResponseCode, Offset} = case check_read_permitted(#resource{name = Stream, kind = queue, virtual_host = VirtualHost}, User, #{}) of + ok -> + case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of + {error, not_found} -> + {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; + {ok, LocalMemberPid} -> + {?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of + undefined -> + 0; + Offt -> + Offt + end} + end; + error -> + {?RESPONSE_CODE_ACCESS_REFUSED, 0} + end, + Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>, + <<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]), + {Connection, State, Rest}; handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, user = #user{username = Username} = User} = Connection, State, <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary, |