diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-20 11:43:04 +0100 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2020-11-30 13:29:16 +0300 |
commit | 87eea4acad1231ef3ce434672a8ce59a9dfba474 (patch) | |
tree | 9658622615a195f6f9bcd51ba11e1ae8d01d9eda | |
parent | c40d90b2330ba2648bcb35ef97180b7222a398f5 (diff) | |
download | rabbitmq-server-git-87eea4acad1231ef3ce434672a8ce59a9dfba474.tar.gz |
Include publish dedup in stream protocol
-rw-r--r-- | deps/rabbitmq_stream/docs/PROTOCOL.adoc | 49 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_utils.erl | 4 |
3 files changed, 50 insertions, 5 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 14e149664c..6cb2019ec7 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -142,6 +142,16 @@ doest not contain a correlation ID. |17 |Yes +|<<declareproducer>> +|Client +|18 +|Yes + +|<<deleteproducer>> +|Client +|19 +|Yes + |<<create>> |Client |998 @@ -159,7 +169,6 @@ doest not contain a correlation ID. Publish => Key Version Stream PublishedMessages Key => int16 // 0 Version => int16 - Stream => string // the name of the stream PublisherId => uint8 PublishedMessages => [PublishedMessage] PublishedMessage => PublishingId Message @@ -249,7 +258,7 @@ Unsubscribe => Key Version CorrelationId SubscriptionId PublishError => Key Version [PublishingError] Key => int16 // 6 Version => int16 - PublisherId => int8 + PublisherId => uint8 PublishingError => PublishingId Code PublishingId => int64 Code => int16 // code to identify the problem @@ -422,7 +431,7 @@ QueryOffsetRequest => Key Version CorrelationId Reference Stream Reference => string // max 256 characters Stream => string -QueryOffsetResponse => Key Version CorrelationId Reference Stream +QueryOffsetResponse => Key Version CorrelationId ResponseCode Offset Key => int16 // 17 Version => int16 CorrelationId => int32 @@ -430,6 +439,40 @@ QueryOffsetResponse => Key Version CorrelationId Reference Stream Offset => int64 ``` +=== DeclarePublisher + +``` +DeclarePublisherRequest => Key Version CorrelationId [Reference] Stream + Key => int16 // 18 + Version => int16 + CorrelationId => int32 + Reference => string // max 256 characters + Stream => string + +DeclarePublisherResponse => Key Version CorrelationId ResponseCode PublisherId + Key => int16 // 17 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 + PublisherId => uint8 +``` + +=== DeletePublisher + +``` +DeletePublisherRequest => Key Version CorrelationId PublisherId + Key => int16 // 18 + Version => int16 + CorrelationId => int32 + PublisherId => uint8 + +DeletePublisherResponse => Key Version CorrelationId ResponseCode + Key => int16 // 17 + Version => int16 + CorrelationId => int32 + ResponseCode => int16 +``` + === Create ``` diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index e418dd1022..f329479d2e 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -161,6 +161,7 @@ handle_call({lookup_leader, VirtualHost, Stream}, _From, State) -> case is_stream_queue(Q) of true -> #{leader_pid := LeaderPid} = amqqueue:get_type_state(Q), + % FIXME check if pid is alive in case of stale information LeaderPid; _ -> cluster_not_found @@ -184,6 +185,7 @@ handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) -> Acc end end, undefined, [LeaderPid] ++ ReplicaPids), + % FIXME check if pid is alive in case of stale information case LocalMember of undefined -> {error, not_available}; diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index c20aacb12c..b5d8ce7760 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -45,11 +45,11 @@ write_messages(_ClusterLeader, _PublisherId, <<>>) -> ok; write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 0:1, MessageSize:31, Message:MessageSize/binary, Rest/binary>>) -> % FIXME handle write error - ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, Message), + ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, Message), write_messages(ClusterLeader, PublisherId, Rest); write_messages(ClusterLeader, PublisherId, <<PublishingId:64, 1:1, CompressionType:3, _Unused:4, MessageCount:16, BatchSize:32, Batch:BatchSize/binary, Rest/binary>>) -> % FIXME handle write error - ok = osiris:write(ClusterLeader, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}), + ok = osiris:write(ClusterLeader, undefined, {PublisherId, PublishingId}, {batch, MessageCount, CompressionType, Batch}), write_messages(ClusterLeader, PublisherId, Rest). |