diff options
author | kjnilsson <knilsson@pivotal.io> | 2021-04-13 15:01:26 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2021-04-13 15:02:32 +0100 |
commit | 9bd02b351014492844b38e2691dbabb57f1863b1 (patch) | |
tree | f301888f2ec9bc091f13a5adb47b45da2289add5 | |
parent | afbedd654e1c469c36d792f4cd47ac46425840e0 (diff) | |
download | rabbitmq-server-git-9bd02b351014492844b38e2691dbabb57f1863b1.tar.gz |
Make streams compatible with osiris API changes
In osiris:init_reader
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 6 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 23 |
2 files changed, 16 insertions, 13 deletions
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 6024986e7c..12d62c70a5 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -221,15 +221,15 @@ get_local_pid(#stream_client{stream_id = StreamId, {undefined, State} end. -begin_stream(#stream_client{readers = Readers0} = State0, +begin_stream(#stream_client{name = QName, readers = Readers0} = State0, Q, Tag, Offset, Max) -> {LocalPid, State} = get_local_pid(State0), case LocalPid of undefined -> {error, no_local_stream_replica_available}; _ -> - - {ok, Seg0} = osiris:init_reader(LocalPid, Offset), + CounterSpec = {{?MODULE, QName, self()}, []}, + {ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec), NextOffset = osiris_log:next_offset(Seg0) - 1, osiris:register_offset_listener(LocalPid, NextOffset), %% TODO: avoid double calls to the same process diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 65ed7e0ec5..8678fb2aec 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -328,7 +328,7 @@ listen_loop_pre_auth(Transport, State1, Configuration); failure -> - close(Transport, S); + close(Transport, S, State); _ -> listen_loop_pre_auth(Transport, Connection1, @@ -342,7 +342,7 @@ listen_loop_pre_auth(Transport, rabbit_log:info("Socket error ~p [~w]", [Reason, S, self()]); M -> rabbit_log:warning("Unknown message ~p", [M]), - close(Transport, S) + close(Transport, S, State) end. augment_infos_with_user_provided_connection_name(Infos, @@ -356,7 +356,9 @@ augment_infos_with_user_provided_connection_name(Infos, Infos end. -close(Transport, S) -> +close(Transport, S, #stream_connection_state{consumers = Consumers}) -> + [osiris_log:close(Log) + || #consumer{segment = Log} <- maps:values(Consumers)], Transport:shutdown(S, write), Transport:close(S). @@ -390,7 +392,7 @@ listen_loop_post_auth(Transport, #stream_connection{connection_step = Step} = Connection1, case Step of closing -> - close(Transport, S), + close(Transport, S, State), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection1, State1); close_sent -> @@ -652,12 +654,12 @@ listen_loop_post_auth(Transport, rabbit_log:info("Heartbeat send error ~p, closing connection", [Unexpected]), C1 = demonitor_all_streams(Connection), - close(Transport, C1) + close(Transport, C1, State) end; heartbeat_timeout -> rabbit_log:info("Heartbeat timeout, closing connection~n"), C1 = demonitor_all_streams(Connection), - close(Transport, C1); + close(Transport, C1, State); {infos, From} -> From ! {self(), ClientProperties}, listen_loop_post_auth(Transport, Connection, State, Configuration); @@ -695,7 +697,7 @@ listen_loop_post_auth(Transport, demonitor_all_streams(Connection), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection, State), - close(Transport, S), + close(Transport, S, State), ok; {Closed, S} -> demonitor_all_streams(Connection), @@ -733,7 +735,7 @@ listen_loop_post_close(Transport, case Step of closing_done -> rabbit_log:debug("Received close confirmation from client"), - close(Transport, S), + close(Transport, S, State), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection1, State1); _ -> @@ -750,7 +752,7 @@ listen_loop_post_close(Transport, ok; {Error, S, Reason} -> rabbit_log:info("Socket error ~p [~w]", [Reason, S, self()]), - close(Transport, S), + close(Transport, S, State), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection, State); M -> @@ -1515,8 +1517,9 @@ handle_frame_post_auth(Transport, "ion ~p", [SubscriptionId, Stream, OffsetSpec]), + CounterSpec = {{?MODULE, Stream, SubscriptionId, self()}, []}, {ok, Segment} = - osiris:init_reader(LocalMemberPid, OffsetSpec), + osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec), rabbit_log:info("Next offset for subscription ~p is ~p", [SubscriptionId, osiris_log:next_offset(Segment)]), |