summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-12-03 11:19:34 +0100
committerGitHub <noreply@github.com>2021-12-03 11:19:34 +0100
commit08e7244827ef18d55f471fc32d9aaeba53e44d34 (patch)
treeeb75c8b0f2f4e6918667bad4ee16f2fc97e935cf
parented045df05f89ed6c00c9657cf17817a419b4475a (diff)
parent3dcb04d82b1400fd103739440ec1cb8d5b05b588 (diff)
downloadrabbitmq-server-git-08e7244827ef18d55f471fc32d9aaeba53e44d34.tar.gz
Merge pull request #3845 from rabbitmq/mergify/bp/v3.9.x/pr-3842
Create configuration nested record in stream consumer record (backport #3842)
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl129
1 files changed, 79 insertions, 50 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 8bc685cffe..12d4352603 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -32,16 +32,17 @@
reference :: undefined | publisher_reference(),
leader :: pid(),
message_counters :: atomics:atomics_ref()}).
--record(consumer,
+-record(consumer_configuration,
{socket :: rabbit_net:socket(), %% ranch_transport:socket(),
member_pid :: pid(),
- offset :: osiris:offset(),
subscription_id :: subscription_id(),
- segment :: osiris_log:state(),
- credit :: integer(),
stream :: stream(),
+ offset :: osiris:offset(),
counters :: atomics:atomics_ref(),
properties :: map()}).
+-record(consumer,
+ {configuration :: #consumer_configuration{}, credit :: integer(),
+ log :: osiris_log:state()}).
-record(stream_connection_state,
{data :: rabbit_stream_core:state(), blocked :: boolean(),
consumers :: #{subscription_id() => #consumer{}}}).
@@ -640,7 +641,7 @@ augment_infos_with_user_provided_connection_name(Infos,
close(Transport, S,
#stream_connection_state{consumers = Consumers}) ->
[osiris_log:close(Log)
- || #consumer{segment = Log} <- maps:values(Consumers)],
+ || #consumer{log = Log} <- maps:values(Consumers)],
Transport:shutdown(S, write),
Transport:close(S).
@@ -1014,11 +1015,10 @@ open(cast,
[Reason]),
%% likely a connection problem
Consumer;
- {{segment, Segment1},
+ {{segment, Log1},
{credit, Credit1}} ->
- Consumer#consumer{segment
- =
- Segment1,
+ Consumer#consumer{log =
+ Log1,
credit
=
Credit1}
@@ -1792,26 +1792,32 @@ handle_frame_post_auth(Transport,
#{transport => ConnTransport,
chunk_selector =>
get_chunk_selector(Properties)},
- {ok, Segment} =
+ {ok, Log} =
osiris:init_reader(LocalMemberPid,
OffsetSpec,
CounterSpec,
Options),
rabbit_log:debug("Next offset for subscription ~p is ~p",
[SubscriptionId,
- osiris_log:next_offset(Segment)]),
+ osiris_log:next_offset(Log)]),
ConsumerCounters =
atomics:new(2, [{signed, false}]),
+ ConsumerConfiguration =
+ #consumer_configuration{member_pid =
+ LocalMemberPid,
+ subscription_id =
+ SubscriptionId,
+ socket = Socket,
+ stream = Stream,
+ offset = OffsetSpec,
+ counters =
+ ConsumerCounters,
+ properties =
+ Properties},
ConsumerState =
- #consumer{member_pid = LocalMemberPid,
- offset = OffsetSpec,
- subscription_id = SubscriptionId,
- socket = Socket,
- segment = Segment,
- credit = Credit,
- stream = Stream,
- counters = ConsumerCounters,
- properties = Properties},
+ #consumer{configuration = ConsumerConfiguration,
+ log = Log,
+ credit = Credit},
Connection1 =
maybe_monitor_stream(LocalMemberPid, Stream,
@@ -1833,10 +1839,9 @@ handle_frame_post_auth(Transport,
"peer",
[]),
throw({stop, normal});
- {{segment, Segment1}, {credit, Credit1}} ->
+ {{segment, Log1}, {credit, Credit1}} ->
ConsumerState1 =
- ConsumerState#consumer{segment =
- Segment1,
+ ConsumerState#consumer{log = Log1,
credit =
Credit1},
Consumers1 =
@@ -1854,11 +1859,14 @@ handle_frame_post_auth(Transport,
[SubscriptionId]}
end,
- #consumer{counters = ConsumerCounters1} =
+ #consumer{configuration =
+ #consumer_configuration{counters
+ =
+ ConsumerCounters1}} =
ConsumerState1,
ConsumerOffset =
- osiris_log:next_offset(Segment1),
+ osiris_log:next_offset(Log1),
ConsumerOffsetLag =
consumer_i(offset_lag, ConsumerState1),
@@ -1915,9 +1923,8 @@ handle_frame_post_auth(Transport,
"peer",
[]),
throw({stop, normal});
- {{segment, Segment1}, {credit, Credit1}} ->
- Consumer1 =
- Consumer#consumer{segment = Segment1, credit = Credit1},
+ {{segment, Log1}, {credit, Credit1}} ->
+ Consumer1 = Consumer#consumer{log = Log1, credit = Credit1},
{Connection,
State#stream_connection_state{consumers =
Consumers#{SubscriptionId
@@ -2403,7 +2410,9 @@ notify_connection_closed(#statem_data{connection =
rabbit_core_metrics:connection_closed(self()),
[rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(S, Connection), SubId)
- || #consumer{stream = S, subscription_id = SubId}
+ || #consumer{configuration =
+ #consumer_configuration{stream = S,
+ subscription_id = SubId}}
<- maps:values(Consumers)],
[rabbit_stream_metrics:publisher_deleted(self(),
stream_r(S, Connection), PubId)
@@ -2546,7 +2555,8 @@ remove_subscription(SubscriptionId,
Connection,
#stream_connection_state{consumers = Consumers} = State) ->
#{SubscriptionId := Consumer} = Consumers,
- Stream = Consumer#consumer.stream,
+ Stream =
+ Consumer#consumer.configuration#consumer_configuration.stream,
#{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
SubscriptionsForThisStream1 =
lists:delete(SubscriptionId, SubscriptionsForThisStream),
@@ -2657,9 +2667,11 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
send_file_callback(Transport,
- #consumer{socket = S,
- subscription_id = SubscriptionId,
- counters = Counters},
+ #consumer{configuration =
+ #consumer_configuration{socket = S,
+ subscription_id =
+ SubscriptionId,
+ counters = Counters}},
Counter) ->
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
Size) ->
@@ -2679,13 +2691,13 @@ send_file_callback(Transport,
send_chunks(Transport, #consumer{credit = Credit} = State, Counter) ->
send_chunks(Transport, State, Credit, Counter).
-send_chunks(_Transport, #consumer{segment = Segment}, 0, _Counter) ->
- {{segment, Segment}, {credit, 0}};
+send_chunks(_Transport, #consumer{log = Log}, 0, _Counter) ->
+ {{segment, Log}, {credit, 0}};
send_chunks(Transport,
- #consumer{segment = Segment} = State,
+ #consumer{log = Log} = State,
Credit,
Counter) ->
- send_chunks(Transport, State, Segment, Credit, true, Counter).
+ send_chunks(Transport, State, Log, Credit, true, Counter).
send_chunks(_Transport,
_State,
@@ -2695,7 +2707,8 @@ send_chunks(_Transport,
_Counter) ->
{{segment, Segment}, {credit, 0}};
send_chunks(Transport,
- #consumer{socket = S} = State,
+ #consumer{configuration = #consumer_configuration{socket = S}} =
+ State,
Segment,
Credit,
Retry,
@@ -2722,7 +2735,10 @@ send_chunks(Transport,
false,
Counter);
false ->
- #consumer{member_pid = LocalMember} = State,
+ #consumer{configuration =
+ #consumer_configuration{member_pid =
+ LocalMember}} =
+ State,
osiris:register_offset_listener(LocalMember,
osiris_log:next_offset(Segment1)),
{{segment, Segment1}, {credit, Credit}}
@@ -2748,11 +2764,12 @@ emit_stats(#stream_connection{publishers = Publishers} = Connection,
consumer_offset(Counters),
consumer_i(offset_lag, Consumer),
Properties)
- || #consumer{stream = S,
- subscription_id = Id,
- credit = Credit,
- counters = Counters,
- properties = Properties} =
+ || #consumer{configuration =
+ #consumer_configuration{stream = S,
+ subscription_id = Id,
+ counters = Counters,
+ properties = Properties},
+ credit = Credit} =
Consumer
<- maps:values(Consumers)],
[rabbit_stream_metrics:publisher_updated(self(),
@@ -2799,22 +2816,34 @@ consumers_infos(Items,
[[{Item, consumer_i(Item, Consumer)} || Item <- Items]
|| Consumer <- maps:values(Consumers)].
-consumer_i(subscription_id, #consumer{subscription_id = SubId}) ->
+consumer_i(subscription_id,
+ #consumer{configuration =
+ #consumer_configuration{subscription_id = SubId}}) ->
SubId;
consumer_i(credits, #consumer{credit = Credits}) ->
Credits;
-consumer_i(messages_consumed, #consumer{counters = Counters}) ->
+consumer_i(messages_consumed,
+ #consumer{configuration =
+ #consumer_configuration{counters = Counters}}) ->
messages_consumed(Counters);
-consumer_i(offset, #consumer{counters = Counters}) ->
+consumer_i(offset,
+ #consumer{configuration =
+ #consumer_configuration{counters = Counters}}) ->
consumer_offset(Counters);
consumer_i(offset_lag,
- #consumer{counters = Counters, segment = Log}) ->
+ #consumer{configuration =
+ #consumer_configuration{counters = Counters},
+ log = Log}) ->
stream_stored_offset(Log) - consumer_offset(Counters);
consumer_i(connection_pid, _) ->
self();
-consumer_i(properties, #consumer{properties = Properties}) ->
+consumer_i(properties,
+ #consumer{configuration =
+ #consumer_configuration{properties = Properties}}) ->
Properties;
-consumer_i(stream, #consumer{stream = Stream}) ->
+consumer_i(stream,
+ #consumer{configuration =
+ #consumer_configuration{stream = Stream}}) ->
Stream.
publishers_info(Pid, InfoItems) ->