summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-20 11:43:04 +0100
committerMichael Klishin <michael@clojurewerkz.org>2020-11-30 13:29:16 +0300
commit87eea4acad1231ef3ce434672a8ce59a9dfba474 (patch)
tree9658622615a195f6f9bcd51ba11e1ae8d01d9eda
parentc40d90b2330ba2648bcb35ef97180b7222a398f5 (diff)
downloadrabbitmq-server-git-87eea4acad1231ef3ce434672a8ce59a9dfba474.tar.gz
Include publish dedup in stream protocol
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc49
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl2
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_utils.erl4
3 files changed, 50 insertions, 5 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 14e149664c..6cb2019ec7 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -142,6 +142,16 @@ doest not contain a correlation ID.
|17
|Yes
+|<<declareproducer>>
+|Client
+|18
+|Yes
+
+|<<deleteproducer>>
+|Client
+|19
+|Yes
+
|<<create>>
|Client
|998
@@ -159,7 +169,6 @@ doest not contain a correlation ID.
Publish => Key Version Stream PublishedMessages
Key => int16 // 0
Version => int16
- Stream => string // the name of the stream
PublisherId => uint8
PublishedMessages => [PublishedMessage]
PublishedMessage => PublishingId Message
@@ -249,7 +258,7 @@ Unsubscribe => Key Version CorrelationId SubscriptionId
PublishError => Key Version [PublishingError]
Key => int16 // 6
Version => int16
- PublisherId => int8
+ PublisherId => uint8
PublishingError => PublishingId Code
PublishingId => int64
Code => int16 // code to identify the problem
@@ -422,7 +431,7 @@ QueryOffsetRequest => Key Version CorrelationId Reference Stream
Reference => string // max 256 characters
Stream => string
-QueryOffsetResponse => Key Version CorrelationId Reference Stream
+QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset
Key => int16 // 17
Version => int16
CorrelationId => int32
@@ -430,6 +439,40 @@ QueryOffsetResponse => Key Version CorrelationId Reference Stream
Offset => int64
```
+=== DeclarePublisher
+
+```
+DeclarePublisherRequest => Key Version CorrelationId [Reference] Stream
+ Key => int16 // 18
+ Version => int16
+ CorrelationId => int32
+ Reference => string // max 256 characters
+ Stream => string
+
+DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId
+ Key => int16 // 17
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
+ PublisherId => uint8
+```
+
+=== DeletePublisher
+
+```
+DeletePublisherRequest => Key Version CorrelationId PublisherId
+ Key => int16 // 18
+ Version => int16
+ CorrelationId => int32
+ PublisherId => uint8
+
+DeletePublisherResponse => Key Version CorrelationId ResponseCode
+ Key => int16 // 17
+ Version => int16
+ CorrelationId => int32
+ ResponseCode => int16
+```
+
=== Create
```
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index e418dd1022..f329479d2e 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -161,6 +161,7 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
case is_stream_queue(Q) of
true ->
#{leader_pid := LeaderPid} = amqqueue:get_type_state(Q),
+ % FIXME check if pid is alive in case of stale information
LeaderPid;
_ ->
cluster_not_found
@@ -184,6 +185,7 @@ handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) ->
Acc
end
end, undefined, [LeaderPid] ++ ReplicaPids),
+ % FIXME check if pid is alive in case of stale information
case LocalMember of
undefined ->
{error, not_available};
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
index c20aacb12c..b5d8ce7760 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -45,11 +45,11 @@ write_messages(_ClusterLeader, _PublisherId, <<>>) ->
ok;
write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) ->
% FIXME handle write error
- ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, Message),
+ ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, Message),
write_messages(ClusterLeader, PublisherId, Rest);
write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) ->
% FIXME handle write error
- ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}),
+ ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}),
write_messages(ClusterLeader, PublisherId, Rest).