summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2023-03-16 13:13:59 +0100
committerGitHub <noreply@github.com>2023-03-16 13:13:59 +0100
commit3f74df76784737a241c5678fbbd4cad92699b4bf (patch)
treef148501cd59d00caaec12f545f9dfe0f37e6326c
parentc23fd7159dd91cf32ab0aa4ae16feb974d7a8d97 (diff)
parente9f2fa41ce296b3c4ee494b963f34de509f9ff11 (diff)
downloadrabbitmq-server-git-3f74df76784737a241c5678fbbd4cad92699b4bf.tar.gz
Merge pull request #7638 from rabbitmq/stream-take-credit-even-for-inactive-subscription
Take credits for inactive stream subscription
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl44
1 files changed, 31 insertions, 13 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 177a9c4278..d9209b7bab 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -2008,11 +2008,14 @@ handle_frame_post_auth(Transport,
#stream_connection_state{consumers = Consumers} = State,
{credit, SubscriptionId, Credit}) ->
case Consumers of
- #{SubscriptionId := #consumer{log = undefined}} ->
+ #{SubscriptionId := #consumer{log = undefined} = Consumer} ->
%% the consumer is not active, it's likely to be credit leftovers
- %% from a formerly active consumer, just logging and send an error
+ %% from a formerly active consumer. Taking the credits,
+ %% logging and sending an error
rabbit_log:debug("Giving credit to an inactive consumer: ~tp",
[SubscriptionId]),
+ #consumer{credit = AvailableCredit} = Consumer,
+ Consumer1 = Consumer#consumer{credit = AvailableCredit + Credit},
Code = ?RESPONSE_CODE_PRECONDITION_FAILED,
Frame =
@@ -2022,7 +2025,9 @@ handle_frame_post_auth(Transport,
rabbit_global_counters:increase_protocol_counter(stream,
?PRECONDITION_FAILED,
1),
- {Connection, State};
+ {Connection,
+ State#stream_connection_state{consumers =
+ Consumers#{SubscriptionId => Consumer1}}};
#{SubscriptionId := Consumer} ->
#consumer{credit = AvailableCredit, last_listener_offset = LLO} =
Consumer,
@@ -2519,9 +2524,10 @@ handle_frame_post_auth(Transport,
ROS
end,
- rabbit_log:debug("Initializing reader for active consumer, offset "
+ rabbit_log:debug("Initializing reader for active consumer "
+ "(subscription ~tp, stream ~tp), offset "
"spec is ~tp",
- [OffsetSpec]),
+ [SubscriptionId, Stream, OffsetSpec]),
QueueResource =
#resource{name = Stream,
kind = queue,
@@ -2535,6 +2541,19 @@ handle_frame_post_auth(Transport,
Properties,
OffsetSpec),
Consumer1 = Consumer#consumer{log = Segment},
+ #consumer{credit = Crdt,
+ send_limit = SndLmt,
+ configuration = #consumer_configuration{counters = ConsumerCounters}} = Consumer1,
+
+ rabbit_log:debug("Dispatching to subscription ~tp (stream ~tp), "
+ "credit(s) ~tp, send limit ~tp",
+ [SubscriptionId,
+ Stream,
+ Crdt,
+ SndLmt]),
+
+ ConsumedMessagesBefore = messages_consumed(ConsumerCounters),
+
Consumer2 =
case send_chunks(DeliverVersion,
Transport,
@@ -2554,17 +2573,16 @@ handle_frame_post_auth(Transport,
{ok, Csmr} ->
Csmr
end,
- #consumer{configuration =
- #consumer_configuration{counters =
- ConsumerCounters},
- log = Log2} =
- Consumer2,
+ #consumer{log = Log2} = Consumer2,
ConsumerOffset = osiris_log:next_offset(Log2),
- rabbit_log:debug("Subscription ~tp is now at offset ~tp with ~tp "
+ ConsumedMessagesAfter = messages_consumed(ConsumerCounters),
+ rabbit_log:debug("Subscription ~tp (stream ~tp) is now at offset ~tp with ~tp "
"message(s) distributed after subscription",
- [SubscriptionId, ConsumerOffset,
- messages_consumed(ConsumerCounters)]),
+ [SubscriptionId,
+ Stream,
+ ConsumerOffset,
+ ConsumedMessagesAfter - ConsumedMessagesBefore]),
Consumers#{SubscriptionId => Consumer2};
#{SubscriptionId :=