summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2021-04-13 15:01:26 +0100
committerkjnilsson <knilsson@pivotal.io>2021-04-13 15:02:32 +0100
commit9bd02b351014492844b38e2691dbabb57f1863b1 (patch)
treef301888f2ec9bc091f13a5adb47b45da2289add5
parentafbedd654e1c469c36d792f4cd47ac46425840e0 (diff)
downloadrabbitmq-server-git-9bd02b351014492844b38e2691dbabb57f1863b1.tar.gz
Make streams compatible with osiris API changes
In osiris:init_reader
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl6
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl23
2 files changed, 16 insertions, 13 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index 6024986e7c..12d62c70a5 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -221,15 +221,15 @@ get_local_pid(#stream_client{stream_id = StreamId,
{undefined, State}
end.
-begin_stream(#stream_client{readers = Readers0} = State0,
+begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
Q, Tag, Offset, Max) ->
{LocalPid, State} = get_local_pid(State0),
case LocalPid of
undefined ->
{error, no_local_stream_replica_available};
_ ->
-
- {ok, Seg0} = osiris:init_reader(LocalPid, Offset),
+ CounterSpec = {{?MODULE, QName, self()}, []},
+ {ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec),
NextOffset = osiris_log:next_offset(Seg0) - 1,
osiris:register_offset_listener(LocalPid, NextOffset),
%% TODO: avoid double calls to the same process
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 65ed7e0ec5..8678fb2aec 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -328,7 +328,7 @@ listen_loop_pre_auth(Transport,
State1,
Configuration);
failure ->
- close(Transport, S);
+ close(Transport, S, State);
_ ->
listen_loop_pre_auth(Transport,
Connection1,
@@ -342,7 +342,7 @@ listen_loop_pre_auth(Transport,
rabbit_log:info("Socket error ~p [~w]", [Reason, S, self()]);
M ->
rabbit_log:warning("Unknown message ~p", [M]),
- close(Transport, S)
+ close(Transport, S, State)
end.
augment_infos_with_user_provided_connection_name(Infos,
@@ -356,7 +356,9 @@ augment_infos_with_user_provided_connection_name(Infos,
Infos
end.
-close(Transport, S) ->
+close(Transport, S, #stream_connection_state{consumers = Consumers}) ->
+ [osiris_log:close(Log)
+ || #consumer{segment = Log} <- maps:values(Consumers)],
Transport:shutdown(S, write),
Transport:close(S).
@@ -390,7 +392,7 @@ listen_loop_post_auth(Transport,
#stream_connection{connection_step = Step} = Connection1,
case Step of
closing ->
- close(Transport, S),
+ close(Transport, S, State),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection1, State1);
close_sent ->
@@ -652,12 +654,12 @@ listen_loop_post_auth(Transport,
rabbit_log:info("Heartbeat send error ~p, closing connection",
[Unexpected]),
C1 = demonitor_all_streams(Connection),
- close(Transport, C1)
+ close(Transport, C1, State)
end;
heartbeat_timeout ->
rabbit_log:info("Heartbeat timeout, closing connection~n"),
C1 = demonitor_all_streams(Connection),
- close(Transport, C1);
+ close(Transport, C1, State);
{infos, From} ->
From ! {self(), ClientProperties},
listen_loop_post_auth(Transport, Connection, State, Configuration);
@@ -695,7 +697,7 @@ listen_loop_post_auth(Transport,
demonitor_all_streams(Connection),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection, State),
- close(Transport, S),
+ close(Transport, S, State),
ok;
{Closed, S} ->
demonitor_all_streams(Connection),
@@ -733,7 +735,7 @@ listen_loop_post_close(Transport,
case Step of
closing_done ->
rabbit_log:debug("Received close confirmation from client"),
- close(Transport, S),
+ close(Transport, S, State),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection1, State1);
_ ->
@@ -750,7 +752,7 @@ listen_loop_post_close(Transport,
ok;
{Error, S, Reason} ->
rabbit_log:info("Socket error ~p [~w]", [Reason, S, self()]),
- close(Transport, S),
+ close(Transport, S, State),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection, State);
M ->
@@ -1515,8 +1517,9 @@ handle_frame_post_auth(Transport,
"ion ~p",
[SubscriptionId, Stream,
OffsetSpec]),
+ CounterSpec = {{?MODULE, Stream, SubscriptionId, self()}, []},
{ok, Segment} =
- osiris:init_reader(LocalMemberPid, OffsetSpec),
+ osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec),
rabbit_log:info("Next offset for subscription ~p is ~p",
[SubscriptionId,
osiris_log:next_offset(Segment)]),