diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-01-18 11:25:21 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-01-18 11:25:21 +0100 |
commit | 0d73b58be06aa05fd8dad4b4181cf5c25a99e744 (patch) | |
tree | c12ed4a2b94850d8d983a8b39c3d7e2eca2800a5 | |
parent | a087621257f1de355ea16ceebcfb798df7b880c6 (diff) | |
download | rabbitmq-server-git-0d73b58be06aa05fd8dad4b4181cf5c25a99e744.tar.gz |
Re-order stream command constants
The order is more usage-oriented.
-rw-r--r-- | deps/rabbitmq_stream/Makefile | 2 | ||||
-rw-r--r-- | deps/rabbitmq_stream/docs/PROTOCOL.adoc | 406 | ||||
-rw-r--r-- | deps/rabbitmq_stream/include/rabbit_stream.hrl | 46 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 264 | ||||
-rw-r--r-- | deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl | 1 |
5 files changed, 359 insertions, 360 deletions
diff --git a/deps/rabbitmq_stream/Makefile b/deps/rabbitmq_stream/Makefile index 88d7717281..bc7268aeb5 100644 --- a/deps/rabbitmq_stream/Makefile +++ b/deps/rabbitmq_stream/Makefile @@ -19,7 +19,7 @@ endef DEPS = rabbit -TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers +TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 025a359d17..eb759972e3 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -52,127 +52,145 @@ doest not contain a correlation ID. |=== |Command |From |Key | Expects response? -|<<publish>> +|<<declareproducer>> |Client |0 -|No +|Yes -|<<publishconfirm>> -|Server +|<<publish>> +|Client |1 |No -|<<subscribe>> -|Client +|<<publishconfirm>> +|Server |2 -|Yes +|No -|<<deliver>> +|<<publisherror>> |Server |3 |No -|<<credit>> +|<<querypublishersequence>> |Client |4 -|No +|Yes -|<<unsubscribe>> +|<<deleteproducer>> |Client |5 |Yes -|<<publisherror>> -|Server +|<<subscribe>> +|Client |6 -|No +|Yes -|<<metadataupdate>> +|<<deliver>> |Server |7 |No -|<<metadata>> +|<<credit>> |Client |8 |No -|<<saslhandshake>> +|<<commitoffset>> |Client |9 -|Yes +|No -|<<saslauthenticate>> +|<<queryoffset>> |Client |10 |Yes -|<<tune>> -|Server +|<<unsubscribe>> +|Client |11 |Yes -|<<open>> -|Server +|<<create>> +|Client |12 |Yes -|<<close>> -|Client & Server +|<<delete>> +|Client |13 |Yes -|<<heartbeat>> -|Client & Server +|<<metadata>> +|Client |14 |No -|<<peerproperties>> -|Client +|<<metadataupdate>> +|Server |15 -|Yes +|No -|<<commitoffset>> +|<<peerproperties>> |Client |16 -|No +|Yes -|<<queryoffset>> +|<<saslhandshake>> |Client |17 |Yes -|<<declareproducer>> +|<<saslauthenticate>> |Client |18 |Yes -|<<deleteproducer>> -|Client +|<<tune>> +|Server |19 |Yes -|<<querypublishersequence>> -|Client +|<<open>> +|Server |20 |Yes -|<<create>> -|Client -|998 +|<<close>> +|Client & Server +|21 |Yes -|<<delete>> -|Client -|999 -|Yes +|<<heartbeat>> +|Client & Server +|22 +|No |=== +=== DeclarePublisher + +``` +DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream + Key => int16 // 0 + Version => int16 + CorrelationId => int32 + PublisherId => uint8 + PublisherReference => string // max 256 characters + Stream => string + +DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId + Key => int16 // 18 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 +``` + === Publish ``` Publish => Key Version Stream PublishedMessages - Key => int16 // 0 + Key => int16 // 1 Version => int16 PublisherId => uint8 PublishedMessages => [PublishedMessage] @@ -185,17 +203,63 @@ Publish => Key Version Stream PublishedMessages ``` PublishConfirm => Key Version PublishingIds - Key => int16 // 1 + Key => int16 // 2 Version => int16 PublisherId => uint8 PublishingIds => [int64] // to correlate with the messages sent ``` +=== PublishError + +``` +PublishError => Key Version [PublishingError] + Key => int16 // 3 + Version => int16 + PublisherId => uint8 + PublishingError => PublishingId Code + PublishingId => int64 + Code => int16 // code to identify the problem +``` + +=== QueryPublisherSequence + +``` +QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream + Key => int16 // 4 + Version => int16 + CorrelationId => int32 + PublisherReference => string // max 256 characters + Stream => string + +QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence + Key => int16 // 4 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 + Sequence => uint64 +``` + +=== DeletePublisher + +``` +DeletePublisherRequest => Key Version CorrelationId PublisherId + Key => int16 // 5 + Version => int16 + CorrelationId => int32 + PublisherId => uint8 + +DeletePublisherResponse => Key Version CorrelationId ResponseCode + Key => int16 // 5 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 +``` + === Subscribe ``` Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Credit - Key => int16 // 2 + Key => int16 // 6 Version => int16 CorrelationId => int32 // correlation id to correlate the response SubscriptionId => uint8 // client-supplied id to identify the subscription @@ -210,7 +274,7 @@ Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification ``` Deliver => Key Version SubscriptionId OsirisChunk - Key => int16 // 3 + Key => int16 // 7 Version => int32 SubscriptionId => uint8 OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages @@ -233,13 +297,13 @@ for details on the structure of messages. ``` Credit => Key Version SubscriptionId Credit - Key => int16 // 4 + Key => int16 // 8 Version => int16 SubscriptionId => int8 Credit => int16 // the number of chunks that can be sent CreditResponse => Key Version ResponseCode SubscriptionId - Key => int16 // 4 + Key => int16 // 8 Version => int16 ResponseCode => int16 SubscriptionId => int8 @@ -247,155 +311,110 @@ CreditResponse => Key Version ResponseCode SubscriptionId NB: the server sent a response only in case of problem, e.g. crediting an unknown subscription. -=== Unsubscribe - -``` -Unsubscribe => Key Version CorrelationId SubscriptionId - Key => int16 // 5 - Version => int16 - CorrelationId => int32 - SubscriptionId => int8 -``` - -=== PublishError - -``` -PublishError => Key Version [PublishingError] - Key => int16 // 6 - Version => int16 - PublisherId => uint8 - PublishingError => PublishingId Code - PublishingId => int64 - Code => int16 // code to identify the problem -``` - -=== MetadataUpdate - -``` -MetadataUpdate => Key Version MetadataInfo - Key => int16 // 7 - Version => int16 - MetadataInfo => Code Stream - Code => int16 // code to identify the information - Stream => string // the stream implied -``` - -=== Metadata - -``` -MetadataQuery => Key Version CorrelationId [Stream] - Key => int16 // 8 - Version => int16 - CorrelationId => int32 - Stream => string - -MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata] - Key => int16 // 8 - Version => int16 - CorrelationId => int32 - Broker => Reference Host Port - Reference => int16 - Host => string - Port => int32 - StreamMetadata => StreamName LeaderReference ReplicasReferences - StreamName => string - ResponseCode => int16 - LeaderReference => int16 - ReplicasReferences => [int16] -``` - -=== SaslHandshake +=== CommitOffset ``` -SaslHandshakeRequest => Key Version CorrelationId Mechanism - Key => int16 // 9 - Version => int16 - CorrelationId => int32 - -SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism] +CommitOffset => Key Version Reference Stream Offset Key => int16 // 9 Version => int16 - CorrelationId => int32 - ResponseCode => int16 - Mechanism => string + CorrelationId => int32 // not used yet + Reference => string // max 256 characters + SubscriptionId => uint8 + Offset => int64 ``` -=== SaslAuthenticate +=== QueryOffset ``` -SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData +QueryOffsetRequest => Key Version CorrelationId Reference Stream Key => int16 // 10 Version => int16 CorrelationId => int32 - Mechanism => string - SaslOpaqueData => bytes + Reference => string // max 256 characters + Stream => string -SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData +QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset Key => int16 // 10 Version => int16 CorrelationId => int32 ResponseCode => int16 - SaslOpaqueData => bytes + Offset => uint64 ``` -=== Tune +=== Unsubscribe ``` -TuneRequest => Key Version FrameMax Heartbeat - Key => int16 // 11, to identify the command +Unsubscribe => Key Version CorrelationId SubscriptionId + Key => int16 // 11 Version => int16 - FrameMax => int32 // in bytes, 0 means no limit - Heartbeat => int32 // in seconds, 0 means no heartbeat - -TuneResponse => TuneRequest + CorrelationId => int32 + SubscriptionId => int8 ``` -=== Open +=== Create ``` -OpenRequest => Key Version CorrelationId VirtualHost +Create => Key Version CorrelationId Stream Arguments Key => int16 // 12 Version => int16 CorrelationId => int32 - VirtualHost => string + Stream => string + Arguments => [Argument] + Argument => Key Value + Key => string + Value => string +``` -OpenResponse => Key Version CorrelationId ResponseCode - Key => int16 // 12 +=== Delete + +``` +Delete => Key Version CorrelationId Stream + Key => int16 // 13 Version => int16 CorrelationId => int32 - ResponseCode => int16 + Stream => string ``` -=== Close +=== Metadata ``` -CloseRequest => Key Version CorrelationId ClosingCode ClosingReason - Key => int16 // 13 +MetadataQuery => Key Version CorrelationId [Stream] + Key => int16 // 14 Version => int16 CorrelationId => int32 - ClosingCode => int16 - ClosingReason => string + Stream => string -CloseResponse => Key Version CorrelationId ResponseCode - Key => int16 // 13 +MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata] + Key => int16 // 14 Version => int16 CorrelationId => int32 - ResponseCode => int16 + Broker => Reference Host Port + Reference => int16 + Host => string + Port => int32 + StreamMetadata => StreamName LeaderReference ReplicasReferences + StreamName => string + ResponseCode => int16 + LeaderReference => int16 + ReplicasReferences => [int16] ``` -=== Heartbeat +=== MetadataUpdate ``` -Heartbeat => Key Version - Key => int16 // 14 +MetadataUpdate => Key Version MetadataInfo + Key => int16 // 15 Version => int16 + MetadataInfo => Code Stream + Code => int16 // code to identify the information + Stream => string // the stream implied ``` === PeerProperties ``` PeerPropertiesRequest => Key Version PeerProperties - Key => int16 // 15 + Key => int16 // 16 Version => int16 CorrelationId => int32 PeerProperties => [PeerProperty] @@ -403,8 +422,8 @@ PeerPropertiesRequest => Key Version PeerProperties Key => string Value => string -SaslAuthenticateResponse => Key Version CorrelationId ResponseCode PeerProperties - Key => int16 // 15 +PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties + Key => int16 // 16 Version => int16 CorrelationId => int32 ResponseCode => int16 @@ -414,110 +433,91 @@ SaslAuthenticateResponse => Key Version CorrelationId ResponseCode PeerPropertie Value => string ``` -=== CommitOffset - -``` -CommitOffset => Key Version Reference Stream Offset - Key => int16 // 16 - Version => int16 - CorrelationId => int32 // not used yet - Reference => string // max 256 characters - SubscriptionId => uint8 - Offset => int64 -``` - -=== QueryOffset +=== SaslHandshake ``` -QueryOffsetRequest => Key Version CorrelationId Reference Stream +SaslHandshakeRequest => Key Version CorrelationId Mechanism Key => int16 // 17 Version => int16 CorrelationId => int32 - Reference => string // max 256 characters - Stream => string -QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset +SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism] Key => int16 // 17 Version => int16 CorrelationId => int32 ResponseCode => int16 - Offset => uint64 + Mechanism => string ``` -=== DeclarePublisher +=== SaslAuthenticate ``` -DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream +SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData Key => int16 // 18 Version => int16 CorrelationId => int32 - PublisherId => uint8 - PublisherReference => string // max 256 characters - Stream => string + Mechanism => string + SaslOpaqueData => bytes -DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId +SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData Key => int16 // 18 Version => int16 CorrelationId => int32 ResponseCode => int16 + SaslOpaqueData => bytes ``` -=== DeletePublisher +=== Tune ``` -DeletePublisherRequest => Key Version CorrelationId PublisherId +TuneRequest => Key Version FrameMax Heartbeat Key => int16 // 19 Version => int16 - CorrelationId => int32 - PublisherId => uint8 + FrameMax => int32 // in bytes, 0 means no limit + Heartbeat => int32 // in seconds, 0 means no heartbeat -DeletePublisherResponse => Key Version CorrelationId ResponseCode - Key => int16 // 19 - Version => int16 - CorrelationId => int32 - ResponseCode => int16 +TuneResponse => TuneRequest ``` -=== QueryPublisherSequence +=== Open ``` -QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream +OpenRequest => Key Version CorrelationId VirtualHost Key => int16 // 20 Version => int16 CorrelationId => int32 - PublisherReference => string // max 256 characters - Stream => string + VirtualHost => string -QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence +OpenResponse => Key Version CorrelationId ResponseCode Key => int16 // 20 Version => int16 CorrelationId => int32 ResponseCode => int16 - Sequence => uint64 ``` -=== Create +=== Close ``` -Create => Key Version CorrelationId Stream Arguments - Key => int16 // 998 +CloseRequest => Key Version CorrelationId ClosingCode ClosingReason + Key => int16 // 21 Version => int16 CorrelationId => int32 - Stream => string - Arguments => [Argument] - Argument => Key Value - Key => string - Value => string + ClosingCode => int16 + ClosingReason => string + +CloseResponse => Key Version CorrelationId ResponseCode + Key => int16 // 21 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 ``` -=== Delete +=== Heartbeat ``` -Delete => Key Version CorrelationId Stream - Key => int16 // 999 +Heartbeat => Key Version + Key => int16 // 22 Version => int16 - CorrelationId => int32 - Stream => string ``` == Authentication diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index 6bb7e9504c..67a59d1dbb 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -1,26 +1,26 @@ --define(COMMAND_PUBLISH, 0). --define(COMMAND_PUBLISH_CONFIRM, 1). --define(COMMAND_SUBSCRIBE, 2). --define(COMMAND_DELIVER, 3). --define(COMMAND_CREDIT, 4). --define(COMMAND_UNSUBSCRIBE, 5). --define(COMMAND_PUBLISH_ERROR, 6). --define(COMMAND_METADATA_UPDATE, 7). --define(COMMAND_METADATA, 8). --define(COMMAND_SASL_HANDSHAKE, 9). --define(COMMAND_SASL_AUTHENTICATE, 10). --define(COMMAND_TUNE, 11). --define(COMMAND_OPEN, 12). --define(COMMAND_CLOSE, 13). --define(COMMAND_HEARTBEAT, 14). --define(COMMAND_PEER_PROPERTIES, 15). --define(COMMAND_COMMIT_OFFSET, 16). --define(COMMAND_QUERY_OFFSET, 17). --define(COMMAND_DECLARE_PUBLISHER, 18). --define(COMMAND_DELETE_PUBLISHER, 19). --define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 20). --define(COMMAND_CREATE_STREAM, 998). --define(COMMAND_DELETE_STREAM, 999). +-define(COMMAND_DECLARE_PUBLISHER, 0). +-define(COMMAND_PUBLISH, 1). +-define(COMMAND_PUBLISH_CONFIRM, 2). +-define(COMMAND_PUBLISH_ERROR, 3). +-define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 4). +-define(COMMAND_DELETE_PUBLISHER, 5). +-define(COMMAND_SUBSCRIBE, 6). +-define(COMMAND_DELIVER, 7). +-define(COMMAND_CREDIT, 8). +-define(COMMAND_COMMIT_OFFSET, 9). +-define(COMMAND_QUERY_OFFSET, 10). +-define(COMMAND_UNSUBSCRIBE, 11). +-define(COMMAND_CREATE_STREAM, 12). +-define(COMMAND_DELETE_STREAM, 13). +-define(COMMAND_METADATA, 14). +-define(COMMAND_METADATA_UPDATE, 15). +-define(COMMAND_PEER_PROPERTIES, 16). +-define(COMMAND_SASL_HANDSHAKE, 17). +-define(COMMAND_SASL_AUTHENTICATE, 18). +-define(COMMAND_TUNE, 19). +-define(COMMAND_OPEN, 20). +-define(COMMAND_CLOSE, 21). +-define(COMMAND_HEARTBEAT, 22). -define(VERSION_0, 0). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 4a125bf368..1f2b912bbd 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1245,45 +1245,6 @@ handle_frame_post_auth(Transport, {Connection0, State, Rest} end; handle_frame_post_auth(Transport, - #stream_connection{publishers = Publishers, - publisher_to_ids = PubToIds} = - Connection0, - State, - <<?COMMAND_DELETE_PUBLISHER:16, - ?VERSION_0:16, - CorrelationId:32, - PublisherId:8>>, - Rest) -> - case Publishers of - #{PublisherId := #publisher{stream = Stream, reference = Ref}} -> - Connection1 = - Connection0#stream_connection{publishers = - maps:remove(PublisherId, - Publishers), - publisher_to_ids = - maps:remove({Stream, Ref}, - PubToIds)}, - Connection2 = - maybe_clean_connection_from_stream(Stream, Connection1), - response(Transport, - Connection1, - ?COMMAND_DELETE_PUBLISHER, - CorrelationId, - ?RESPONSE_CODE_OK), - rabbit_stream_metrics:publisher_deleted(self(), - stream_r(Stream, - Connection2), - PublisherId), - {Connection2, State, Rest}; - _ -> - response(Transport, - Connection0, - ?COMMAND_DELETE_PUBLISHER, - CorrelationId, - ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST), - {Connection0, State, Rest} - end; -handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits, virtual_host = VirtualHost, @@ -1353,6 +1314,93 @@ handle_frame_post_auth(Transport, {Connection, State, Rest} end; handle_frame_post_auth(Transport, + #stream_connection{socket = S, + virtual_host = VirtualHost, + user = User} = + Connection, + State, + <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, + ?VERSION_0:16, + CorrelationId:32, + ReferenceSize:16, + Reference:ReferenceSize/binary, + StreamSize:16, + Stream:StreamSize/binary>>, + Rest) -> + FrameSize = ?RESPONSE_FRAME_SIZE + 8, + {ResponseCode, Sequence} = + case rabbit_stream_utils:check_read_permitted(#resource{name = Stream, + kind = queue, + virtual_host = + VirtualHost}, + User, #{}) + of + ok -> + case rabbit_stream_manager:lookup_local_member(VirtualHost, + Stream) + of + {error, not_found} -> + {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; + {ok, LocalMemberPid} -> + {?RESPONSE_CODE_OK, + case osiris:fetch_writer_seq(LocalMemberPid, Reference) + of + undefined -> + 0; + Offt -> + Offt + end} + end; + error -> + {?RESPONSE_CODE_ACCESS_REFUSED, 0} + end, + Transport:send(S, + [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, + ?VERSION_0:16>>, + <<CorrelationId:32>>, + <<ResponseCode:16>>, + <<Sequence:64>>]), + {Connection, State, Rest}; +handle_frame_post_auth(Transport, + #stream_connection{publishers = Publishers, + publisher_to_ids = PubToIds} = + Connection0, + State, + <<?COMMAND_DELETE_PUBLISHER:16, + ?VERSION_0:16, + CorrelationId:32, + PublisherId:8>>, + Rest) -> + case Publishers of + #{PublisherId := #publisher{stream = Stream, reference = Ref}} -> + Connection1 = + Connection0#stream_connection{publishers = + maps:remove(PublisherId, + Publishers), + publisher_to_ids = + maps:remove({Stream, Ref}, + PubToIds)}, + Connection2 = + maybe_clean_connection_from_stream(Stream, Connection1), + response(Transport, + Connection1, + ?COMMAND_DELETE_PUBLISHER, + CorrelationId, + ?RESPONSE_CODE_OK), + rabbit_stream_metrics:publisher_deleted(self(), + stream_r(Stream, + Connection2), + PublisherId), + {Connection2, State, Rest}; + _ -> + response(Transport, + Connection0, + ?COMMAND_DELETE_PUBLISHER, + CorrelationId, + ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST), + {Connection0, State, Rest} + end; +handle_frame_post_auth(Transport, #stream_connection{socket = Socket, stream_subscriptions = StreamSubscriptions, @@ -1493,56 +1541,6 @@ handle_frame_post_auth(Transport, {Connection, State, Rest} end; handle_frame_post_auth(Transport, - #stream_connection{stream_subscriptions = - StreamSubscriptions} = - Connection, - #stream_connection_state{consumers = Consumers} = State, - <<?COMMAND_UNSUBSCRIBE:16, - ?VERSION_0:16, - CorrelationId:32, - SubscriptionId:8/unsigned>>, - Rest) -> - case subscription_exists(StreamSubscriptions, SubscriptionId) of - false -> - response(Transport, - Connection, - ?COMMAND_UNSUBSCRIBE, - CorrelationId, - ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST), - {Connection, State, Rest}; - true -> - #{SubscriptionId := Consumer} = Consumers, - Stream = Consumer#consumer.stream, - #{Stream := SubscriptionsForThisStream} = StreamSubscriptions, - SubscriptionsForThisStream1 = - lists:delete(SubscriptionId, SubscriptionsForThisStream), - StreamSubscriptions1 = - case length(SubscriptionsForThisStream1) of - 0 -> - % no more subscription for this stream - maps:remove(Stream, StreamSubscriptions); - _ -> - StreamSubscriptions#{Stream => - SubscriptionsForThisStream1} - end, - Connection1 = - Connection#stream_connection{stream_subscriptions = - StreamSubscriptions1}, - Consumers1 = maps:remove(SubscriptionId, Consumers), - Connection2 = - maybe_clean_connection_from_stream(Stream, Connection1), - rabbit_stream_metrics:consumer_cancelled(self(), - stream_r(Stream, - Connection2), - SubscriptionId), - response_ok(Transport, - Connection, - ?COMMAND_SUBSCRIBE, - CorrelationId), - {Connection2, State#stream_connection_state{consumers = Consumers1}, - Rest} - end; -handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection, @@ -1665,53 +1663,55 @@ handle_frame_post_auth(Transport, <<Offset:64>>]), {Connection, State, Rest}; handle_frame_post_auth(Transport, - #stream_connection{socket = S, - virtual_host = VirtualHost, - user = User} = + #stream_connection{stream_subscriptions = + StreamSubscriptions} = Connection, - State, - <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, + #stream_connection_state{consumers = Consumers} = State, + <<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, - ReferenceSize:16, - Reference:ReferenceSize/binary, - StreamSize:16, - Stream:StreamSize/binary>>, + SubscriptionId:8/unsigned>>, Rest) -> - FrameSize = ?RESPONSE_FRAME_SIZE + 8, - {ResponseCode, Sequence} = - case rabbit_stream_utils:check_read_permitted(#resource{name = Stream, - kind = queue, - virtual_host = - VirtualHost}, - User, #{}) - of - ok -> - case rabbit_stream_manager:lookup_local_member(VirtualHost, - Stream) - of - {error, not_found} -> - {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; - {ok, LocalMemberPid} -> - {?RESPONSE_CODE_OK, - case osiris:fetch_writer_seq(LocalMemberPid, Reference) - of - undefined -> - 0; - Offt -> - Offt - end} - end; - error -> - {?RESPONSE_CODE_ACCESS_REFUSED, 0} - end, - Transport:send(S, - [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16, - ?VERSION_0:16>>, - <<CorrelationId:32>>, - <<ResponseCode:16>>, - <<Sequence:64>>]), - {Connection, State, Rest}; + case subscription_exists(StreamSubscriptions, SubscriptionId) of + false -> + response(Transport, + Connection, + ?COMMAND_UNSUBSCRIBE, + CorrelationId, + ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST), + {Connection, State, Rest}; + true -> + #{SubscriptionId := Consumer} = Consumers, + Stream = Consumer#consumer.stream, + #{Stream := SubscriptionsForThisStream} = StreamSubscriptions, + SubscriptionsForThisStream1 = + lists:delete(SubscriptionId, SubscriptionsForThisStream), + StreamSubscriptions1 = + case length(SubscriptionsForThisStream1) of + 0 -> + % no more subscription for this stream + maps:remove(Stream, StreamSubscriptions); + _ -> + StreamSubscriptions#{Stream => + SubscriptionsForThisStream1} + end, + Connection1 = + Connection#stream_connection{stream_subscriptions = + StreamSubscriptions1}, + Consumers1 = maps:remove(SubscriptionId, Consumers), + Connection2 = + maybe_clean_connection_from_stream(Stream, Connection1), + rabbit_stream_metrics:consumer_cancelled(self(), + stream_r(Stream, + Connection2), + SubscriptionId), + response_ok(Transport, + Connection, + ?COMMAND_SUBSCRIBE, + CorrelationId), + {Connection2, State#stream_connection_state{consumers = Consumers1}, + Rest} + end; handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, user = diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 6bffc78b0e..6a4159aef2 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -277,7 +277,6 @@ test_authenticate(S) -> gen_tcp:send(S, <<SaslAuthenticateFrameSize:32, SaslAuthenticateFrame/binary>>), - {ok, <<10:32, ?COMMAND_SASL_AUTHENTICATE:16, |