summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_stream/src
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_stream/src')
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl36
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl62
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_utils.erl4
3 files changed, 50 insertions, 52 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index ced9f35b8b..1f2b5a5c97 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -185,19 +185,19 @@ handle_call({create, VirtualHost, Reference, Arguments, Username},
{reply, {error, reference_already_exists},
State};
{error, Err} ->
- rabbit_log:warning("Error while creating ~p stream, ~p~n",
+ rabbit_log:warning("Error while creating ~p stream, ~p",
[Reference, Err]),
{reply, {error, internal_error}, State}
end
catch
exit:Error ->
- rabbit_log:info("Error while creating ~p stream, ~p~n",
- [Reference, Error]),
+ rabbit_log:error("Error while creating ~p stream, ~p",
+ [Reference, Error]),
{reply, {error, internal_error}, State}
end;
{error, {absent, _, Reason}} ->
- rabbit_log:warning("Error while creating ~p stream, ~p~n",
- [Reference, Reason]),
+ rabbit_log:error("Error while creating ~p stream, ~p",
+ [Reference, Reason]),
{reply, {error, internal_error}, State}
end;
error ->
@@ -209,27 +209,25 @@ handle_call({delete, VirtualHost, Reference, Username}, _From,
#resource{virtual_host = VirtualHost,
kind = queue,
name = Reference},
- rabbit_log:debug("Trying to delete stream ~p~n", [Reference]),
+ rabbit_log:debug("Trying to delete stream ~p", [Reference]),
case rabbit_amqqueue:lookup(Name) of
{ok, Q} ->
- rabbit_log:debug("Found queue record ~p, checking if it is a stream~n",
+ rabbit_log:debug("Found queue record ~p, checking if it is a stream",
[Reference]),
case is_stream_queue(Q) of
true ->
- rabbit_log:debug("Queue record ~p is a stream, trying to delete "
- "it~n",
+ rabbit_log:debug("Queue record ~p is a stream, trying to delete it",
[Reference]),
- {ok, _} =
- rabbit_stream_queue:delete(Q, false, false, Username),
- rabbit_log:debug("Stream ~p deleted~n", [Reference]),
+ {ok, _} = rabbit_stream_queue:delete(Q, false, false, Username),
+ rabbit_log:debug("Stream ~p deleted", [Reference]),
{reply, {ok, deleted}, State};
_ ->
- rabbit_log:debug("Queue record ~p is NOT a stream, returning error~n",
+ rabbit_log:debug("Queue record ~p is NOT a stream, returning error",
[Reference]),
{reply, {error, reference_not_found}, State}
end;
{error, not_found} ->
- rabbit_log:debug("Stream ~p not found, cannot delete it~n",
+ rabbit_log:debug("Stream ~p not found, cannot delete it",
[Reference]),
{reply, {error, reference_not_found}, State}
end;
@@ -361,8 +359,8 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
end
catch
exit:Error ->
- rabbit_log:info("Error while looking up exchange ~p, ~p~n",
- [ExchangeName, Error]),
+ rabbit_log:error("Error while looking up exchange ~p, ~p",
+ [ExchangeName, Error]),
{error, stream_not_found}
end,
{reply, Res, State};
@@ -384,8 +382,8 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
[], rabbit_binding:list_for_source(ExchangeName))}
catch
exit:Error ->
- rabbit_log:info("Error while looking up exchange ~p, ~p~n",
- [ExchangeName, Error]),
+ rabbit_log:error("Error while looking up exchange ~p, ~p",
+ [ExchangeName, Error]),
{error, stream_not_found}
end,
{reply, Res, State};
@@ -396,7 +394,7 @@ handle_cast(_, State) ->
{noreply, State}.
handle_info(Info, State) ->
- rabbit_log:info("Received info ~p~n", [Info]),
+ rabbit_log:info("Received info ~p", [Info]),
{noreply, State}.
is_stream_queue(Q) ->
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 595574443b..0303a7272f 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -210,7 +210,7 @@ init([KeepaliveSup,
heartbeat = Heartbeat});
{Error, Reason} ->
rabbit_net:fast_close(RealSocket),
- rabbit_log:warning("Closing connection because of ~p ~p~n",
+ rabbit_log:warning("Closing connection because of ~p ~p",
[Error, Reason])
end.
@@ -220,7 +220,7 @@ socket_op(Sock, Fun) ->
{ok, Res} ->
Res;
{error, Reason} ->
- rabbit_log:warning("Error during socket operation ~p~n", [Reason]),
+ rabbit_log:warning("Error during socket operation ~p", [Reason]),
rabbit_net:fast_close(RealSocket),
exit(normal)
end.
@@ -289,7 +289,7 @@ listen_loop_pre_auth(Transport,
Data),
Transport:setopts(S, [{active, once}]),
#stream_connection{connection_step = ConnectionStep} = Connection1,
- rabbit_log:info("Transitioned from ~p to ~p~n",
+ rabbit_log:info("Transitioned from ~p to ~p",
[ConnectionStep0, ConnectionStep]),
case ConnectionStep of
authenticated ->
@@ -336,12 +336,12 @@ listen_loop_pre_auth(Transport,
Configuration)
end;
{Closed, S} ->
- rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
+ rabbit_log:info("Socket ~w closed [~w]", [S, self()]),
ok;
{Error, S, Reason} ->
- rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]);
+ rabbit_log:info("Socket error ~p [~w]", [Reason, S, self()]);
M ->
- rabbit_log:warning("Unknown message ~p~n", [M]),
+ rabbit_log:warning("Unknown message ~p", [M]),
close(Transport, S)
end.
@@ -585,7 +585,7 @@ listen_loop_post_auth(Transport,
{queue_event, #resource{name = StreamName},
{osiris_offset, _QueueResource, -1}}} ->
rabbit_log:info("received osiris offset event for ~p with offset "
- "~p~n",
+ "~p",
[StreamName, -1]),
listen_loop_post_auth(Transport, Connection, State, Configuration);
{'$gen_cast',
@@ -650,7 +650,7 @@ listen_loop_post_auth(Transport,
State,
Configuration);
Unexpected ->
- rabbit_log:info("Heartbeat send error ~p, closing connection~n",
+ rabbit_log:info("Heartbeat send error ~p, closing connection",
[Unexpected]),
C1 = demonitor_all_streams(Connection),
close(Transport, C1)
@@ -691,7 +691,7 @@ listen_loop_post_auth(Transport,
{'$gen_call', From, {shutdown, Explanation}} ->
% likely closing call from the management plugin
gen_server:reply(From, ok),
- rabbit_log:info("Forcing stream connection ~p closing: ~p~n",
+ rabbit_log:info("Forcing stream connection ~p closing: ~p",
[self(), Explanation]),
demonitor_all_streams(Connection),
rabbit_networking:unregister_non_amqp_connection(self()),
@@ -702,15 +702,15 @@ listen_loop_post_auth(Transport,
demonitor_all_streams(Connection),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection, State),
- rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
+ rabbit_log:warning("Socket ~w closed [~w]", [S, self()]),
ok;
{Error, S, Reason} ->
demonitor_all_streams(Connection),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection, State),
- rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]);
+ rabbit_log:error("Socket error ~p [~w]", [Reason, S, self()]);
M ->
- rabbit_log:warning("Unknown message ~p~n", [M]),
+ rabbit_log:warning("Unknown message ~p", [M]),
%% FIXME send close
listen_loop_post_auth(Transport, Connection, State, Configuration)
end.
@@ -747,15 +747,15 @@ listen_loop_post_close(Transport,
{Closed, S} ->
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection, State),
- rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
+ rabbit_log:info("Socket ~w closed [~w]", [S, self()]),
ok;
{Error, S, Reason} ->
- rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]),
+ rabbit_log:info("Socket error ~p [~w]", [Reason, S, self()]),
close(Transport, S),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(Connection, State);
M ->
- rabbit_log:warning("Ignored message on closing ~p~n", [M])
+ rabbit_log:warning("Ignored message on closing ~p", [M])
end.
handle_inbound_data_pre_auth(Transport, Connection, State, Rest) ->
@@ -1037,7 +1037,7 @@ handle_frame_pre_auth(Transport,
rabbit_core_metrics:auth_attempt_failed(RemoteAddress,
Username,
stream),
- rabbit_log:warning("User '~s' can only connect via localhost~n",
+ rabbit_log:warning("User '~s' can only connect via localhost",
[Username]),
{C1#stream_connection{connection_step =
failure},
@@ -1076,7 +1076,7 @@ handle_frame_pre_auth(_Transport,
FrameMax:32,
Heartbeat:32>>,
Rest) ->
- rabbit_log:info("Tuning response ~p ~p ~n", [FrameMax, Heartbeat]),
+ rabbit_log:debug("Tuning response ~p ~p ", [FrameMax, Heartbeat]),
Parent = self(),
%% sending a message to the main process so the heartbeat frame is sent from this main process
%% otherwise heartbeat frames can interleave with chunk delivery
@@ -1149,7 +1149,7 @@ handle_frame_pre_auth(_Transport,
rabbit_log:info("Received heartbeat frame pre auth~n"),
{Connection, State, Rest};
handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) ->
- rabbit_log:warning("unknown frame ~p ~p, closing connection.~n",
+ rabbit_log:warning("unknown frame ~p ~p, closing connection.",
[Frame, Rest]),
{Connection#stream_connection{connection_step = failure}, State,
Rest}.
@@ -1513,12 +1513,12 @@ handle_frame_post_auth(Transport,
{{timestamp, Timestamp}, Crdt}
end,
rabbit_log:info("Creating subscription ~p to ~p, with offset specificat"
- "ion ~p~n",
+ "ion ~p",
[SubscriptionId, Stream,
OffsetSpec]),
{ok, Segment} =
osiris:init_reader(LocalMemberPid, OffsetSpec),
- rabbit_log:info("Next offset for subscription ~p is ~p~n",
+ rabbit_log:info("Next offset for subscription ~p is ~p",
[SubscriptionId,
osiris_log:next_offset(Segment)]),
ConsumerCounters =
@@ -1543,7 +1543,7 @@ handle_frame_post_auth(Transport,
CorrelationId),
rabbit_log:info("Distributing existing messages to subscription "
- "~p~n",
+ "~p",
[SubscriptionId]),
{{segment, Segment1}, {credit, Credit1}} =
send_chunks(Transport, ConsumerState,
@@ -1571,7 +1571,7 @@ handle_frame_post_auth(Transport,
ConsumerOffset = osiris_log:next_offset(Segment1),
rabbit_log:info("Subscription ~p is now at offset ~p with ~p message(s) "
- "distributed after subscription~n",
+ "distributed after subscription",
[SubscriptionId, ConsumerOffset,
messages_consumed(ConsumerCounters1)]),
@@ -1626,7 +1626,7 @@ handle_frame_post_auth(Transport,
Consumer1}},
Rest};
_ ->
- rabbit_log:warning("Giving credit to unknown subscription: ~p~n",
+ rabbit_log:warning("Giving credit to unknown subscription: ~p",
[SubscriptionId]),
Frame =
<<?RESPONSE:1,
@@ -1663,8 +1663,8 @@ handle_frame_post_auth(_Transport,
ok ->
case lookup_leader(Stream, Connection) of
cluster_not_found ->
- rabbit_log:info("Could not find leader to commit offset on ~p~n",
- [Stream]),
+ rabbit_log:warning("Could not find leader to commit offset on ~p",
+ [Stream]),
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
{Connection, State, Rest};
{ClusterLeader, Connection1} ->
@@ -1673,7 +1673,7 @@ handle_frame_post_auth(_Transport,
end;
error ->
%% FIXME commit offset is fire-and-forget, so no response even if error, change this?
- rabbit_log:info("Not authorized to commit offset on ~p~n",
+ rabbit_log:info("Not authorized to commit offset on ~p",
[Stream]),
{Connection, State, Rest}
end;
@@ -1816,7 +1816,7 @@ handle_frame_post_auth(Transport,
{ok,
#{leader_pid := LeaderPid,
replica_pids := ReturnedReplicas}} ->
- rabbit_log:info("Created cluster with leader ~p and replicas ~p~n",
+ rabbit_log:info("Created cluster with leader ~p and replicas ~p",
[LeaderPid, ReturnedReplicas]),
response_ok(Transport,
Connection,
@@ -1981,7 +1981,7 @@ handle_frame_post_auth(Transport,
{port, Port}}},
Index + 1};
_ ->
- rabbit_log:warning("Error when retrieving broker metadata: ~p ~p~n",
+ rabbit_log:warning("Error when retrieving broker metadata: ~p ~p",
[Host, Port]),
{Acc, Index}
end
@@ -2151,7 +2151,7 @@ handle_frame_post_auth(Transport,
ClosingReasonLength:16,
ClosingReason:ClosingReasonLength/binary>>,
_Rest) ->
- rabbit_log:info("Received close command ~p ~p~n",
+ rabbit_log:info("Received close command ~p ~p",
[ClosingCode, ClosingReason]),
Frame =
<<?RESPONSE:1,
@@ -2170,7 +2170,7 @@ handle_frame_post_auth(_Transport,
rabbit_log:info("Received heartbeat frame post auth~n"),
{Connection, State, Rest};
handle_frame_post_auth(Transport, Connection, State, Frame, Rest) ->
- rabbit_log:warning("unknown frame ~p ~p, sending close command.~n",
+ rabbit_log:warning("unknown frame ~p ~p, sending close command.",
[Frame, Rest]),
CloseReason = <<"unknown frame">>,
CloseReasonLength = byte_size(CloseReason),
@@ -2230,7 +2230,7 @@ handle_frame_post_close(_Transport,
rabbit_log:info("Received heartbeat frame post close~n"),
{Connection, State, Rest};
handle_frame_post_close(_Transport, Connection, State, Frame, Rest) ->
- rabbit_log:warning("ignored frame on close ~p ~p.~n", [Frame, Rest]),
+ rabbit_log:warning("ignored frame on close ~p ~p.", [Frame, Rest]),
{Connection, State, Rest}.
stream_r(Stream, #stream_connection{virtual_host = VHost}) ->
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
index 205cc9f6bb..1985c93f20 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -143,7 +143,7 @@ auth_mechanisms(Sock) ->
auth_mechanism_to_module(TypeBin, Sock) ->
case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
- rabbit_log:warning("Unknown authentication mechanism '~p'~n",
+ rabbit_log:warning("Unknown authentication mechanism '~p'",
[TypeBin]),
{error, not_found};
T ->
@@ -154,7 +154,7 @@ auth_mechanism_to_module(TypeBin, Sock) ->
{true, {ok, Module}} ->
{ok, Module};
_ ->
- rabbit_log:warning("Invalid authentication mechanism '~p'~n",
+ rabbit_log:warning("Invalid authentication mechanism '~p'",
[T]),
{error, invalid}
end