summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-01-18 11:25:21 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-01-18 11:25:21 +0100
commit0d73b58be06aa05fd8dad4b4181cf5c25a99e744 (patch)
treec12ed4a2b94850d8d983a8b39c3d7e2eca2800a5
parenta087621257f1de355ea16ceebcfb798df7b880c6 (diff)
downloadrabbitmq-server-git-0d73b58be06aa05fd8dad4b4181cf5c25a99e744.tar.gz
Re-order stream command constants
The order is more usage-oriented.
-rw-r--r--deps/rabbitmq_stream/Makefile2
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc406
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl46
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl264
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl1
5 files changed, 359 insertions, 360 deletions
diff --git a/deps/rabbitmq_stream/Makefile b/deps/rabbitmq_stream/Makefile
index 88d7717281..bc7268aeb5 100644
--- a/deps/rabbitmq_stream/Makefile
+++ b/deps/rabbitmq_stream/Makefile
@@ -19,7 +19,7 @@ endef
DEPS = rabbit
-TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers
+TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 025a359d17..eb759972e3 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -52,127 +52,145 @@ doest not contain a correlation ID.
|===
|Command |From |Key | Expects response?
-|<<publish>>
+|<<declareproducer>>
|Client
|0
-|No
+|Yes
-|<<publishconfirm>>
-|Server
+|<<publish>>
+|Client
|1
|No
-|<<subscribe>>
-|Client
+|<<publishconfirm>>
+|Server
|2
-|Yes
+|No
-|<<deliver>>
+|<<publisherror>>
|Server
|3
|No
-|<<credit>>
+|<<querypublishersequence>>
|Client
|4
-|No
+|Yes
-|<<unsubscribe>>
+|<<deleteproducer>>
|Client
|5
|Yes
-|<<publisherror>>
-|Server
+|<<subscribe>>
+|Client
|6
-|No
+|Yes
-|<<metadataupdate>>
+|<<deliver>>
|Server
|7
|No
-|<<metadata>>
+|<<credit>>
|Client
|8
|No
-|<<saslhandshake>>
+|<<commitoffset>>
|Client
|9
-|Yes
+|No
-|<<saslauthenticate>>
+|<<queryoffset>>
|Client
|10
|Yes
-|<<tune>>
-|Server
+|<<unsubscribe>>
+|Client
|11
|Yes
-|<<open>>
-|Server
+|<<create>>
+|Client
|12
|Yes
-|<<close>>
-|Client & Server
+|<<delete>>
+|Client
|13
|Yes
-|<<heartbeat>>
-|Client & Server
+|<<metadata>>
+|Client
|14
|No
-|<<peerproperties>>
-|Client
+|<<metadataupdate>>
+|Server
|15
-|Yes
+|No
-|<<commitoffset>>
+|<<peerproperties>>
|Client
|16
-|No
+|Yes
-|<<queryoffset>>
+|<<saslhandshake>>
|Client
|17
|Yes
-|<<declareproducer>>
+|<<saslauthenticate>>
|Client
|18
|Yes
-|<<deleteproducer>>
-|Client
+|<<tune>>
+|Server
|19
|Yes
-|<<querypublishersequence>>
-|Client
+|<<open>>
+|Server
|20
|Yes
-|<<create>>
-|Client
-|998
+|<<close>>
+|Client & Server
+|21
|Yes
-|<<delete>>
-|Client
-|999
-|Yes
+|<<heartbeat>>
+|Client & Server
+|22
+|No
|===
+=== DeclarePublisher
+
+```
+DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
+ Key => int16 // 0
+ Version => int16
+ CorrelationId => int32
+ PublisherId => uint8
+ PublisherReference => string // max 256 characters
+ Stream => string
+
+DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId
+ Key => int16 // 18
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
+```
+
=== Publish
```
Publish => Key Version Stream PublishedMessages
- Key => int16 // 0
+ Key => int16 // 1
Version => int16
PublisherId => uint8
PublishedMessages => [PublishedMessage]
@@ -185,17 +203,63 @@ Publish => Key Version Stream PublishedMessages
```
PublishConfirm => Key Version PublishingIds
- Key => int16 // 1
+ Key => int16 // 2
Version => int16
PublisherId => uint8
PublishingIds => [int64] // to correlate with the messages sent
```
+=== PublishError
+
+```
+PublishError => Key Version [PublishingError]
+ Key => int16 // 3
+ Version => int16
+ PublisherId => uint8
+ PublishingError => PublishingId Code
+ PublishingId => int64
+ Code => int16 // code to identify the problem
+```
+
+=== QueryPublisherSequence
+
+```
+QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
+ Key => int16 // 4
+ Version => int16
+ CorrelationId => int32
+ PublisherReference => string // max 256 characters
+ Stream => string
+
+QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
+ Key => int16 // 4
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
+ Sequence => uint64
+```
+
+=== DeletePublisher
+
+```
+DeletePublisherRequest => Key Version CorrelationId PublisherId
+ Key => int16 // 5
+ Version => int16
+ CorrelationId => int32
+ PublisherId => uint8
+
+DeletePublisherResponse => Key Version CorrelationId ResponseCode
+ Key => int16 // 5
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
+```
+
=== Subscribe
```
Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification Credit
- Key => int16 // 2
+ Key => int16 // 6
Version => int16
CorrelationId => int32 // correlation id to correlate the response
SubscriptionId => uint8 // client-supplied id to identify the subscription
@@ -210,7 +274,7 @@ Subscribe => Key Version CorrelationId SubscriptionId Stream OffsetSpecification
```
Deliver => Key Version SubscriptionId OsirisChunk
- Key => int16 // 3
+ Key => int16 // 7
Version => int32
SubscriptionId => uint8
OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages
@@ -233,13 +297,13 @@ for details on the structure of messages.
```
Credit => Key Version SubscriptionId Credit
- Key => int16 // 4
+ Key => int16 // 8
Version => int16
SubscriptionId => int8
Credit => int16 // the number of chunks that can be sent
CreditResponse => Key Version ResponseCode SubscriptionId
- Key => int16 // 4
+ Key => int16 // 8
Version => int16
ResponseCode => int16
SubscriptionId => int8
@@ -247,155 +311,110 @@ CreditResponse => Key Version ResponseCode SubscriptionId
NB: the server sent a response only in case of problem, e.g. crediting an unknown subscription.
-=== Unsubscribe
-
-```
-Unsubscribe => Key Version CorrelationId SubscriptionId
- Key => int16 // 5
- Version => int16
- CorrelationId => int32
- SubscriptionId => int8
-```
-
-=== PublishError
-
-```
-PublishError => Key Version [PublishingError]
- Key => int16 // 6
- Version => int16
- PublisherId => uint8
- PublishingError => PublishingId Code
- PublishingId => int64
- Code => int16 // code to identify the problem
-```
-
-=== MetadataUpdate
-
-```
-MetadataUpdate => Key Version MetadataInfo
- Key => int16 // 7
- Version => int16
- MetadataInfo => Code Stream
- Code => int16 // code to identify the information
- Stream => string // the stream implied
-```
-
-=== Metadata
-
-```
-MetadataQuery => Key Version CorrelationId [Stream]
- Key => int16 // 8
- Version => int16
- CorrelationId => int32
- Stream => string
-
-MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
- Key => int16 // 8
- Version => int16
- CorrelationId => int32
- Broker => Reference Host Port
- Reference => int16
- Host => string
- Port => int32
- StreamMetadata => StreamName LeaderReference ReplicasReferences
- StreamName => string
- ResponseCode => int16
- LeaderReference => int16
- ReplicasReferences => [int16]
-```
-
-=== SaslHandshake
+=== CommitOffset
```
-SaslHandshakeRequest => Key Version CorrelationId Mechanism
- Key => int16 // 9
- Version => int16
- CorrelationId => int32
-
-SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism]
+CommitOffset => Key Version Reference Stream Offset
Key => int16 // 9
Version => int16
- CorrelationId => int32
- ResponseCode => int16
- Mechanism => string
+ CorrelationId => int32 // not used yet
+ Reference => string // max 256 characters
+ SubscriptionId => uint8
+ Offset => int64
```
-=== SaslAuthenticate
+=== QueryOffset
```
-SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData
+QueryOffsetRequest => Key Version CorrelationId Reference Stream
Key => int16 // 10
Version => int16
CorrelationId => int32
- Mechanism => string
- SaslOpaqueData => bytes
+ Reference => string // max 256 characters
+ Stream => string
-SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData
+QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
Key => int16 // 10
Version => int16
CorrelationId => int32
ResponseCode => int16
- SaslOpaqueData => bytes
+ Offset => uint64
```
-=== Tune
+=== Unsubscribe
```
-TuneRequest => Key Version FrameMax Heartbeat
- Key => int16 // 11, to identify the command
+Unsubscribe => Key Version CorrelationId SubscriptionId
+ Key => int16 // 11
Version => int16
- FrameMax => int32 // in bytes, 0 means no limit
- Heartbeat => int32 // in seconds, 0 means no heartbeat
-
-TuneResponse => TuneRequest
+ CorrelationId => int32
+ SubscriptionId => int8
```
-=== Open
+=== Create
```
-OpenRequest => Key Version CorrelationId VirtualHost
+Create => Key Version CorrelationId Stream Arguments
Key => int16 // 12
Version => int16
CorrelationId => int32
- VirtualHost => string
+ Stream => string
+ Arguments => [Argument]
+ Argument => Key Value
+ Key => string
+ Value => string
+```
-OpenResponse => Key Version CorrelationId ResponseCode
- Key => int16 // 12
+=== Delete
+
+```
+Delete => Key Version CorrelationId Stream
+ Key => int16 // 13
Version => int16
CorrelationId => int32
- ResponseCode => int16
+ Stream => string
```
-=== Close
+=== Metadata
```
-CloseRequest => Key Version CorrelationId ClosingCode ClosingReason
- Key => int16 // 13
+MetadataQuery => Key Version CorrelationId [Stream]
+ Key => int16 // 14
Version => int16
CorrelationId => int32
- ClosingCode => int16
- ClosingReason => string
+ Stream => string
-CloseResponse => Key Version CorrelationId ResponseCode
- Key => int16 // 13
+MetadataResponse => Key Version CorrelationId [Broker] [StreamMetadata]
+ Key => int16 // 14
Version => int16
CorrelationId => int32
- ResponseCode => int16
+ Broker => Reference Host Port
+ Reference => int16
+ Host => string
+ Port => int32
+ StreamMetadata => StreamName LeaderReference ReplicasReferences
+ StreamName => string
+ ResponseCode => int16
+ LeaderReference => int16
+ ReplicasReferences => [int16]
```
-=== Heartbeat
+=== MetadataUpdate
```
-Heartbeat => Key Version
- Key => int16 // 14
+MetadataUpdate => Key Version MetadataInfo
+ Key => int16 // 15
Version => int16
+ MetadataInfo => Code Stream
+ Code => int16 // code to identify the information
+ Stream => string // the stream implied
```
=== PeerProperties
```
PeerPropertiesRequest => Key Version PeerProperties
- Key => int16 // 15
+ Key => int16 // 16
Version => int16
CorrelationId => int32
PeerProperties => [PeerProperty]
@@ -403,8 +422,8 @@ PeerPropertiesRequest => Key Version PeerProperties
Key => string
Value => string
-SaslAuthenticateResponse => Key Version CorrelationId ResponseCode PeerProperties
- Key => int16 // 15
+PeerPropertiesResponse => Key Version CorrelationId ResponseCode PeerProperties
+ Key => int16 // 16
Version => int16
CorrelationId => int32
ResponseCode => int16
@@ -414,110 +433,91 @@ SaslAuthenticateResponse => Key Version CorrelationId ResponseCode PeerPropertie
Value => string
```
-=== CommitOffset
-
-```
-CommitOffset => Key Version Reference Stream Offset
- Key => int16 // 16
- Version => int16
- CorrelationId => int32 // not used yet
- Reference => string // max 256 characters
- SubscriptionId => uint8
- Offset => int64
-```
-
-=== QueryOffset
+=== SaslHandshake
```
-QueryOffsetRequest => Key Version CorrelationId Reference Stream
+SaslHandshakeRequest => Key Version CorrelationId Mechanism
Key => int16 // 17
Version => int16
CorrelationId => int32
- Reference => string // max 256 characters
- Stream => string
-QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
+SaslHandshakeResponse => Key Version CorrelationId ResponseCode [Mechanism]
Key => int16 // 17
Version => int16
CorrelationId => int32
ResponseCode => int16
- Offset => uint64
+ Mechanism => string
```
-=== DeclarePublisher
+=== SaslAuthenticate
```
-DeclarePublisherRequest => Key Version CorrelationId PublisherId [PublisherReference] Stream
+SaslAuthenticateRequest => Key Version CorrelationId Mechanism SaslOpaqueData
Key => int16 // 18
Version => int16
CorrelationId => int32
- PublisherId => uint8
- PublisherReference => string // max 256 characters
- Stream => string
+ Mechanism => string
+ SaslOpaqueData => bytes
-DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId
+SaslAuthenticateResponse => Key Version CorrelationId ResponseCode SaslOpaqueData
Key => int16 // 18
Version => int16
CorrelationId => int32
ResponseCode => int16
+ SaslOpaqueData => bytes
```
-=== DeletePublisher
+=== Tune
```
-DeletePublisherRequest => Key Version CorrelationId PublisherId
+TuneRequest => Key Version FrameMax Heartbeat
Key => int16 // 19
Version => int16
- CorrelationId => int32
- PublisherId => uint8
+ FrameMax => int32 // in bytes, 0 means no limit
+ Heartbeat => int32 // in seconds, 0 means no heartbeat
-DeletePublisherResponse => Key Version CorrelationId ResponseCode
- Key => int16 // 19
- Version => int16
- CorrelationId => int32
- ResponseCode => int16
+TuneResponse => TuneRequest
```
-=== QueryPublisherSequence
+=== Open
```
-QueryPublisherRequest => Key Version CorrelationId PublisherReference Stream
+OpenRequest => Key Version CorrelationId VirtualHost
Key => int16 // 20
Version => int16
CorrelationId => int32
- PublisherReference => string // max 256 characters
- Stream => string
+ VirtualHost => string
-QueryPublisherResponse => Key Version CorrelationId ResponseCode Sequence
+OpenResponse => Key Version CorrelationId ResponseCode
Key => int16 // 20
Version => int16
CorrelationId => int32
ResponseCode => int16
- Sequence => uint64
```
-=== Create
+=== Close
```
-Create => Key Version CorrelationId Stream Arguments
- Key => int16 // 998
+CloseRequest => Key Version CorrelationId ClosingCode ClosingReason
+ Key => int16 // 21
Version => int16
CorrelationId => int32
- Stream => string
- Arguments => [Argument]
- Argument => Key Value
- Key => string
- Value => string
+ ClosingCode => int16
+ ClosingReason => string
+
+CloseResponse => Key Version CorrelationId ResponseCode
+ Key => int16 // 21
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
```
-=== Delete
+=== Heartbeat
```
-Delete => Key Version CorrelationId Stream
- Key => int16 // 999
+Heartbeat => Key Version
+ Key => int16 // 22
Version => int16
- CorrelationId => int32
- Stream => string
```
== Authentication
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index 6bb7e9504c..67a59d1dbb 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -1,26 +1,26 @@
--define(COMMAND_PUBLISH, 0).
--define(COMMAND_PUBLISH_CONFIRM, 1).
--define(COMMAND_SUBSCRIBE, 2).
--define(COMMAND_DELIVER, 3).
--define(COMMAND_CREDIT, 4).
--define(COMMAND_UNSUBSCRIBE, 5).
--define(COMMAND_PUBLISH_ERROR, 6).
--define(COMMAND_METADATA_UPDATE, 7).
--define(COMMAND_METADATA, 8).
--define(COMMAND_SASL_HANDSHAKE, 9).
--define(COMMAND_SASL_AUTHENTICATE, 10).
--define(COMMAND_TUNE, 11).
--define(COMMAND_OPEN, 12).
--define(COMMAND_CLOSE, 13).
--define(COMMAND_HEARTBEAT, 14).
--define(COMMAND_PEER_PROPERTIES, 15).
--define(COMMAND_COMMIT_OFFSET, 16).
--define(COMMAND_QUERY_OFFSET, 17).
--define(COMMAND_DECLARE_PUBLISHER, 18).
--define(COMMAND_DELETE_PUBLISHER, 19).
--define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 20).
--define(COMMAND_CREATE_STREAM, 998).
--define(COMMAND_DELETE_STREAM, 999).
+-define(COMMAND_DECLARE_PUBLISHER, 0).
+-define(COMMAND_PUBLISH, 1).
+-define(COMMAND_PUBLISH_CONFIRM, 2).
+-define(COMMAND_PUBLISH_ERROR, 3).
+-define(COMMAND_QUERY_PUBLISHER_SEQUENCE, 4).
+-define(COMMAND_DELETE_PUBLISHER, 5).
+-define(COMMAND_SUBSCRIBE, 6).
+-define(COMMAND_DELIVER, 7).
+-define(COMMAND_CREDIT, 8).
+-define(COMMAND_COMMIT_OFFSET, 9).
+-define(COMMAND_QUERY_OFFSET, 10).
+-define(COMMAND_UNSUBSCRIBE, 11).
+-define(COMMAND_CREATE_STREAM, 12).
+-define(COMMAND_DELETE_STREAM, 13).
+-define(COMMAND_METADATA, 14).
+-define(COMMAND_METADATA_UPDATE, 15).
+-define(COMMAND_PEER_PROPERTIES, 16).
+-define(COMMAND_SASL_HANDSHAKE, 17).
+-define(COMMAND_SASL_AUTHENTICATE, 18).
+-define(COMMAND_TUNE, 19).
+-define(COMMAND_OPEN, 20).
+-define(COMMAND_CLOSE, 21).
+-define(COMMAND_HEARTBEAT, 22).
-define(VERSION_0, 0).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 4a125bf368..1f2b912bbd 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -1245,45 +1245,6 @@ handle_frame_post_auth(Transport,
{Connection0, State, Rest}
end;
handle_frame_post_auth(Transport,
- #stream_connection{publishers = Publishers,
- publisher_to_ids = PubToIds} =
- Connection0,
- State,
- <<?COMMAND_DELETE_PUBLISHER:16,
- ?VERSION_0:16,
- CorrelationId:32,
- PublisherId:8>>,
- Rest) ->
- case Publishers of
- #{PublisherId := #publisher{stream = Stream, reference = Ref}} ->
- Connection1 =
- Connection0#stream_connection{publishers =
- maps:remove(PublisherId,
- Publishers),
- publisher_to_ids =
- maps:remove({Stream, Ref},
- PubToIds)},
- Connection2 =
- maybe_clean_connection_from_stream(Stream, Connection1),
- response(Transport,
- Connection1,
- ?COMMAND_DELETE_PUBLISHER,
- CorrelationId,
- ?RESPONSE_CODE_OK),
- rabbit_stream_metrics:publisher_deleted(self(),
- stream_r(Stream,
- Connection2),
- PublisherId),
- {Connection2, State, Rest};
- _ ->
- response(Transport,
- Connection0,
- ?COMMAND_DELETE_PUBLISHER,
- CorrelationId,
- ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST),
- {Connection0, State, Rest}
- end;
-handle_frame_post_auth(Transport,
#stream_connection{socket = S,
credits = Credits,
virtual_host = VirtualHost,
@@ -1353,6 +1314,93 @@ handle_frame_post_auth(Transport,
{Connection, State, Rest}
end;
handle_frame_post_auth(Transport,
+ #stream_connection{socket = S,
+ virtual_host = VirtualHost,
+ user = User} =
+ Connection,
+ State,
+ <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ ReferenceSize:16,
+ Reference:ReferenceSize/binary,
+ StreamSize:16,
+ Stream:StreamSize/binary>>,
+ Rest) ->
+ FrameSize = ?RESPONSE_FRAME_SIZE + 8,
+ {ResponseCode, Sequence} =
+ case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
+ kind = queue,
+ virtual_host =
+ VirtualHost},
+ User, #{})
+ of
+ ok ->
+ case rabbit_stream_manager:lookup_local_member(VirtualHost,
+ Stream)
+ of
+ {error, not_found} ->
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
+ {ok, LocalMemberPid} ->
+ {?RESPONSE_CODE_OK,
+ case osiris:fetch_writer_seq(LocalMemberPid, Reference)
+ of
+ undefined ->
+ 0;
+ Offt ->
+ Offt
+ end}
+ end;
+ error ->
+ {?RESPONSE_CODE_ACCESS_REFUSED, 0}
+ end,
+ Transport:send(S,
+ [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16,
+ ?VERSION_0:16>>,
+ <<CorrelationId:32>>,
+ <<ResponseCode:16>>,
+ <<Sequence:64>>]),
+ {Connection, State, Rest};
+handle_frame_post_auth(Transport,
+ #stream_connection{publishers = Publishers,
+ publisher_to_ids = PubToIds} =
+ Connection0,
+ State,
+ <<?COMMAND_DELETE_PUBLISHER:16,
+ ?VERSION_0:16,
+ CorrelationId:32,
+ PublisherId:8>>,
+ Rest) ->
+ case Publishers of
+ #{PublisherId := #publisher{stream = Stream, reference = Ref}} ->
+ Connection1 =
+ Connection0#stream_connection{publishers =
+ maps:remove(PublisherId,
+ Publishers),
+ publisher_to_ids =
+ maps:remove({Stream, Ref},
+ PubToIds)},
+ Connection2 =
+ maybe_clean_connection_from_stream(Stream, Connection1),
+ response(Transport,
+ Connection1,
+ ?COMMAND_DELETE_PUBLISHER,
+ CorrelationId,
+ ?RESPONSE_CODE_OK),
+ rabbit_stream_metrics:publisher_deleted(self(),
+ stream_r(Stream,
+ Connection2),
+ PublisherId),
+ {Connection2, State, Rest};
+ _ ->
+ response(Transport,
+ Connection0,
+ ?COMMAND_DELETE_PUBLISHER,
+ CorrelationId,
+ ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST),
+ {Connection0, State, Rest}
+ end;
+handle_frame_post_auth(Transport,
#stream_connection{socket = Socket,
stream_subscriptions =
StreamSubscriptions,
@@ -1493,56 +1541,6 @@ handle_frame_post_auth(Transport,
{Connection, State, Rest}
end;
handle_frame_post_auth(Transport,
- #stream_connection{stream_subscriptions =
- StreamSubscriptions} =
- Connection,
- #stream_connection_state{consumers = Consumers} = State,
- <<?COMMAND_UNSUBSCRIBE:16,
- ?VERSION_0:16,
- CorrelationId:32,
- SubscriptionId:8/unsigned>>,
- Rest) ->
- case subscription_exists(StreamSubscriptions, SubscriptionId) of
- false ->
- response(Transport,
- Connection,
- ?COMMAND_UNSUBSCRIBE,
- CorrelationId,
- ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST),
- {Connection, State, Rest};
- true ->
- #{SubscriptionId := Consumer} = Consumers,
- Stream = Consumer#consumer.stream,
- #{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
- SubscriptionsForThisStream1 =
- lists:delete(SubscriptionId, SubscriptionsForThisStream),
- StreamSubscriptions1 =
- case length(SubscriptionsForThisStream1) of
- 0 ->
- % no more subscription for this stream
- maps:remove(Stream, StreamSubscriptions);
- _ ->
- StreamSubscriptions#{Stream =>
- SubscriptionsForThisStream1}
- end,
- Connection1 =
- Connection#stream_connection{stream_subscriptions =
- StreamSubscriptions1},
- Consumers1 = maps:remove(SubscriptionId, Consumers),
- Connection2 =
- maybe_clean_connection_from_stream(Stream, Connection1),
- rabbit_stream_metrics:consumer_cancelled(self(),
- stream_r(Stream,
- Connection2),
- SubscriptionId),
- response_ok(Transport,
- Connection,
- ?COMMAND_SUBSCRIBE,
- CorrelationId),
- {Connection2, State#stream_connection_state{consumers = Consumers1},
- Rest}
- end;
-handle_frame_post_auth(Transport,
#stream_connection{socket = S,
send_file_oct = SendFileOct} =
Connection,
@@ -1665,53 +1663,55 @@ handle_frame_post_auth(Transport,
<<Offset:64>>]),
{Connection, State, Rest};
handle_frame_post_auth(Transport,
- #stream_connection{socket = S,
- virtual_host = VirtualHost,
- user = User} =
+ #stream_connection{stream_subscriptions =
+ StreamSubscriptions} =
Connection,
- State,
- <<?COMMAND_QUERY_PUBLISHER_SEQUENCE:16,
+ #stream_connection_state{consumers = Consumers} = State,
+ <<?COMMAND_UNSUBSCRIBE:16,
?VERSION_0:16,
CorrelationId:32,
- ReferenceSize:16,
- Reference:ReferenceSize/binary,
- StreamSize:16,
- Stream:StreamSize/binary>>,
+ SubscriptionId:8/unsigned>>,
Rest) ->
- FrameSize = ?RESPONSE_FRAME_SIZE + 8,
- {ResponseCode, Sequence} =
- case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
- kind = queue,
- virtual_host =
- VirtualHost},
- User, #{})
- of
- ok ->
- case rabbit_stream_manager:lookup_local_member(VirtualHost,
- Stream)
- of
- {error, not_found} ->
- {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
- {ok, LocalMemberPid} ->
- {?RESPONSE_CODE_OK,
- case osiris:fetch_writer_seq(LocalMemberPid, Reference)
- of
- undefined ->
- 0;
- Offt ->
- Offt
- end}
- end;
- error ->
- {?RESPONSE_CODE_ACCESS_REFUSED, 0}
- end,
- Transport:send(S,
- [<<FrameSize:32, ?COMMAND_QUERY_PUBLISHER_SEQUENCE:16,
- ?VERSION_0:16>>,
- <<CorrelationId:32>>,
- <<ResponseCode:16>>,
- <<Sequence:64>>]),
- {Connection, State, Rest};
+ case subscription_exists(StreamSubscriptions, SubscriptionId) of
+ false ->
+ response(Transport,
+ Connection,
+ ?COMMAND_UNSUBSCRIBE,
+ CorrelationId,
+ ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST),
+ {Connection, State, Rest};
+ true ->
+ #{SubscriptionId := Consumer} = Consumers,
+ Stream = Consumer#consumer.stream,
+ #{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
+ SubscriptionsForThisStream1 =
+ lists:delete(SubscriptionId, SubscriptionsForThisStream),
+ StreamSubscriptions1 =
+ case length(SubscriptionsForThisStream1) of
+ 0 ->
+ % no more subscription for this stream
+ maps:remove(Stream, StreamSubscriptions);
+ _ ->
+ StreamSubscriptions#{Stream =>
+ SubscriptionsForThisStream1}
+ end,
+ Connection1 =
+ Connection#stream_connection{stream_subscriptions =
+ StreamSubscriptions1},
+ Consumers1 = maps:remove(SubscriptionId, Consumers),
+ Connection2 =
+ maybe_clean_connection_from_stream(Stream, Connection1),
+ rabbit_stream_metrics:consumer_cancelled(self(),
+ stream_r(Stream,
+ Connection2),
+ SubscriptionId),
+ response_ok(Transport,
+ Connection,
+ ?COMMAND_SUBSCRIBE,
+ CorrelationId),
+ {Connection2, State#stream_connection_state{consumers = Consumers1},
+ Rest}
+ end;
handle_frame_post_auth(Transport,
#stream_connection{virtual_host = VirtualHost,
user =
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index 6bffc78b0e..6a4159aef2 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -277,7 +277,6 @@ test_authenticate(S) ->
gen_tcp:send(S,
<<SaslAuthenticateFrameSize:32,
SaslAuthenticateFrame/binary>>),
-
{ok,
<<10:32,
?COMMAND_SASL_AUTHENTICATE:16,