summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-10-07 14:31:09 +0100
committermergify-bot <noreply@mergify.io>2021-10-12 12:44:01 +0000
commit9377b1fddce405f3b2dc6565e20458614b70d8ce (patch)
treeaa9e0174fceb69a9c2e665af69f90c64bf8afca2
parent5865cd02a124712b1d34485d703b739e7122fe90 (diff)
downloadrabbitmq-server-git-9377b1fddce405f3b2dc6565e20458614b70d8ce.tar.gz
Stream queue: use local pid for offset listenersmergify/bp/v3.9.x/pr-3550
When a consumer reaches the end of a stream it need to register an offset listener with the local stream member so that it can be notified when new stream messages are committed. The stream queue implementation for some reason registered offset listeners with the leader, not the local member. (cherry picked from commit db3944cfc4eb80f0ee261f41b4fdaf331399f186)
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl37
1 files changed, 17 insertions, 20 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 20166bfa9c..14e83d85d3 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -274,7 +274,8 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
Actions = [],
%% TODO: we need to monitor the local pid in case the stream is
%% restarted
- {ok, State#stream_client{readers = Readers0#{Tag => Str0}}, Actions}
+ {ok, State#stream_client{local_pid = LocalPid,
+ readers = Readers0#{Tag => Str0}}, Actions}
end.
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
@@ -290,11 +291,11 @@ cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
credit(CTag, Credit, Drain, #stream_client{readers = Readers0,
name = Name,
- leader = Leader} = State) ->
+ local_pid = LocalPid} = State) ->
{Readers1, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
- {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+ {Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
{Readers0#{CTag => Str}, Msgs0};
_ ->
{Readers0, []}
@@ -379,19 +380,15 @@ handle_event({osiris_written, From, _WriterId, Corrs},
{ok, State#stream_client{correlation = Correlation,
slow = Slow}, [{settled, From, MsgIds}]};
handle_event({osiris_offset, _From, _Offs},
- State = #stream_client{leader = Leader,
+ State = #stream_client{local_pid = LocalPid,
readers = Readers0,
name = Name}) ->
%% offset isn't actually needed as we use the atomic to read the
%% current committed
{Readers, TagMsgs} = maps:fold(
fun (Tag, Str0, {Acc, TM}) ->
- {Str, Msgs} = stream_entries(Name, Leader, Str0),
- %% HACK for now, better to just return but
- %% tricky with acks credits
- %% that also evaluate the stream
- % gen_server:cast(self(), {stream_delivery, Tag, Msgs}),
- {Acc#{Tag => Str}, [{Tag, Leader, Msgs} | TM]}
+ {Str, Msgs} = stream_entries(Name, LocalPid, Str0),
+ {Acc#{Tag => Str}, [{Tag, LocalPid, Msgs} | TM]}
end, {#{}, []}, Readers0),
Ack = true,
Deliveries = [{deliver, Tag, Ack, OffsetMsg}
@@ -414,13 +411,13 @@ recover(_VHost, Queues) ->
end, {[], []}, Queues).
settle(complete, CTag, MsgIds, #stream_client{readers = Readers0,
- name = Name,
- leader = Leader} = State) ->
+ local_pid = LocalPid,
+ name = Name} = State) ->
Credit = length(MsgIds),
{Readers, Msgs} = case Readers0 of
#{CTag := #stream{credit = Credit0} = Str0} ->
Str1 = Str0#stream{credit = Credit0 + Credit},
- {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+ {Str, Msgs0} = stream_entries(Name, LocalPid, Str1),
{Readers0#{CTag => Str}, Msgs0};
_ ->
{Readers0, []}
@@ -848,10 +845,10 @@ check_queue_exists_in_local_node(Q) ->
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
-stream_entries(Name, Id, Str) ->
- stream_entries(Name, Id, Str, []).
+stream_entries(Name, LocalPid, Str) ->
+ stream_entries(Name, LocalPid, Str, []).
-stream_entries(Name, LeaderPid,
+stream_entries(Name, LocalPid,
#stream{name = QName,
credit = Credit,
start_offset = StartOffs,
@@ -863,7 +860,7 @@ stream_entries(Name, LeaderPid,
NextOffset = osiris_log:next_offset(Seg),
case NextOffset > LOffs of
true ->
- osiris:register_offset_listener(LeaderPid, NextOffset),
+ osiris:register_offset_listener(LocalPid, NextOffset),
{Str0#stream{log = Seg,
listening_offset = NextOffset}, MsgIn};
false ->
@@ -877,7 +874,7 @@ stream_entries(Name, LeaderPid,
Msg0 = binary_to_msg(QName, B),
Msg = rabbit_basic:add_header(<<"x-stream-offset">>,
long, O, Msg0),
- {Name, LeaderPid, O, false, Msg}
+ {Name, LocalPid, O, false, Msg}
end || {O, B} <- Records,
O >= StartOffs],
@@ -892,10 +889,10 @@ stream_entries(Name, LeaderPid,
false ->
%% if there are fewer Msgs than Entries0 it means there were non-events
%% in the log and we should recurse and try again
- stream_entries(Name, LeaderPid, Str, MsgIn ++ Msgs)
+ stream_entries(Name, LocalPid, Str, MsgIn ++ Msgs)
end
end;
-stream_entries(_Name, _Id, Str, Msgs) ->
+stream_entries(_Name, _LocalPid, Str, Msgs) ->
{Str, Msgs}.
binary_to_msg(#resource{virtual_host = VHost,