diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-12-03 11:19:34 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-03 11:19:34 +0100 |
commit | 08e7244827ef18d55f471fc32d9aaeba53e44d34 (patch) | |
tree | eb75c8b0f2f4e6918667bad4ee16f2fc97e935cf | |
parent | ed045df05f89ed6c00c9657cf17817a419b4475a (diff) | |
parent | 3dcb04d82b1400fd103739440ec1cb8d5b05b588 (diff) | |
download | rabbitmq-server-git-08e7244827ef18d55f471fc32d9aaeba53e44d34.tar.gz |
Merge pull request #3845 from rabbitmq/mergify/bp/v3.9.x/pr-3842
Create configuration nested record in stream consumer record (backport #3842)
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 129 |
1 files changed, 79 insertions, 50 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 8bc685cffe..12d4352603 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -32,16 +32,17 @@ reference :: undefined | publisher_reference(), leader :: pid(), message_counters :: atomics:atomics_ref()}). --record(consumer, +-record(consumer_configuration, {socket :: rabbit_net:socket(), %% ranch_transport:socket(), member_pid :: pid(), - offset :: osiris:offset(), subscription_id :: subscription_id(), - segment :: osiris_log:state(), - credit :: integer(), stream :: stream(), + offset :: osiris:offset(), counters :: atomics:atomics_ref(), properties :: map()}). +-record(consumer, + {configuration :: #consumer_configuration{}, credit :: integer(), + log :: osiris_log:state()}). -record(stream_connection_state, {data :: rabbit_stream_core:state(), blocked :: boolean(), consumers :: #{subscription_id() => #consumer{}}}). @@ -640,7 +641,7 @@ augment_infos_with_user_provided_connection_name(Infos, close(Transport, S, #stream_connection_state{consumers = Consumers}) -> [osiris_log:close(Log) - || #consumer{segment = Log} <- maps:values(Consumers)], + || #consumer{log = Log} <- maps:values(Consumers)], Transport:shutdown(S, write), Transport:close(S). @@ -1014,11 +1015,10 @@ open(cast, [Reason]), %% likely a connection problem Consumer; - {{segment, Segment1}, + {{segment, Log1}, {credit, Credit1}} -> - Consumer#consumer{segment - = - Segment1, + Consumer#consumer{log = + Log1, credit = Credit1} @@ -1792,26 +1792,32 @@ handle_frame_post_auth(Transport, #{transport => ConnTransport, chunk_selector => get_chunk_selector(Properties)}, - {ok, Segment} = + {ok, Log} = osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options), rabbit_log:debug("Next offset for subscription ~p is ~p", [SubscriptionId, - osiris_log:next_offset(Segment)]), + osiris_log:next_offset(Log)]), ConsumerCounters = atomics:new(2, [{signed, false}]), + ConsumerConfiguration = + #consumer_configuration{member_pid = + LocalMemberPid, + subscription_id = + SubscriptionId, + socket = Socket, + stream = Stream, + offset = OffsetSpec, + counters = + ConsumerCounters, + properties = + Properties}, ConsumerState = - #consumer{member_pid = LocalMemberPid, - offset = OffsetSpec, - subscription_id = SubscriptionId, - socket = Socket, - segment = Segment, - credit = Credit, - stream = Stream, - counters = ConsumerCounters, - properties = Properties}, + #consumer{configuration = ConsumerConfiguration, + log = Log, + credit = Credit}, Connection1 = maybe_monitor_stream(LocalMemberPid, Stream, @@ -1833,10 +1839,9 @@ handle_frame_post_auth(Transport, "peer", []), throw({stop, normal}); - {{segment, Segment1}, {credit, Credit1}} -> + {{segment, Log1}, {credit, Credit1}} -> ConsumerState1 = - ConsumerState#consumer{segment = - Segment1, + ConsumerState#consumer{log = Log1, credit = Credit1}, Consumers1 = @@ -1854,11 +1859,14 @@ handle_frame_post_auth(Transport, [SubscriptionId]} end, - #consumer{counters = ConsumerCounters1} = + #consumer{configuration = + #consumer_configuration{counters + = + ConsumerCounters1}} = ConsumerState1, ConsumerOffset = - osiris_log:next_offset(Segment1), + osiris_log:next_offset(Log1), ConsumerOffsetLag = consumer_i(offset_lag, ConsumerState1), @@ -1915,9 +1923,8 @@ handle_frame_post_auth(Transport, "peer", []), throw({stop, normal}); - {{segment, Segment1}, {credit, Credit1}} -> - Consumer1 = - Consumer#consumer{segment = Segment1, credit = Credit1}, + {{segment, Log1}, {credit, Credit1}} -> + Consumer1 = Consumer#consumer{log = Log1, credit = Credit1}, {Connection, State#stream_connection_state{consumers = Consumers#{SubscriptionId @@ -2403,7 +2410,9 @@ notify_connection_closed(#statem_data{connection = rabbit_core_metrics:connection_closed(self()), [rabbit_stream_metrics:consumer_cancelled(self(), stream_r(S, Connection), SubId) - || #consumer{stream = S, subscription_id = SubId} + || #consumer{configuration = + #consumer_configuration{stream = S, + subscription_id = SubId}} <- maps:values(Consumers)], [rabbit_stream_metrics:publisher_deleted(self(), stream_r(S, Connection), PubId) @@ -2546,7 +2555,8 @@ remove_subscription(SubscriptionId, Connection, #stream_connection_state{consumers = Consumers} = State) -> #{SubscriptionId := Consumer} = Consumers, - Stream = Consumer#consumer.stream, + Stream = + Consumer#consumer.configuration#consumer_configuration.stream, #{Stream := SubscriptionsForThisStream} = StreamSubscriptions, SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream), @@ -2657,9 +2667,11 @@ subscription_exists(StreamSubscriptions, SubscriptionId) -> lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds). send_file_callback(Transport, - #consumer{socket = S, - subscription_id = SubscriptionId, - counters = Counters}, + #consumer{configuration = + #consumer_configuration{socket = S, + subscription_id = + SubscriptionId, + counters = Counters}}, Counter) -> fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries}, Size) -> @@ -2679,13 +2691,13 @@ send_file_callback(Transport, send_chunks(Transport, #consumer{credit = Credit} = State, Counter) -> send_chunks(Transport, State, Credit, Counter). -send_chunks(_Transport, #consumer{segment = Segment}, 0, _Counter) -> - {{segment, Segment}, {credit, 0}}; +send_chunks(_Transport, #consumer{log = Log}, 0, _Counter) -> + {{segment, Log}, {credit, 0}}; send_chunks(Transport, - #consumer{segment = Segment} = State, + #consumer{log = Log} = State, Credit, Counter) -> - send_chunks(Transport, State, Segment, Credit, true, Counter). + send_chunks(Transport, State, Log, Credit, true, Counter). send_chunks(_Transport, _State, @@ -2695,7 +2707,8 @@ send_chunks(_Transport, _Counter) -> {{segment, Segment}, {credit, 0}}; send_chunks(Transport, - #consumer{socket = S} = State, + #consumer{configuration = #consumer_configuration{socket = S}} = + State, Segment, Credit, Retry, @@ -2722,7 +2735,10 @@ send_chunks(Transport, false, Counter); false -> - #consumer{member_pid = LocalMember} = State, + #consumer{configuration = + #consumer_configuration{member_pid = + LocalMember}} = + State, osiris:register_offset_listener(LocalMember, osiris_log:next_offset(Segment1)), {{segment, Segment1}, {credit, Credit}} @@ -2748,11 +2764,12 @@ emit_stats(#stream_connection{publishers = Publishers} = Connection, consumer_offset(Counters), consumer_i(offset_lag, Consumer), Properties) - || #consumer{stream = S, - subscription_id = Id, - credit = Credit, - counters = Counters, - properties = Properties} = + || #consumer{configuration = + #consumer_configuration{stream = S, + subscription_id = Id, + counters = Counters, + properties = Properties}, + credit = Credit} = Consumer <- maps:values(Consumers)], [rabbit_stream_metrics:publisher_updated(self(), @@ -2799,22 +2816,34 @@ consumers_infos(Items, [[{Item, consumer_i(Item, Consumer)} || Item <- Items] || Consumer <- maps:values(Consumers)]. -consumer_i(subscription_id, #consumer{subscription_id = SubId}) -> +consumer_i(subscription_id, + #consumer{configuration = + #consumer_configuration{subscription_id = SubId}}) -> SubId; consumer_i(credits, #consumer{credit = Credits}) -> Credits; -consumer_i(messages_consumed, #consumer{counters = Counters}) -> +consumer_i(messages_consumed, + #consumer{configuration = + #consumer_configuration{counters = Counters}}) -> messages_consumed(Counters); -consumer_i(offset, #consumer{counters = Counters}) -> +consumer_i(offset, + #consumer{configuration = + #consumer_configuration{counters = Counters}}) -> consumer_offset(Counters); consumer_i(offset_lag, - #consumer{counters = Counters, segment = Log}) -> + #consumer{configuration = + #consumer_configuration{counters = Counters}, + log = Log}) -> stream_stored_offset(Log) - consumer_offset(Counters); consumer_i(connection_pid, _) -> self(); -consumer_i(properties, #consumer{properties = Properties}) -> +consumer_i(properties, + #consumer{configuration = + #consumer_configuration{properties = Properties}}) -> Properties; -consumer_i(stream, #consumer{stream = Stream}) -> +consumer_i(stream, + #consumer{configuration = + #consumer_configuration{stream = Stream}}) -> Stream. publishers_info(Pid, InfoItems) -> |