diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2023-03-16 13:13:59 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-16 13:13:59 +0100 |
commit | 3f74df76784737a241c5678fbbd4cad92699b4bf (patch) | |
tree | f148501cd59d00caaec12f545f9dfe0f37e6326c | |
parent | c23fd7159dd91cf32ab0aa4ae16feb974d7a8d97 (diff) | |
parent | e9f2fa41ce296b3c4ee494b963f34de509f9ff11 (diff) | |
download | rabbitmq-server-git-3f74df76784737a241c5678fbbd4cad92699b4bf.tar.gz |
Merge pull request #7638 from rabbitmq/stream-take-credit-even-for-inactive-subscription
Take credits for inactive stream subscription
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 44 |
1 files changed, 31 insertions, 13 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 177a9c4278..d9209b7bab 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2008,11 +2008,14 @@ handle_frame_post_auth(Transport, #stream_connection_state{consumers = Consumers} = State, {credit, SubscriptionId, Credit}) -> case Consumers of - #{SubscriptionId := #consumer{log = undefined}} -> + #{SubscriptionId := #consumer{log = undefined} = Consumer} -> %% the consumer is not active, it's likely to be credit leftovers - %% from a formerly active consumer, just logging and send an error + %% from a formerly active consumer. Taking the credits, + %% logging and sending an error rabbit_log:debug("Giving credit to an inactive consumer: ~tp", [SubscriptionId]), + #consumer{credit = AvailableCredit} = Consumer, + Consumer1 = Consumer#consumer{credit = AvailableCredit + Credit}, Code = ?RESPONSE_CODE_PRECONDITION_FAILED, Frame = @@ -2022,7 +2025,9 @@ handle_frame_post_auth(Transport, rabbit_global_counters:increase_protocol_counter(stream, ?PRECONDITION_FAILED, 1), - {Connection, State}; + {Connection, + State#stream_connection_state{consumers = + Consumers#{SubscriptionId => Consumer1}}}; #{SubscriptionId := Consumer} -> #consumer{credit = AvailableCredit, last_listener_offset = LLO} = Consumer, @@ -2519,9 +2524,10 @@ handle_frame_post_auth(Transport, ROS end, - rabbit_log:debug("Initializing reader for active consumer, offset " + rabbit_log:debug("Initializing reader for active consumer " + "(subscription ~tp, stream ~tp), offset " "spec is ~tp", - [OffsetSpec]), + [SubscriptionId, Stream, OffsetSpec]), QueueResource = #resource{name = Stream, kind = queue, @@ -2535,6 +2541,19 @@ handle_frame_post_auth(Transport, Properties, OffsetSpec), Consumer1 = Consumer#consumer{log = Segment}, + #consumer{credit = Crdt, + send_limit = SndLmt, + configuration = #consumer_configuration{counters = ConsumerCounters}} = Consumer1, + + rabbit_log:debug("Dispatching to subscription ~tp (stream ~tp), " + "credit(s) ~tp, send limit ~tp", + [SubscriptionId, + Stream, + Crdt, + SndLmt]), + + ConsumedMessagesBefore = messages_consumed(ConsumerCounters), + Consumer2 = case send_chunks(DeliverVersion, Transport, @@ -2554,17 +2573,16 @@ handle_frame_post_auth(Transport, {ok, Csmr} -> Csmr end, - #consumer{configuration = - #consumer_configuration{counters = - ConsumerCounters}, - log = Log2} = - Consumer2, + #consumer{log = Log2} = Consumer2, ConsumerOffset = osiris_log:next_offset(Log2), - rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp " + ConsumedMessagesAfter = messages_consumed(ConsumerCounters), + rabbit_log:debug("Subscription ~tp (stream ~tp) is now at offset ~tp with ~tp " "message(s) distributed after subscription", - [SubscriptionId, ConsumerOffset, - messages_consumed(ConsumerCounters)]), + [SubscriptionId, + Stream, + ConsumerOffset, + ConsumedMessagesAfter - ConsumedMessagesBefore]), Consumers#{SubscriptionId => Consumer2}; #{SubscriptionId := |