summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabriele Santomaggio <G.santomaggio@gmail.com>2021-10-12 15:35:02 +0200
committerGitHub <noreply@github.com>2021-10-12 15:35:02 +0200
commit04c9aa24a82d0309ae06c3e838cd9afac87bb3db (patch)
treeaa9e0174fceb69a9c2e665af69f90c64bf8afca2
parent5865cd02a124712b1d34485d703b739e7122fe90 (diff)
parent9377b1fddce405f3b2dc6565e20458614b70d8ce (diff)
downloadrabbitmq-server-git-04c9aa24a82d0309ae06c3e838cd9afac87bb3db.tar.gz
Merge pull request #3559 from rabbitmq/mergify/bp/v3.9.x/pr-3550
Stream queue: use local pid for offset listeners (backport #3550)
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl37
1 files changed, 17 insertions, 20 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 20166bfa9c..14e83d85d3 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -274,7 +274,8 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
Actions = [],
%% TODO: we need to monitor the local pid in case the stream is
%% restarted
- {ok, State#stream_client{readers = Readers0#{Tag => Str0}}, Actions}
+ {ok, State#stream_client{local_pid = LocalPid,
+ readers = Readers0#{Tag => Str0}}, Actions}
end.
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
@@ -290,11 +291,11 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
credit(CTag, Credit, Drain, #stream_client{readers = Readers0,
name = Name,
- leader = Leader} = State) ->
+ local_pid = LocalPid} = State) ->
{Readers1, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
- {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+ {Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
{Readers0#{CTag => Str}, Msgs0};
_ ->
{Readers0, []}
@@ -379,19 +380,15 @@ handle_event({osiris_written, From, _WriterId, Corrs},
{ok, State#stream_client{correlation = Correlation,
slow = Slow}, [{settled, From, MsgIds}]};
handle_event({osiris_offset, _From, _Offs},
- State = #stream_client{leader = Leader,
+ State = #stream_client{local_pid = LocalPid,
readers = Readers0,
name = Name}) ->
%% offset isn't actually needed as we use the atomic to read the
%% current committed
{Readers, TagMsgs} = maps:fold(
fun (Tag, Str0, {Acc, TM}) ->
- {Str, Msgs} = stream_entries(Name, Leader, Str0),
- %% HACK for now, better to just return but
- %% tricky with acks credits
- %% that also evaluate the stream
- % gen_server:cast(self(), {stream_delivery, Tag, Msgs}),
- {Acc#{Tag => Str}, [{Tag, Leader, Msgs} | TM]}
+ {Str, Msgs} = stream_entries(Name, LocalPid, Str0),
+ {Acc#{Tag => Str}, [{Tag, LocalPid, Msgs} | TM]}
end, {#{}, []}, Readers0),
Ack = true,
Deliveries = [{deliver, Tag, Ack, OffsetMsg}
@@ -414,13 +411,13 @@ recover(_VHost, Queues) ->
end, {[], []}, Queues).
settle(complete, CTag, MsgIds, #stream_client{readers = Readers0,
- name = Name,
- leader = Leader} = State) ->
+ local_pid = LocalPid,
+ name = Name} = State) ->
Credit = length(MsgIds),
{Readers, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
- {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+ {Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
{Readers0#{CTag => Str}, Msgs0};
_ ->
{Readers0, []}
@@ -848,10 +845,10 @@ check_queue_exists_in_local_node(Q) ->
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
-stream_entries(Name, Id, Str) ->
- stream_entries(Name, Id, Str, []).
+stream_entries(Name, LocalPid, Str) ->
+ stream_entries(Name, LocalPid, Str, []).
-stream_entries(Name, LeaderPid,
+stream_entries(Name, LocalPid,
#stream{name = QName,
credit = Credit,
start_offset = StartOffs,
@@ -863,7 +860,7 @@ stream_entries(Name, LeaderPid,
NextOffset = osiris_log:next_offset(Seg),
case NextOffset > LOffs of
true ->
- osiris:register_offset_listener(LeaderPid, NextOffset),
+ osiris:register_offset_listener(LocalPid, NextOffset),
{Str0#stream{log = Seg,
listening_offset = NextOffset}, MsgIn};
false ->
@@ -877,7 +874,7 @@ stream_entries(Name, LeaderPid,
Msg0 = binary_to_msg(QName, B),
Msg = rabbit_basic:add_header(<<"x-stream-offset">>,
long, O, Msg0),
- {Name, LeaderPid, O, false, Msg}
+ {Name, LocalPid, O, false, Msg}
end || {O, B} <- Records,
O >= StartOffs],
@@ -892,10 +889,10 @@ stream_entries(Name, LeaderPid,
false ->
%% if there are fewer Msgs than Entries0 it means there were non-events
%% in the log and we should recurse and try again
- stream_entries(Name, LeaderPid, Str, MsgIn ++ Msgs)
+ stream_entries(Name, LocalPid, Str, MsgIn ++ Msgs)
end
end;
-stream_entries(_Name, _Id, Str, Msgs) ->
+stream_entries(_Name, _LocalPid, Str, Msgs) ->
{Str, Msgs}.
binary_to_msg(#resource{virtual_host = VHost,