diff options
Diffstat (limited to 'deps/rabbitmq_stream/src')
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 36 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 62 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_utils.erl | 4 |
3 files changed, 50 insertions, 52 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index ced9f35b8b..1f2b5a5c97 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -185,19 +185,19 @@ handle_call({create, VirtualHost, Reference, Arguments, Username}, {reply, {error, reference_already_exists}, State}; {error, Err} -> - rabbit_log:warning("Error while creating ~p stream, ~p~n", + rabbit_log:warning("Error while creating ~p stream, ~p", [Reference, Err]), {reply, {error, internal_error}, State} end catch exit:Error -> - rabbit_log:info("Error while creating ~p stream, ~p~n", - [Reference, Error]), + rabbit_log:error("Error while creating ~p stream, ~p", + [Reference, Error]), {reply, {error, internal_error}, State} end; {error, {absent, _, Reason}} -> - rabbit_log:warning("Error while creating ~p stream, ~p~n", - [Reference, Reason]), + rabbit_log:error("Error while creating ~p stream, ~p", + [Reference, Reason]), {reply, {error, internal_error}, State} end; error -> @@ -209,27 +209,25 @@ handle_call({delete, VirtualHost, Reference, Username}, _From, #resource{virtual_host = VirtualHost, kind = queue, name = Reference}, - rabbit_log:debug("Trying to delete stream ~p~n", [Reference]), + rabbit_log:debug("Trying to delete stream ~p", [Reference]), case rabbit_amqqueue:lookup(Name) of {ok, Q} -> - rabbit_log:debug("Found queue record ~p, checking if it is a stream~n", + rabbit_log:debug("Found queue record ~p, checking if it is a stream", [Reference]), case is_stream_queue(Q) of true -> - rabbit_log:debug("Queue record ~p is a stream, trying to delete " - "it~n", + rabbit_log:debug("Queue record ~p is a stream, trying to delete it", [Reference]), - {ok, _} = - rabbit_stream_queue:delete(Q, false, false, Username), - rabbit_log:debug("Stream ~p deleted~n", [Reference]), + {ok, _} = rabbit_stream_queue:delete(Q, false, false, Username), + rabbit_log:debug("Stream ~p deleted", [Reference]), {reply, {ok, deleted}, State}; _ -> - rabbit_log:debug("Queue record ~p is NOT a stream, returning error~n", + rabbit_log:debug("Queue record ~p is NOT a stream, returning error", [Reference]), {reply, {error, reference_not_found}, State} end; {error, not_found} -> - rabbit_log:debug("Stream ~p not found, cannot delete it~n", + rabbit_log:debug("Stream ~p not found, cannot delete it", [Reference]), {reply, {error, reference_not_found}, State} end; @@ -361,8 +359,8 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, end catch exit:Error -> - rabbit_log:info("Error while looking up exchange ~p, ~p~n", - [ExchangeName, Error]), + rabbit_log:error("Error while looking up exchange ~p, ~p", + [ExchangeName, Error]), {error, stream_not_found} end, {reply, Res, State}; @@ -384,8 +382,8 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) -> [], rabbit_binding:list_for_source(ExchangeName))} catch exit:Error -> - rabbit_log:info("Error while looking up exchange ~p, ~p~n", - [ExchangeName, Error]), + rabbit_log:error("Error while looking up exchange ~p, ~p", + [ExchangeName, Error]), {error, stream_not_found} end, {reply, Res, State}; @@ -396,7 +394,7 @@ handle_cast(_, State) -> {noreply, State}. handle_info(Info, State) -> - rabbit_log:info("Received info ~p~n", [Info]), + rabbit_log:info("Received info ~p", [Info]), {noreply, State}. is_stream_queue(Q) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 595574443b..0303a7272f 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -210,7 +210,7 @@ init([KeepaliveSup, heartbeat = Heartbeat}); {Error, Reason} -> rabbit_net:fast_close(RealSocket), - rabbit_log:warning("Closing connection because of ~p ~p~n", + rabbit_log:warning("Closing connection because of ~p ~p", [Error, Reason]) end. @@ -220,7 +220,7 @@ socket_op(Sock, Fun) -> {ok, Res} -> Res; {error, Reason} -> - rabbit_log:warning("Error during socket operation ~p~n", [Reason]), + rabbit_log:warning("Error during socket operation ~p", [Reason]), rabbit_net:fast_close(RealSocket), exit(normal) end. @@ -289,7 +289,7 @@ listen_loop_pre_auth(Transport, Data), Transport:setopts(S, [{active, once}]), #stream_connection{connection_step = ConnectionStep} = Connection1, - rabbit_log:info("Transitioned from ~p to ~p~n", + rabbit_log:info("Transitioned from ~p to ~p", [ConnectionStep0, ConnectionStep]), case ConnectionStep of authenticated -> @@ -336,12 +336,12 @@ listen_loop_pre_auth(Transport, Configuration) end; {Closed, S} -> - rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), + rabbit_log:info("Socket ~w closed [~w]", [S, self()]), ok; {Error, S, Reason} -> - rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]); + rabbit_log:info("Socket error ~p [~w]", [Reason, S, self()]); M -> - rabbit_log:warning("Unknown message ~p~n", [M]), + rabbit_log:warning("Unknown message ~p", [M]), close(Transport, S) end. @@ -585,7 +585,7 @@ listen_loop_post_auth(Transport, {queue_event, #resource{name = StreamName}, {osiris_offset, _QueueResource, -1}}} -> rabbit_log:info("received osiris offset event for ~p with offset " - "~p~n", + "~p", [StreamName, -1]), listen_loop_post_auth(Transport, Connection, State, Configuration); {'$gen_cast', @@ -650,7 +650,7 @@ listen_loop_post_auth(Transport, State, Configuration); Unexpected -> - rabbit_log:info("Heartbeat send error ~p, closing connection~n", + rabbit_log:info("Heartbeat send error ~p, closing connection", [Unexpected]), C1 = demonitor_all_streams(Connection), close(Transport, C1) @@ -691,7 +691,7 @@ listen_loop_post_auth(Transport, {'$gen_call', From, {shutdown, Explanation}} -> % likely closing call from the management plugin gen_server:reply(From, ok), - rabbit_log:info("Forcing stream connection ~p closing: ~p~n", + rabbit_log:info("Forcing stream connection ~p closing: ~p", [self(), Explanation]), demonitor_all_streams(Connection), rabbit_networking:unregister_non_amqp_connection(self()), @@ -702,15 +702,15 @@ listen_loop_post_auth(Transport, demonitor_all_streams(Connection), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection, State), - rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), + rabbit_log:warning("Socket ~w closed [~w]", [S, self()]), ok; {Error, S, Reason} -> demonitor_all_streams(Connection), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection, State), - rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]); + rabbit_log:error("Socket error ~p [~w]", [Reason, S, self()]); M -> - rabbit_log:warning("Unknown message ~p~n", [M]), + rabbit_log:warning("Unknown message ~p", [M]), %% FIXME send close listen_loop_post_auth(Transport, Connection, State, Configuration) end. @@ -747,15 +747,15 @@ listen_loop_post_close(Transport, {Closed, S} -> rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection, State), - rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), + rabbit_log:info("Socket ~w closed [~w]", [S, self()]), ok; {Error, S, Reason} -> - rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]), + rabbit_log:info("Socket error ~p [~w]", [Reason, S, self()]), close(Transport, S), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(Connection, State); M -> - rabbit_log:warning("Ignored message on closing ~p~n", [M]) + rabbit_log:warning("Ignored message on closing ~p", [M]) end. handle_inbound_data_pre_auth(Transport, Connection, State, Rest) -> @@ -1037,7 +1037,7 @@ handle_frame_pre_auth(Transport, rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream), - rabbit_log:warning("User '~s' can only connect via localhost~n", + rabbit_log:warning("User '~s' can only connect via localhost", [Username]), {C1#stream_connection{connection_step = failure}, @@ -1076,7 +1076,7 @@ handle_frame_pre_auth(_Transport, FrameMax:32, Heartbeat:32>>, Rest) -> - rabbit_log:info("Tuning response ~p ~p ~n", [FrameMax, Heartbeat]), + rabbit_log:debug("Tuning response ~p ~p ", [FrameMax, Heartbeat]), Parent = self(), %% sending a message to the main process so the heartbeat frame is sent from this main process %% otherwise heartbeat frames can interleave with chunk delivery @@ -1149,7 +1149,7 @@ handle_frame_pre_auth(_Transport, rabbit_log:info("Received heartbeat frame pre auth~n"), {Connection, State, Rest}; handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) -> - rabbit_log:warning("unknown frame ~p ~p, closing connection.~n", + rabbit_log:warning("unknown frame ~p ~p, closing connection.", [Frame, Rest]), {Connection#stream_connection{connection_step = failure}, State, Rest}. @@ -1513,12 +1513,12 @@ handle_frame_post_auth(Transport, {{timestamp, Timestamp}, Crdt} end, rabbit_log:info("Creating subscription ~p to ~p, with offset specificat" - "ion ~p~n", + "ion ~p", [SubscriptionId, Stream, OffsetSpec]), {ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec), - rabbit_log:info("Next offset for subscription ~p is ~p~n", + rabbit_log:info("Next offset for subscription ~p is ~p", [SubscriptionId, osiris_log:next_offset(Segment)]), ConsumerCounters = @@ -1543,7 +1543,7 @@ handle_frame_post_auth(Transport, CorrelationId), rabbit_log:info("Distributing existing messages to subscription " - "~p~n", + "~p", [SubscriptionId]), {{segment, Segment1}, {credit, Credit1}} = send_chunks(Transport, ConsumerState, @@ -1571,7 +1571,7 @@ handle_frame_post_auth(Transport, ConsumerOffset = osiris_log:next_offset(Segment1), rabbit_log:info("Subscription ~p is now at offset ~p with ~p message(s) " - "distributed after subscription~n", + "distributed after subscription", [SubscriptionId, ConsumerOffset, messages_consumed(ConsumerCounters1)]), @@ -1626,7 +1626,7 @@ handle_frame_post_auth(Transport, Consumer1}}, Rest}; _ -> - rabbit_log:warning("Giving credit to unknown subscription: ~p~n", + rabbit_log:warning("Giving credit to unknown subscription: ~p", [SubscriptionId]), Frame = <<?RESPONSE:1, @@ -1663,8 +1663,8 @@ handle_frame_post_auth(_Transport, ok -> case lookup_leader(Stream, Connection) of cluster_not_found -> - rabbit_log:info("Could not find leader to commit offset on ~p~n", - [Stream]), + rabbit_log:warning("Could not find leader to commit offset on ~p", + [Stream]), %% FIXME commit offset is fire-and-forget, so no response even if error, change this? {Connection, State, Rest}; {ClusterLeader, Connection1} -> @@ -1673,7 +1673,7 @@ handle_frame_post_auth(_Transport, end; error -> %% FIXME commit offset is fire-and-forget, so no response even if error, change this? - rabbit_log:info("Not authorized to commit offset on ~p~n", + rabbit_log:info("Not authorized to commit offset on ~p", [Stream]), {Connection, State, Rest} end; @@ -1816,7 +1816,7 @@ handle_frame_post_auth(Transport, {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} -> - rabbit_log:info("Created cluster with leader ~p and replicas ~p~n", + rabbit_log:info("Created cluster with leader ~p and replicas ~p", [LeaderPid, ReturnedReplicas]), response_ok(Transport, Connection, @@ -1981,7 +1981,7 @@ handle_frame_post_auth(Transport, {port, Port}}}, Index + 1}; _ -> - rabbit_log:warning("Error when retrieving broker metadata: ~p ~p~n", + rabbit_log:warning("Error when retrieving broker metadata: ~p ~p", [Host, Port]), {Acc, Index} end @@ -2151,7 +2151,7 @@ handle_frame_post_auth(Transport, ClosingReasonLength:16, ClosingReason:ClosingReasonLength/binary>>, _Rest) -> - rabbit_log:info("Received close command ~p ~p~n", + rabbit_log:info("Received close command ~p ~p", [ClosingCode, ClosingReason]), Frame = <<?RESPONSE:1, @@ -2170,7 +2170,7 @@ handle_frame_post_auth(_Transport, rabbit_log:info("Received heartbeat frame post auth~n"), {Connection, State, Rest}; handle_frame_post_auth(Transport, Connection, State, Frame, Rest) -> - rabbit_log:warning("unknown frame ~p ~p, sending close command.~n", + rabbit_log:warning("unknown frame ~p ~p, sending close command.", [Frame, Rest]), CloseReason = <<"unknown frame">>, CloseReasonLength = byte_size(CloseReason), @@ -2230,7 +2230,7 @@ handle_frame_post_close(_Transport, rabbit_log:info("Received heartbeat frame post close~n"), {Connection, State, Rest}; handle_frame_post_close(_Transport, Connection, State, Frame, Rest) -> - rabbit_log:warning("ignored frame on close ~p ~p.~n", [Frame, Rest]), + rabbit_log:warning("ignored frame on close ~p ~p.", [Frame, Rest]), {Connection, State, Rest}. stream_r(Stream, #stream_connection{virtual_host = VHost}) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 205cc9f6bb..1985c93f20 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -143,7 +143,7 @@ auth_mechanisms(Sock) -> auth_mechanism_to_module(TypeBin, Sock) -> case rabbit_registry:binary_to_type(TypeBin) of {error, not_found} -> - rabbit_log:warning("Unknown authentication mechanism '~p'~n", + rabbit_log:warning("Unknown authentication mechanism '~p'", [TypeBin]), {error, not_found}; T -> @@ -154,7 +154,7 @@ auth_mechanism_to_module(TypeBin, Sock) -> {true, {ok, Module}} -> {ok, Module}; _ -> - rabbit_log:warning("Invalid authentication mechanism '~p'~n", + rabbit_log:warning("Invalid authentication mechanism '~p'", [T]), {error, invalid} end |