summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_reader.erl')
-rw-r--r--deps/rabbit/src/rabbit_reader.erl1803
1 files changed, 1803 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl
new file mode 100644
index 0000000000..c91dbbc105
--- /dev/null
+++ b/deps/rabbit/src/rabbit_reader.erl
@@ -0,0 +1,1803 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_reader).
+
+%% Transitional step until we can require Erlang/OTP 21 and
+%% use the now recommended try/catch syntax for obtaining the stack trace.
+-compile(nowarn_deprecated_function).
+
+%% This is an AMQP 0-9-1 connection implementation. If AMQP 1.0 plugin is enabled,
+%% this module passes control of incoming AMQP 1.0 connections to it.
+%%
+%% Every connection (as in, a process using this module)
+%% is a controlling process for a server socket.
+%%
+%% Connections have a number of responsibilities:
+%%
+%% * Performing protocol handshake
+%% * Parsing incoming data and dispatching protocol methods
+%% * Authenticating clients (with the help of authentication backends)
+%% * Enforcing TCP backpressure (throttling clients)
+%% * Enforcing connection limits, e.g. channel_max
+%% * Channel management
+%% * Setting up heartbeater and alarm notifications
+%% * Emitting connection and network activity metric events
+%% * Gracefully handling client disconnects, channel termination, etc
+%%
+%% and a few more.
+%%
+%% Every connection has
+%%
+%% * a queue collector which is responsible for keeping
+%% track of exclusive queues on the connection and their cleanup.
+%% * a heartbeater that's responsible for sending heartbeat frames to clients,
+%% keeping track of the incoming ones and notifying connection about
+%% heartbeat timeouts
+%% * Stats timer, a timer that is used to periodically emit metric events
+%%
+%% Some dependencies are started under a separate supervisor to avoid deadlocks
+%% during system shutdown. See rabbit_channel_sup:start_link/0 for details.
+%%
+%% Reader processes are special processes (in the OTP sense).
+
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+-export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/2,
+ shutdown/2]).
+
+-export([system_continue/3, system_terminate/4, system_code_change/4]).
+
+-export([init/3, mainloop/4, recvloop/4]).
+
+-export([conserve_resources/3, server_properties/1]).
+
+-define(NORMAL_TIMEOUT, 3).
+-define(CLOSING_TIMEOUT, 30).
+-define(CHANNEL_TERMINATION_TIMEOUT, 3).
+%% we wait for this many seconds before closing TCP connection
+%% with a client that failed to log in. Provides some relief
+%% from connection storms and DoS.
+-define(SILENT_CLOSE_DELAY, 3).
+-define(CHANNEL_MIN, 1).
+
+%%--------------------------------------------------------------------------
+
+-record(v1, {
+ %% parent process
+ parent,
+ %% socket
+ sock,
+ %% connection state, see connection record
+ connection,
+ callback,
+ recv_len,
+ pending_recv,
+ %% pre_init | securing | running | blocking | blocked | closing | closed | {become, F}
+ connection_state,
+ %% see comment in rabbit_connection_sup:start_link/0
+ helper_sup,
+ %% takes care of cleaning up exclusive queues,
+ %% see rabbit_queue_collector
+ queue_collector,
+ %% sends and receives heartbeat frames,
+ %% see rabbit_heartbeat
+ heartbeater,
+ %% timer used to emit statistics
+ stats_timer,
+ %% channel supervisor
+ channel_sup_sup_pid,
+ %% how many channels this connection has
+ channel_count,
+ %% throttling state, for both
+ %% credit- and resource-driven flow control
+ throttle,
+ proxy_socket}).
+
+-record(throttle, {
+ %% never | timestamp()
+ last_blocked_at,
+ %% a set of the reasons why we are
+ %% blocked: {resource, memory}, {resource, disk}.
+ %% More reasons can be added in the future.
+ blocked_by,
+ %% true if received any publishes, false otherwise
+ %% note that this will also be true when connection is
+ %% already blocked
+ should_block,
+ %% true if we had we sent a connection.blocked,
+ %% false otherwise
+ connection_blocked_message_sent
+}).
+
+-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
+ send_pend, state, channels, reductions,
+ garbage_collection]).
+
+-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
+-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels,
+ garbage_collection]).
+
+-define(CREATION_EVENT_KEYS,
+ [pid, name, port, peer_port, host,
+ peer_host, ssl, peer_cert_subject, peer_cert_issuer,
+ peer_cert_validity, auth_mechanism, ssl_protocol,
+ ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
+ timeout, frame_max, channel_max, client_properties, connected_at,
+ node, user_who_performed_action]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+
+-define(AUTH_NOTIFICATION_INFO_KEYS,
+ [host, name, peer_host, peer_port, protocol, auth_mechanism,
+ ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject,
+ peer_cert_validity]).
+
+-define(IS_RUNNING(State),
+ (State#v1.connection_state =:= running orelse
+ State#v1.connection_state =:= blocked)).
+
+-define(IS_STOPPING(State),
+ (State#v1.connection_state =:= closing orelse
+ State#v1.connection_state =:= closed)).
+
+%%--------------------------------------------------------------------------
+
+-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
+ IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(),
+ NodeForWhichAlarmWasSetOrCleared :: node()}.
+
+%%--------------------------------------------------------------------------
+
+-spec start_link(pid(), any()) -> rabbit_types:ok(pid()).
+
+start_link(HelperSup, Ref) ->
+ Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref]),
+
+ {ok, Pid}.
+
+-spec shutdown(pid(), string()) -> 'ok'.
+
+shutdown(Pid, Explanation) ->
+ gen_server:call(Pid, {shutdown, Explanation}, infinity).
+
+-spec init(pid(), pid(), any()) -> no_return().
+
+init(Parent, HelperSup, Ref) ->
+ ?LG_PROCESS_TYPE(reader),
+ {ok, Sock} = rabbit_networking:handshake(Ref,
+ application:get_env(rabbit, proxy_protocol, false)),
+ Deb = sys:debug_options([]),
+ start_connection(Parent, HelperSup, Deb, Sock).
+
+-spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any().
+
+system_continue(Parent, Deb, {Buf, BufLen, State}) ->
+ mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}).
+
+-spec system_terminate(_,_,_,_) -> no_return().
+
+system_terminate(Reason, _Parent, _Deb, _State) ->
+ exit(Reason).
+
+-spec system_code_change(_,_,_,_) -> {'ok',_}.
+
+system_code_change(Misc, _Module, _OldVsn, _Extra) ->
+ {ok, Misc}.
+
+-spec info_keys() -> rabbit_types:info_keys().
+
+info_keys() -> ?INFO_KEYS.
+
+-spec info(pid()) -> rabbit_types:infos().
+
+info(Pid) ->
+ gen_server:call(Pid, info, infinity).
+
+-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
+
+info(Pid, Items) ->
+ case gen_server:call(Pid, {info, Items}, infinity) of
+ {ok, Res} -> Res;
+ {error, Error} -> throw(Error)
+ end.
+
+-spec force_event_refresh(pid(), reference()) -> 'ok'.
+
+% Note: https://www.pivotaltracker.com/story/show/166962656
+% This event is necessary for the stats timer to be initialized with
+% the correct values once the management agent has started
+force_event_refresh(Pid, Ref) ->
+ gen_server:cast(Pid, {force_event_refresh, Ref}).
+
+-spec conserve_resources(pid(), atom(), resource_alert()) -> 'ok'.
+
+conserve_resources(Pid, Source, {_, Conserve, _}) ->
+ Pid ! {conserve_resources, Source, Conserve},
+ ok.
+
+-spec server_properties(rabbit_types:protocol()) ->
+ rabbit_framing:amqp_table().
+
+server_properties(Protocol) ->
+ {ok, Product} = application:get_key(rabbit, description),
+ {ok, Version} = application:get_key(rabbit, vsn),
+
+ %% Get any configuration-specified server properties
+ {ok, RawConfigServerProps} = application:get_env(rabbit,
+ server_properties),
+
+ %% Normalize the simplified (2-tuple) and unsimplified (3-tuple) forms
+ %% from the config and merge them with the generated built-in properties
+ NormalizedConfigServerProps =
+ [{<<"capabilities">>, table, server_capabilities(Protocol)} |
+ [case X of
+ {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
+ longstr,
+ maybe_list_to_binary(Value)};
+ {BinKey, Type, Value} -> {BinKey, Type, Value}
+ end || X <- RawConfigServerProps ++
+ [{product, Product},
+ {version, Version},
+ {cluster_name, rabbit_nodes:cluster_name()},
+ {platform, rabbit_misc:platform_and_version()},
+ {copyright, ?COPYRIGHT_MESSAGE},
+ {information, ?INFORMATION_MESSAGE}]]],
+
+ %% Filter duplicated properties in favour of config file provided values
+ lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
+ NormalizedConfigServerProps).
+
+maybe_list_to_binary(V) when is_binary(V) -> V;
+maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V).
+
+server_capabilities(rabbit_framing_amqp_0_9_1) ->
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true},
+ {<<"connection.blocked">>, bool, true},
+ {<<"consumer_priorities">>, bool, true},
+ {<<"authentication_failure_close">>, bool, true},
+ {<<"per_consumer_qos">>, bool, true},
+ {<<"direct_reply_to">>, bool, true}];
+server_capabilities(_) ->
+ [].
+
+%%--------------------------------------------------------------------------
+
+socket_error(Reason) when is_atom(Reason) ->
+ rabbit_log_connection:error("Error on AMQP connection ~p: ~s~n",
+ [self(), rabbit_misc:format_inet_error(Reason)]);
+socket_error(Reason) ->
+ Fmt = "Error on AMQP connection ~p:~n~p~n",
+ Args = [self(), Reason],
+ case Reason of
+ %% The socket was closed while upgrading to SSL.
+ %% This is presumably a TCP healthcheck, so don't log
+ %% it unless specified otherwise.
+ {ssl_upgrade_error, closed} ->
+ %% Lager sinks (rabbit_log_connection)
+ %% are handled by the lager parse_transform.
+ %% Hence have to define the loglevel as a function call.
+ rabbit_log_connection:debug(Fmt, Args);
+ _ ->
+ rabbit_log_connection:error(Fmt, Args)
+ end.
+
+inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
+
+socket_op(Sock, Fun) ->
+ RealSocket = rabbit_net:unwrap_socket(Sock),
+ case Fun(Sock) of
+ {ok, Res} -> Res;
+ {error, Reason} -> socket_error(Reason),
+ rabbit_net:fast_close(RealSocket),
+ exit(normal)
+ end.
+
+-spec start_connection(pid(), pid(), any(), rabbit_net:socket()) ->
+ no_return().
+
+start_connection(Parent, HelperSup, Deb, Sock) ->
+ process_flag(trap_exit, true),
+ RealSocket = rabbit_net:unwrap_socket(Sock),
+ Name = case rabbit_net:connection_string(Sock, inbound) of
+ {ok, Str} -> list_to_binary(Str);
+ {error, enotconn} -> rabbit_net:fast_close(RealSocket),
+ exit(normal);
+ {error, Reason} -> socket_error(Reason),
+ rabbit_net:fast_close(RealSocket),
+ exit(normal)
+ end,
+ {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
+ InitialFrameMax = application:get_env(rabbit, initial_frame_max, ?FRAME_MIN_SIZE),
+ erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
+ {PeerHost, PeerPort, Host, Port} =
+ socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
+ ?store_proc_name(Name),
+ State = #v1{parent = Parent,
+ sock = RealSocket,
+ connection = #connection{
+ name = Name,
+ log_name = Name,
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
+ protocol = none,
+ user = none,
+ timeout_sec = (HandshakeTimeout / 1000),
+ frame_max = InitialFrameMax,
+ vhost = none,
+ client_properties = none,
+ capabilities = [],
+ auth_mechanism = none,
+ auth_state = none,
+ connected_at = os:system_time(
+ milli_seconds)},
+ callback = uninitialized_callback,
+ recv_len = 0,
+ pending_recv = false,
+ connection_state = pre_init,
+ queue_collector = undefined, %% started on tune-ok
+ helper_sup = HelperSup,
+ heartbeater = none,
+ channel_sup_sup_pid = none,
+ channel_count = 0,
+ throttle = #throttle{
+ last_blocked_at = never,
+ should_block = false,
+ blocked_by = sets:new(),
+ connection_blocked_message_sent = false
+ },
+ proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)},
+ try
+ case run({?MODULE, recvloop,
+ [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}) of
+ %% connection was closed cleanly by the client
+ #v1{connection = #connection{user = #user{username = Username},
+ vhost = VHost}} ->
+ rabbit_log_connection:info("closing AMQP connection ~p (~s, vhost: '~s', user: '~s')~n",
+ [self(), dynamic_connection_name(Name), VHost, Username]);
+ %% just to be more defensive
+ _ ->
+ rabbit_log_connection:info("closing AMQP connection ~p (~s)~n",
+ [self(), dynamic_connection_name(Name)])
+ end
+ catch
+ Ex ->
+ log_connection_exception(dynamic_connection_name(Name), Ex)
+ after
+ %% We don't call gen_tcp:close/1 here since it waits for
+ %% pending output to be sent, which results in unnecessary
+ %% delays. We could just terminate - the reader is the
+ %% controlling process and hence its termination will close
+ %% the socket. However, to keep the file_handle_cache
+ %% accounting as accurate as possible we ought to close the
+ %% socket w/o delay before termination.
+ rabbit_net:fast_close(RealSocket),
+ rabbit_networking:unregister_connection(self()),
+ rabbit_core_metrics:connection_closed(self()),
+ ClientProperties = case get(client_properties) of
+ undefined ->
+ [];
+ Properties ->
+ Properties
+ end,
+ EventProperties = [{name, Name},
+ {pid, self()},
+ {node, node()},
+ {client_properties, ClientProperties}],
+ EventProperties1 = case get(connection_user_provided_name) of
+ undefined ->
+ EventProperties;
+ ConnectionUserProvidedName ->
+ [{user_provided_name, ConnectionUserProvidedName} | EventProperties]
+ end,
+ rabbit_event:notify(connection_closed, EventProperties1)
+ end,
+ done.
+
+log_connection_exception(Name, Ex) ->
+ Severity = case Ex of
+ connection_closed_with_no_data_received -> debug;
+ {connection_closed_abruptly, _} -> warning;
+ connection_closed_abruptly -> warning;
+ _ -> error
+ end,
+ log_connection_exception(Severity, Name, Ex).
+
+log_connection_exception(Severity, Name, {heartbeat_timeout, TimeoutSec}) ->
+ %% Long line to avoid extra spaces and line breaks in log
+ log_connection_exception_with_severity(Severity,
+ "closing AMQP connection ~p (~s):~n"
+ "missed heartbeats from client, timeout: ~ps~n",
+ [self(), Name, TimeoutSec]);
+log_connection_exception(Severity, Name, {connection_closed_abruptly,
+ #v1{connection = #connection{user = #user{username = Username},
+ vhost = VHost}}}) ->
+ log_connection_exception_with_severity(Severity,
+ "closing AMQP connection ~p (~s, vhost: '~s', user: '~s'):~nclient unexpectedly closed TCP connection~n",
+ [self(), Name, VHost, Username]);
+%% when client abruptly closes connection before connection.open/authentication/authorization
+%% succeeded, don't log username and vhost as 'none'
+log_connection_exception(Severity, Name, {connection_closed_abruptly, _}) ->
+ log_connection_exception_with_severity(Severity,
+ "closing AMQP connection ~p (~s):~nclient unexpectedly closed TCP connection~n",
+ [self(), Name]);
+%% failed connection.tune negotiations
+log_connection_exception(Severity, Name, {handshake_error, tuning, _Channel,
+ {exit, #amqp_error{explanation = Explanation},
+ _Method, _Stacktrace}}) ->
+ log_connection_exception_with_severity(Severity,
+ "closing AMQP connection ~p (~s):~nfailed to negotiate connection parameters: ~s~n",
+ [self(), Name, Explanation]);
+%% old exception structure
+log_connection_exception(Severity, Name, connection_closed_abruptly) ->
+ log_connection_exception_with_severity(Severity,
+ "closing AMQP connection ~p (~s):~n"
+ "client unexpectedly closed TCP connection~n",
+ [self(), Name]);
+log_connection_exception(Severity, Name, Ex) ->
+ log_connection_exception_with_severity(Severity,
+ "closing AMQP connection ~p (~s):~n~p~n",
+ [self(), Name, Ex]).
+
+log_connection_exception_with_severity(Severity, Fmt, Args) ->
+ case Severity of
+ debug -> rabbit_log_connection:debug(Fmt, Args);
+ warning -> rabbit_log_connection:warning(Fmt, Args);
+ error -> rabbit_log_connection:error(Fmt, Args)
+ end.
+
+run({M, F, A}) ->
+ try apply(M, F, A)
+ catch {become, MFA} -> run(MFA)
+ end.
+
+recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
+ throw({become, F(Deb, Buf, BufLen, State)});
+recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
+ when BufLen < RecvLen ->
+ case rabbit_net:setopts(Sock, [{active, once}]) of
+ ok -> mainloop(Deb, Buf, BufLen,
+ State#v1{pending_recv = true});
+ {error, Reason} -> stop(Reason, State)
+ end;
+recvloop(Deb, [B], _BufLen, State) ->
+ {Rest, State1} = handle_input(State#v1.callback, B, State),
+ recvloop(Deb, [Rest], size(Rest), State1);
+recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
+ {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
+ Data = list_to_binary(lists:reverse(DataLRev)),
+ {<<>>, State1} = handle_input(State#v1.callback, Data, State),
+ recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).
+
+binlist_split(0, L, Acc) ->
+ {L, Acc};
+binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
+ {H, T} = split_binary(Acc0, -Len),
+ {[H|L], [T|Acc]};
+binlist_split(Len, [H|T], Acc) ->
+ binlist_split(Len - size(H), T, [H|Acc]).
+
+-spec mainloop(_,[binary()], non_neg_integer(), #v1{}) -> any().
+
+mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
+ connection_state = CS,
+ connection = #connection{
+ name = ConnName}}) ->
+ Recv = rabbit_net:recv(Sock),
+ case CS of
+ pre_init when Buf =:= [] ->
+ %% We only log incoming connections when either the
+ %% first byte was received or there was an error (eg. a
+ %% timeout).
+ %%
+ %% The goal is to not log TCP healthchecks (a connection
+ %% with no data received) unless specified otherwise.
+ Fmt = "accepting AMQP connection ~p (~s)~n",
+ Args = [self(), ConnName],
+ case Recv of
+ closed -> rabbit_log_connection:debug(Fmt, Args);
+ _ -> rabbit_log_connection:info(Fmt, Args)
+ end;
+ _ ->
+ ok
+ end,
+ case Recv of
+ {data, Data} ->
+ recvloop(Deb, [Data | Buf], BufLen + size(Data),
+ State#v1{pending_recv = false});
+ closed when State#v1.connection_state =:= closed ->
+ State;
+ closed when CS =:= pre_init andalso Buf =:= [] ->
+ stop(tcp_healthcheck, State);
+ closed ->
+ stop(closed, State);
+ {other, {heartbeat_send_error, Reason}} ->
+ %% The only portable way to detect disconnect on blocked
+ %% connection is to wait for heartbeat send failure.
+ stop(Reason, State);
+ {error, Reason} ->
+ stop(Reason, State);
+ {other, {system, From, Request}} ->
+ sys:handle_system_msg(Request, From, State#v1.parent,
+ ?MODULE, Deb, {Buf, BufLen, State});
+ {other, Other} ->
+ case handle_other(Other, State) of
+ stop -> State;
+ NewState -> recvloop(Deb, Buf, BufLen, NewState)
+ end
+ end.
+
+-spec stop(_, #v1{}) -> no_return().
+stop(tcp_healthcheck, State) ->
+ %% The connection was closed before any packet was received. It's
+ %% probably a load-balancer healthcheck: don't consider this a
+ %% failure.
+ maybe_emit_stats(State),
+ throw(connection_closed_with_no_data_received);
+stop(closed, State) ->
+ maybe_emit_stats(State),
+ throw({connection_closed_abruptly, State});
+stop(Reason, State) ->
+ maybe_emit_stats(State),
+ throw({inet_error, Reason}).
+
+handle_other({conserve_resources, Source, Conserve},
+ State = #v1{throttle = Throttle = #throttle{blocked_by = Blockers}}) ->
+ Resource = {resource, Source},
+ Blockers1 = case Conserve of
+ true -> sets:add_element(Resource, Blockers);
+ false -> sets:del_element(Resource, Blockers)
+ end,
+ control_throttle(State#v1{throttle = Throttle#throttle{blocked_by = Blockers1}});
+handle_other({channel_closing, ChPid}, State) ->
+ ok = rabbit_channel:ready_for_close(ChPid),
+ {_, State1} = channel_cleanup(ChPid, State),
+ maybe_close(control_throttle(State1));
+handle_other({'EXIT', Parent, normal}, State = #v1{parent = Parent}) ->
+ %% rabbitmq/rabbitmq-server#544
+ %% The connection port process has exited due to the TCP socket being closed.
+ %% Handle this case in the same manner as receiving {error, closed}
+ stop(closed, State);
+handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
+ Msg = io_lib:format("broker forced connection closure with reason '~w'", [Reason]),
+ terminate(Msg, State),
+ %% this is what we are expected to do according to
+ %% https://www.erlang.org/doc/man/sys.html
+ %%
+ %% If we wanted to be *really* nice we should wait for a while for
+ %% clients to close the socket at their end, just as we do in the
+ %% ordinary error case. However, since this termination is
+ %% initiated by our parent it is probably more important to exit
+ %% quickly.
+ maybe_emit_stats(State),
+ exit(Reason);
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) ->
+ maybe_emit_stats(State),
+ throw(E);
+handle_other({channel_exit, Channel, Reason}, State) ->
+ handle_exception(State, Channel, Reason);
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
+ handle_dependent_exit(ChPid, Reason, State);
+handle_other(terminate_connection, State) ->
+ maybe_emit_stats(State),
+ stop;
+handle_other(handshake_timeout, State)
+ when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
+ State;
+handle_other(handshake_timeout, State) ->
+ maybe_emit_stats(State),
+ throw({handshake_timeout, State#v1.callback});
+handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
+ State;
+handle_other(heartbeat_timeout,
+ State = #v1{connection = #connection{timeout_sec = T}}) ->
+ maybe_emit_stats(State),
+ throw({heartbeat_timeout, T});
+handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> stop;
+ normal -> NewState
+ end;
+handle_other({'$gen_call', From, info}, State) ->
+ gen_server:reply(From, infos(?INFO_KEYS, State)),
+ State;
+handle_other({'$gen_call', From, {info, Items}}, State) ->
+ gen_server:reply(From, try {ok, infos(Items, State)}
+ catch Error -> {error, Error}
+ end),
+ State;
+handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
+ when ?IS_RUNNING(State) ->
+ rabbit_event:notify(
+ connection_created,
+ augment_infos_with_user_provided_connection_name(
+ [{type, network} | infos(?CREATION_EVENT_KEYS, State)], State),
+ Ref),
+ rabbit_event:init_stats_timer(State, #v1.stats_timer);
+handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
+ %% Ignore, we will emit a created event once we start running.
+ State;
+handle_other(ensure_stats, State) ->
+ ensure_stats_timer(State);
+handle_other(emit_stats, State) ->
+ emit_stats(State);
+handle_other({bump_credit, Msg}, State) ->
+ %% Here we are receiving credit by some channel process.
+ credit_flow:handle_bump_msg(Msg),
+ control_throttle(State);
+handle_other(Other, State) ->
+ %% internal error -> something worth dying for
+ maybe_emit_stats(State),
+ exit({unexpected_message, Other}).
+
+switch_callback(State, Callback, Length) ->
+ State#v1{callback = Callback, recv_len = Length}.
+
+terminate(Explanation, State) when ?IS_RUNNING(State) ->
+ {normal, handle_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, "~s", [Explanation], none))};
+terminate(_Explanation, State) ->
+ {force, State}.
+
+send_blocked(#v1{connection = #connection{protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}, Reason) ->
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+
+ ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
+ Protocol);
+ _ ->
+ ok
+ end.
+
+send_unblocked(#v1{connection = #connection{protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+ ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol);
+ _ ->
+ ok
+ end.
+
+%%--------------------------------------------------------------------------
+%% error handling / termination
+
+close_connection(State = #v1{queue_collector = Collector,
+ connection = #connection{
+ timeout_sec = TimeoutSec}}) ->
+ %% The spec says "Exclusive queues may only be accessed by the
+ %% current connection, and are deleted when that connection
+ %% closes." This does not strictly imply synchrony, but in
+ %% practice it seems to be what people assume.
+ clean_up_exclusive_queues(Collector),
+ %% We terminate the connection after the specified interval, but
+ %% no later than ?CLOSING_TIMEOUT seconds.
+ erlang:send_after((if TimeoutSec > 0 andalso
+ TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
+ true -> ?CLOSING_TIMEOUT
+ end) * 1000, self(), terminate_connection),
+ State#v1{connection_state = closed}.
+
+%% queue collector will be undefined when connection
+%% tuning was never performed or didn't finish. In such cases
+%% there's also nothing to clean up.
+clean_up_exclusive_queues(undefined) ->
+ ok;
+
+clean_up_exclusive_queues(Collector) ->
+ rabbit_queue_collector:delete_all(Collector).
+
+handle_dependent_exit(ChPid, Reason, State) ->
+ {Channel, State1} = channel_cleanup(ChPid, State),
+ case {Channel, termination_kind(Reason)} of
+ {undefined, controlled} -> State1;
+ {undefined, uncontrolled} -> handle_uncontrolled_channel_close(ChPid),
+ exit({abnormal_dependent_exit,
+ ChPid, Reason});
+ {_, controlled} -> maybe_close(control_throttle(State1));
+ {_, uncontrolled} -> handle_uncontrolled_channel_close(ChPid),
+ State2 = handle_exception(
+ State1, Channel, Reason),
+ maybe_close(control_throttle(State2))
+ end.
+
+terminate_channels(#v1{channel_count = 0} = State) ->
+ State;
+terminate_channels(#v1{channel_count = ChannelCount} = State) ->
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
+ Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount,
+ TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
+ wait_for_channel_termination(ChannelCount, TimerRef, State).
+
+wait_for_channel_termination(0, TimerRef, State) ->
+ case erlang:cancel_timer(TimerRef) of
+ false -> receive
+ cancel_wait -> State
+ end;
+ _ -> State
+ end;
+wait_for_channel_termination(N, TimerRef,
+ State = #v1{connection_state = CS,
+ connection = #connection{
+ log_name = ConnName,
+ user = User,
+ vhost = VHost},
+ sock = Sock}) ->
+ receive
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ {Channel, State1} = channel_cleanup(ChPid, State),
+ case {Channel, termination_kind(Reason)} of
+ {undefined, _} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef, State1);
+ {_, uncontrolled} ->
+ rabbit_log_connection:error(
+ "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:"
+ "error while terminating:~n~p~n",
+ [self(), ConnName, VHost, User#user.username,
+ CS, Channel, Reason]),
+ handle_uncontrolled_channel_close(ChPid),
+ wait_for_channel_termination(N-1, TimerRef, State1)
+ end;
+ {'EXIT', Sock, _Reason} ->
+ clean_up_all_channels(State),
+ exit(normal);
+ cancel_wait ->
+ exit(channel_termination_timeout)
+ end.
+
+maybe_close(State = #v1{connection_state = closing,
+ channel_count = 0,
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
+ NewState = close_connection(State),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ NewState;
+maybe_close(State) ->
+ State.
+
+termination_kind(normal) -> controlled;
+termination_kind(_) -> uncontrolled.
+
+format_hard_error(#amqp_error{name = N, explanation = E, method = M}) ->
+ io_lib:format("operation ~s caused a connection exception ~s: ~p", [M, N, E]);
+format_hard_error(Reason) ->
+ case io_lib:deep_char_list(Reason) of
+ true -> Reason;
+ false -> rabbit_misc:format("~p", [Reason])
+ end.
+
+log_hard_error(#v1{connection_state = CS,
+ connection = #connection{
+ log_name = ConnName,
+ user = User,
+ vhost = VHost}}, Channel, Reason) ->
+ rabbit_log_connection:error(
+ "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:~n ~s~n",
+ [self(), ConnName, VHost, User#user.username, CS, Channel, format_hard_error(Reason)]).
+
+handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
+ log_hard_error(State, Channel, Reason),
+ State;
+handle_exception(State = #v1{connection = #connection{protocol = Protocol},
+ connection_state = CS},
+ Channel, Reason)
+ when ?IS_RUNNING(State) orelse CS =:= closing ->
+ respond_and_close(State, Channel, Protocol, Reason, Reason);
+%% authentication failure
+handle_exception(State = #v1{connection = #connection{protocol = Protocol,
+ log_name = ConnName,
+ capabilities = Capabilities},
+ connection_state = starting},
+ Channel, Reason = #amqp_error{name = access_refused,
+ explanation = ErrMsg}) ->
+ rabbit_log_connection:error(
+ "Error on AMQP connection ~p (~s, state: ~p):~n~s~n",
+ [self(), ConnName, starting, ErrMsg]),
+ %% respect authentication failure notification capability
+ case rabbit_misc:table_lookup(Capabilities,
+ <<"authentication_failure_close">>) of
+ {bool, true} ->
+ send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
+ _ ->
+ close_connection(terminate_channels(State))
+ end;
+%% when loopback-only user tries to connect from a non-local host
+%% when user tries to access a vhost it has no permissions for
+handle_exception(State = #v1{connection = #connection{protocol = Protocol,
+ log_name = ConnName,
+ user = User},
+ connection_state = opening},
+ Channel, Reason = #amqp_error{name = not_allowed,
+ explanation = ErrMsg}) ->
+ rabbit_log_connection:error(
+ "Error on AMQP connection ~p (~s, user: '~s', state: ~p):~n~s~n",
+ [self(), ConnName, User#user.username, opening, ErrMsg]),
+ send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
+handle_exception(State = #v1{connection = #connection{protocol = Protocol},
+ connection_state = CS = opening},
+ Channel, Reason = #amqp_error{}) ->
+ respond_and_close(State, Channel, Protocol, Reason,
+ {handshake_error, CS, Reason});
+%% when negotiation fails, e.g. due to channel_max being higher than the
+%% maximum allowed limit
+handle_exception(State = #v1{connection = #connection{protocol = Protocol,
+ log_name = ConnName,
+ user = User},
+ connection_state = tuning},
+ Channel, Reason = #amqp_error{name = not_allowed,
+ explanation = ErrMsg}) ->
+ rabbit_log_connection:error(
+ "Error on AMQP connection ~p (~s,"
+ " user: '~s', state: ~p):~n~s~n",
+ [self(), ConnName, User#user.username, tuning, ErrMsg]),
+ send_error_on_channel0_and_close(Channel, Protocol, Reason, State);
+handle_exception(State, Channel, Reason) ->
+ %% We don't trust the client at this point - force them to wait
+ %% for a bit so they can't DOS us with repeated failed logins etc.
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({handshake_error, State#v1.connection_state, Channel, Reason}).
+
+%% we've "lost sync" with the client and hence must not accept any
+%% more input
+-spec fatal_frame_error(_, _, _, _, _) -> no_return().
+fatal_frame_error(Error, Type, Channel, Payload, State) ->
+ frame_error(Error, Type, Channel, Payload, State),
+ %% grace period to allow transmission of error
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw(fatal_frame_error).
+
+frame_error(Error, Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(frame_error,
+ "type ~p, ~s octets = ~p: ~p",
+ [Type, Str, Bin, Error], none)).
+
+unexpected_frame(Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(unexpected_frame,
+ "type ~p, ~s octets = ~p",
+ [Type, Str, Bin], none)).
+
+payload_snippet(Payload) when size(Payload) =< 16 ->
+ {"all", Payload};
+payload_snippet(<<Snippet:16/binary, _/binary>>) ->
+ {"first 16", Snippet}.
+
+%%--------------------------------------------------------------------------
+
+create_channel(_Channel,
+ #v1{channel_count = ChannelCount,
+ connection = #connection{channel_max = ChannelMax}})
+ when ChannelMax /= 0 andalso ChannelCount >= ChannelMax ->
+ {error, rabbit_misc:amqp_error(
+ not_allowed, "number of channels opened (~w) has reached the "
+ "negotiated channel_max (~w)",
+ [ChannelCount, ChannelMax], 'none')};
+create_channel(Channel,
+ #v1{sock = Sock,
+ queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ channel_count = ChannelCount,
+ connection =
+ #connection{name = Name,
+ protocol = Protocol,
+ frame_max = FrameMax,
+ vhost = VHost,
+ capabilities = Capabilities,
+ user = #user{username = Username} = User}
+ } = State) ->
+ case rabbit_auth_backend_internal:is_over_channel_limit(Username) of
+ false ->
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
+ Protocol, User, VHost, Capabilities,
+ Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}};
+ {true, Limit} ->
+ {error, rabbit_misc:amqp_error(not_allowed,
+ "number of channels opened for user '~s' has reached "
+ "the maximum allowed user limit of (~w)",
+ [Username, Limit], 'none')}
+ end.
+
+channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> {undefined, State};
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ {Channel, State#v1{channel_count = ChannelCount - 1}}
+ end.
+
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+
+clean_up_all_channels(State) ->
+ CleanupFun = fun(ChPid) ->
+ channel_cleanup(ChPid, State)
+ end,
+ lists:foreach(CleanupFun, all_channels()).
+
+%%--------------------------------------------------------------------------
+
+handle_frame(Type, 0, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_STOPPING(State) ->
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
+ {method, MethodName, FieldsBin} ->
+ handle_method0(MethodName, FieldsBin, State);
+ _Other -> State
+ end;
+handle_frame(Type, 0, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
+ error -> frame_error(unknown_frame, Type, 0, Payload, State);
+ heartbeat -> State;
+ {method, MethodName, FieldsBin} ->
+ handle_method0(MethodName, FieldsBin, State);
+ _Other -> unexpected_frame(Type, 0, Payload, State)
+ end;
+handle_frame(Type, Channel, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_RUNNING(State) ->
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
+ error -> frame_error(unknown_frame, Type, Channel, Payload, State);
+ heartbeat -> unexpected_frame(Type, Channel, Payload, State);
+ Frame -> process_frame(Frame, Channel, State)
+ end;
+handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) ->
+ State;
+handle_frame(Type, Channel, Payload, State) ->
+ unexpected_frame(Type, Channel, Payload, State).
+
+process_frame(Frame, Channel, State) ->
+ ChKey = {channel, Channel},
+ case (case get(ChKey) of
+ undefined -> create_channel(Channel, State);
+ Other -> {ok, Other, State}
+ end) of
+ {error, Error} ->
+ handle_exception(State, Channel, Error);
+ {ok, {ChPid, AState}, State1} ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State1));
+ {error, Reason} ->
+ handle_exception(State1, Channel, Reason)
+ end
+ end.
+
+post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
+ {_, State1} = channel_cleanup(ChPid, State),
+ %% This is not strictly necessary, but more obviously
+ %% correct. Also note that we do not need to call maybe_close/1
+ %% since we cannot possibly be in the 'closing' state.
+ control_throttle(State1);
+post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
+ publish_received(State);
+post_process_frame({content_body, _}, _ChPid, State) ->
+ publish_received(State);
+post_process_frame(_Frame, _ChPid, State) ->
+ State.
+
+%%--------------------------------------------------------------------------
+
+%% We allow clients to exceed the frame size a little bit since quite
+%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
+-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
+
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>,
+ State = #v1{connection = #connection{frame_max = FrameMax}})
+ when FrameMax /= 0 andalso
+ PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
+ fatal_frame_error(
+ {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
+ Type, Channel, <<>>, State);
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32,
+ Payload:PayloadSize/binary, ?FRAME_END,
+ Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))};
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(State,
+ {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1))};
+handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
+ <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
+ case EndMarker of
+ ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
+ {Rest, switch_callback(State1, frame_header, 7)};
+ _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
+ Type, Channel, Payload, State)
+ end;
+handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
+ {Rest, handshake({A, B, C, D}, State)};
+handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_header, Other});
+handle_input(Callback, Data, _State) ->
+ throw({bad_input, Callback, Data}).
+
+%% The two rules pertaining to version negotiation:
+%%
+%% * If the server cannot support the protocol specified in the
+%% protocol header, it MUST respond with a valid protocol header and
+%% then close the socket connection.
+%%
+%% * The server MUST provide a protocol version that is lower than or
+%% equal to that requested by the client in the protocol header.
+handshake({0, 0, 9, 1}, State) ->
+ start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
+
+%% This is the protocol header for 0-9, which we can safely treat as
+%% though it were 0-9-1.
+handshake({1, 1, 0, 9}, State) ->
+ start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
+
+%% This is what most clients send for 0-8. The 0-8 spec, confusingly,
+%% defines the version as 8-0.
+handshake({1, 1, 8, 0}, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+
+%% The 0-8 spec as on the AMQP web site actually has this as the
+%% protocol header; some libraries e.g., py-amqplib, send it when they
+%% want 0-8.
+handshake({1, 1, 9, 1}, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+
+%% ... and finally, the 1.0 spec is crystal clear!
+handshake({Id, 1, 0, 0}, State) ->
+ become_1_0(Id, State);
+
+handshake(Vsn, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, Vsn}).
+
+%% Offer a protocol version to the client. Connection.start only
+%% includes a major and minor version number, Luckily 0-9 and 0-9-1
+%% are similar enough that clients will be happy with either.
+start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
+ Protocol,
+ State = #v1{sock = Sock, connection = Connection}) ->
+ rabbit_networking:register_connection(self()),
+ Start = #'connection.start'{
+ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(Protocol),
+ mechanisms = auth_mechanisms_binary(Sock),
+ locales = <<"en_US">> },
+ ok = send_on_channel0(Sock, Start, Protocol),
+ switch_callback(State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7).
+
+-spec refuse_connection(_, _, _) -> no_return().
+refuse_connection(Sock, Exception, {A, B, C, D}) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end),
+ throw(Exception).
+
+-spec refuse_connection(rabbit_net:socket(), any()) -> no_return().
+
+refuse_connection(Sock, Exception) ->
+ refuse_connection(Sock, Exception, {0, 0, 9, 1}).
+
+ensure_stats_timer(State = #v1{connection_state = running}) ->
+ rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
+ensure_stats_timer(State) ->
+ State.
+
+%%--------------------------------------------------------------------------
+
+handle_method0(MethodName, FieldsBin,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ try
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
+ State)
+ catch throw:{inet_error, E} when E =:= closed; E =:= enotconn ->
+ maybe_emit_stats(State),
+ throw({connection_closed_abruptly, State});
+ exit:#amqp_error{method = none} = Reason ->
+ handle_exception(State, 0, Reason#amqp_error{method = MethodName});
+ Type:Reason:Stacktrace ->
+ handle_exception(State, 0, {Type, Reason, MethodName, Stacktrace})
+ end.
+
+handle_method0(#'connection.start_ok'{mechanism = Mechanism,
+ response = Response,
+ client_properties = ClientProperties},
+ State0 = #v1{connection_state = starting,
+ connection = Connection0,
+ sock = Sock}) ->
+ AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
+ Capabilities =
+ case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
+ {table, Capabilities1} -> Capabilities1;
+ _ -> []
+ end,
+ Connection1 = Connection0#connection{
+ client_properties = ClientProperties,
+ capabilities = Capabilities,
+ auth_mechanism = {Mechanism, AuthMechanism},
+ auth_state = AuthMechanism:init(Sock)},
+ Connection2 = augment_connection_log_name(Connection1),
+ State = State0#v1{connection_state = securing,
+ connection = Connection2},
+ % adding client properties to process dictionary to send them later
+ % in the connection_closed event
+ put(client_properties, ClientProperties),
+ case user_provided_connection_name(Connection2) of
+ undefined ->
+ undefined;
+ UserProvidedConnectionName ->
+ put(connection_user_provided_name, UserProvidedConnectionName)
+ end,
+ auth_phase(Response, State);
+
+handle_method0(#'connection.secure_ok'{response = Response},
+ State = #v1{connection_state = securing}) ->
+ auth_phase(Response, State);
+
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
+ channel_max = ChannelMax,
+ heartbeat = ClientHeartbeat},
+ State = #v1{connection_state = tuning,
+ connection = Connection,
+ helper_sup = SupPid,
+ sock = Sock}) ->
+ ok = validate_negotiated_integer_value(
+ frame_max, ?FRAME_MIN_SIZE, FrameMax),
+ ok = validate_negotiated_integer_value(
+ channel_max, ?CHANNEL_MIN, ChannelMax),
+ {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(
+ SupPid, Connection#connection.name),
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ Parent = self(),
+ SendFun =
+ fun() ->
+ case catch rabbit_net:send(Sock, Frame) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ Parent ! {heartbeat_send_error, Reason};
+ Unexpected ->
+ Parent ! {heartbeat_send_error, Unexpected}
+ end,
+ ok
+ end,
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
+ Heartbeater = rabbit_heartbeat:start(
+ SupPid, Sock, Connection#connection.name,
+ ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ timeout_sec = ClientHeartbeat},
+ queue_collector = Collector,
+ heartbeater = Heartbeater};
+
+handle_method0(#'connection.open'{virtual_host = VHost},
+ State = #v1{connection_state = opening,
+ connection = Connection = #connection{
+ log_name = ConnName,
+ user = User = #user{username = Username},
+ protocol = Protocol},
+ helper_sup = SupPid,
+ sock = Sock,
+ throttle = Throttle}) ->
+
+ ok = is_over_vhost_connection_limit(VHost, User),
+ ok = is_over_user_connection_limit(User),
+ ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}, #{}),
+ ok = is_vhost_alive(VHost, User),
+ NewConnection = Connection#connection{vhost = VHost},
+ ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
+
+ Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms]),
+ Throttle1 = Throttle#throttle{blocked_by = BlockedBy},
+
+ {ok, ChannelSupSupPid} =
+ rabbit_connection_helper_sup:start_channel_sup_sup(SupPid),
+ State1 = control_throttle(
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ throttle = Throttle1}),
+ Infos = augment_infos_with_user_provided_connection_name(
+ [{type, network} | infos(?CREATION_EVENT_KEYS, State1)],
+ State1
+ ),
+ rabbit_core_metrics:connection_created(proplists:get_value(pid, Infos),
+ Infos),
+ rabbit_event:notify(connection_created, Infos),
+ maybe_emit_stats(State1),
+ rabbit_log_connection:info(
+ "connection ~p (~s): "
+ "user '~s' authenticated and granted access to vhost '~s'~n",
+ [self(), dynamic_connection_name(ConnName), Username, VHost]),
+ State1;
+handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
+ maybe_close(State#v1{connection_state = closing});
+handle_method0(#'connection.close'{},
+ State = #v1{connection = #connection{protocol = Protocol},
+ sock = Sock})
+ when ?IS_STOPPING(State) ->
+ %% We're already closed or closing, so we don't need to cleanup
+ %% anything.
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ State;
+handle_method0(#'connection.close_ok'{},
+ State = #v1{connection_state = closed}) ->
+ self() ! terminate_connection,
+ State;
+handle_method0(#'connection.update_secret'{new_secret = NewSecret, reason = Reason},
+ State = #v1{connection =
+ #connection{protocol = Protocol,
+ user = User = #user{username = Username},
+ log_name = ConnName} = Conn,
+ sock = Sock}) when ?IS_RUNNING(State) ->
+ rabbit_log_connection:debug(
+ "connection ~p (~s) of user '~s': "
+ "asked to update secret, reason: ~s~n",
+ [self(), dynamic_connection_name(ConnName), Username, Reason]),
+ case rabbit_access_control:update_state(User, NewSecret) of
+ {ok, User1} ->
+ %% User/auth backend state has been updated. Now we can propagate it to channels
+ %% asynchronously and return. All the channels have to do is to update their
+ %% own state.
+ %%
+ %% Any secret update errors coming from the authz backend will be handled in the other branch.
+ %% Therefore we optimistically do no error handling here. MK.
+ lists:foreach(fun(Ch) ->
+ rabbit_log:debug("Updating user/auth backend state for channel ~p", [Ch]),
+ _ = rabbit_channel:update_user_state(Ch, User1)
+ end, all_channels()),
+ ok = send_on_channel0(Sock, #'connection.update_secret_ok'{}, Protocol),
+ rabbit_log_connection:info(
+ "connection ~p (~s): "
+ "user '~s' updated secret, reason: ~s~n",
+ [self(), dynamic_connection_name(ConnName), Username, Reason]),
+ State#v1{connection = Conn#connection{user = User1}};
+ {refused, Message} ->
+ rabbit_log_connection:error("Secret update was refused for user '~p': ~p",
+ [Username, Message]),
+ rabbit_misc:protocol_error(not_allowed, "New secret was refused by one of the backends", []);
+ {error, Message} ->
+ rabbit_log_connection:error("Secret update for user '~p' failed: ~p",
+ [Username, Message]),
+ rabbit_misc:protocol_error(not_allowed,
+ "Secret update failed", [])
+ end;
+handle_method0(_Method, State) when ?IS_STOPPING(State) ->
+ State;
+handle_method0(_Method, #v1{connection_state = S}) ->
+ rabbit_misc:protocol_error(
+ channel_error, "unexpected method in connection state ~w", [S]).
+
+is_vhost_alive(VHostPath, User) ->
+ case rabbit_vhost_sup_sup:is_vhost_alive(VHostPath) of
+ true -> ok;
+ false ->
+ rabbit_misc:protocol_error(internal_error,
+ "access to vhost '~s' refused for user '~s': "
+ "vhost '~s' is down",
+ [VHostPath, User#user.username, VHostPath])
+ end.
+
+is_over_vhost_connection_limit(VHostPath, User) ->
+ try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of
+ false -> ok;
+ {true, Limit} -> rabbit_misc:protocol_error(not_allowed,
+ "access to vhost '~s' refused for user '~s': "
+ "connection limit (~p) is reached",
+ [VHostPath, User#user.username, Limit])
+ catch
+ throw:{error, {no_such_vhost, VHostPath}} ->
+ rabbit_misc:protocol_error(not_allowed, "vhost ~s not found", [VHostPath])
+ end.
+
+is_over_user_connection_limit(#user{username = Username}) ->
+ case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
+ false -> ok;
+ {true, Limit} -> rabbit_misc:protocol_error(not_allowed,
+ "Connection refused for user '~s': "
+ "user connection limit (~p) is reached",
+ [Username, Limit])
+ end.
+
+validate_negotiated_integer_value(Field, Min, ClientValue) ->
+ ServerValue = get_env(Field),
+ if ClientValue /= 0 andalso ClientValue < Min ->
+ fail_negotiation(Field, min, Min, ClientValue);
+ ServerValue /= 0 andalso (ClientValue =:= 0 orelse
+ ClientValue > ServerValue) ->
+ fail_negotiation(Field, max, ServerValue, ClientValue);
+ true ->
+ ok
+ end.
+
+%% keep dialyzer happy
+-spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) ->
+ no_return().
+fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
+ {S1, S2} = case MinOrMax of
+ min -> {lower, minimum};
+ max -> {higher, maximum}
+ end,
+ ClientValueDetail = get_client_value_detail(Field, ClientValue),
+ rabbit_misc:protocol_error(
+ not_allowed, "negotiated ~w = ~w~s is ~w than the ~w allowed value (~w)",
+ [Field, ClientValue, ClientValueDetail, S1, S2, ServerValue], 'connection.tune').
+
+get_env(Key) ->
+ {ok, Value} = application:get_env(rabbit, Key),
+ Value.
+
+send_on_channel0(Sock, Method, Protocol) ->
+ ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
+
+auth_mechanism_to_module(TypeBin, Sock) ->
+ case rabbit_registry:binary_to_type(TypeBin) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unknown authentication mechanism '~s'",
+ [TypeBin]);
+ T ->
+ case {lists:member(T, auth_mechanisms(Sock)),
+ rabbit_registry:lookup_module(auth_mechanism, T)} of
+ {true, {ok, Module}} ->
+ Module;
+ _ ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "invalid authentication mechanism '~s'", [T])
+ end
+ end.
+
+auth_mechanisms(Sock) ->
+ {ok, Configured} = application:get_env(auth_mechanisms),
+ [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
+ Module:should_offer(Sock), lists:member(Name, Configured)].
+
+auth_mechanisms_binary(Sock) ->
+ list_to_binary(
+ string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
+
+auth_phase(Response,
+ State = #v1{connection = Connection =
+ #connection{protocol = Protocol,
+ auth_mechanism = {Name, AuthMechanism},
+ auth_state = AuthState},
+ sock = Sock}) ->
+ RemoteAddress = list_to_binary(inet:ntoa(Connection#connection.host)),
+ case AuthMechanism:handle_response(Response, AuthState) of
+ {refused, Username, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091),
+ auth_fail(Username, Msg, Args, Name, State);
+ {protocol_error, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, amqp091),
+ notify_auth_result(none, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}],
+ State),
+ rabbit_misc:protocol_error(syntax_error, Msg, Args);
+ {challenge, Challenge, AuthState1} ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, amqp091),
+ Secure = #'connection.secure'{challenge = Challenge},
+ ok = send_on_channel0(Sock, Secure, Protocol),
+ State#v1{connection = Connection#connection{
+ auth_state = AuthState1}};
+ {ok, User = #user{username = Username}} ->
+ case rabbit_access_control:check_user_loopback(Username, Sock) of
+ ok ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, amqp091),
+ notify_auth_result(Username, user_authentication_success,
+ [], State);
+ not_allowed ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, amqp091),
+ auth_fail(Username, "user '~s' can only connect via "
+ "localhost", [Username], Name, State)
+ end,
+ Tune = #'connection.tune'{frame_max = get_env(frame_max),
+ channel_max = get_env(channel_max),
+ heartbeat = get_env(heartbeat)},
+ ok = send_on_channel0(Sock, Tune, Protocol),
+ State#v1{connection_state = tuning,
+ connection = Connection#connection{user = User,
+ auth_state = none}}
+ end.
+
+-spec auth_fail
+ (rabbit_types:username() | none, string(), [any()], binary(), #v1{}) ->
+ no_return().
+
+auth_fail(Username, Msg, Args, AuthName,
+ State = #v1{connection = #connection{protocol = Protocol,
+ capabilities = Capabilities}}) ->
+ notify_auth_result(Username, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}], State),
+ AmqpError = rabbit_misc:amqp_error(
+ access_refused, "~s login refused: ~s",
+ [AuthName, io_lib:format(Msg, Args)], none),
+ case rabbit_misc:table_lookup(Capabilities,
+ <<"authentication_failure_close">>) of
+ {bool, true} ->
+ SafeMsg = io_lib:format(
+ "Login was refused using authentication "
+ "mechanism ~s. For details see the broker "
+ "logfile.", [AuthName]),
+ AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg},
+ {0, CloseMethod} = rabbit_binary_generator:map_exception(
+ 0, AmqpError1, Protocol),
+ ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol);
+ _ -> ok
+ end,
+ rabbit_misc:protocol_error(AmqpError).
+
+notify_auth_result(Username, AuthResult, ExtraProps, State) ->
+ EventProps = [{connection_type, network},
+ {name, case Username of none -> ''; _ -> Username end}] ++
+ [case Item of
+ name -> {connection_name, i(name, State)};
+ _ -> {Item, i(Item, State)}
+ end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++
+ ExtraProps,
+ rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
+
+%%--------------------------------------------------------------------------
+
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, #v1{}) -> self();
+i(node, #v1{}) -> node();
+i(SockStat, S) when SockStat =:= recv_oct;
+ SockStat =:= recv_cnt;
+ SockStat =:= send_oct;
+ SockStat =:= send_cnt;
+ SockStat =:= send_pend ->
+ socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
+ fun ([{_, I}]) -> I end, S);
+i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
+i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S);
+i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S);
+i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S);
+i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
+i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
+i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
+i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
+i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount;
+i(state, #v1{connection_state = ConnectionState,
+ throttle = #throttle{blocked_by = Reasons,
+ last_blocked_at = T} = Throttle}) ->
+ %% not throttled by resource or other longer-term reasons
+ %% TODO: come up with a sensible function name
+ case sets:size(sets:del_element(flow, Reasons)) =:= 0 andalso
+ (credit_flow:blocked() %% throttled by flow now
+ orelse %% throttled by flow recently
+ (is_blocked_by_flow(Throttle) andalso T =/= never andalso
+ erlang:convert_time_unit(erlang:monotonic_time() - T,
+ native,
+ micro_seconds) < 5000000)) of
+ true -> flow;
+ false ->
+ case {has_reasons_to_block(Throttle), ConnectionState} of
+ %% blocked
+ {_, blocked} -> blocked;
+ %% not yet blocked (there were no publishes)
+ {true, running} -> blocking;
+ %% not blocked
+ {false, _} -> ConnectionState;
+ %% catch all to be defensive
+ _ -> ConnectionState
+ end
+ end;
+i(garbage_collection, _State) ->
+ rabbit_misc:get_gc_info(self());
+i(reductions, _State) ->
+ {reductions, Reductions} = erlang:process_info(self(), reductions),
+ Reductions;
+i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
+
+ic(name, #connection{name = Name}) -> Name;
+ic(host, #connection{host = Host}) -> Host;
+ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost;
+ic(port, #connection{port = Port}) -> Port;
+ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort;
+ic(protocol, #connection{protocol = none}) -> none;
+ic(protocol, #connection{protocol = P}) -> P:version();
+ic(user, #connection{user = none}) -> '';
+ic(user, #connection{user = U}) -> U#user.username;
+ic(user_who_performed_action, C) -> ic(user, C);
+ic(vhost, #connection{vhost = VHost}) -> VHost;
+ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
+ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
+ic(client_properties, #connection{client_properties = CP}) -> CP;
+ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
+ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
+ic(connected_at, #connection{connected_at = T}) -> T;
+ic(Item, #connection{}) -> throw({bad_argument, Item}).
+
+socket_info(Get, Select, #v1{sock = Sock}) ->
+ case Get(Sock) of
+ {ok, T} -> case Select(T) of
+ N when is_number(N) -> N;
+ _ -> 0
+ end;
+ {error, _} -> 0
+ end.
+
+ssl_info(F, #v1{sock = Sock}) ->
+ case rabbit_net:ssl_info(Sock) of
+ nossl -> '';
+ {error, _} -> '';
+ {ok, Items} ->
+ P = proplists:get_value(protocol, Items),
+ #{cipher := C,
+ key_exchange := K,
+ mac := H} = proplists:get_value(selected_cipher_suite, Items),
+ F({P, {K, C, H}})
+ end.
+
+cert_info(F, #v1{sock = Sock}) ->
+ case rabbit_net:peercert(Sock) of
+ nossl -> '';
+ {error, _} -> '';
+ {ok, Cert} -> list_to_binary(F(Cert))
+ end.
+
+maybe_emit_stats(State) ->
+ rabbit_event:if_enabled(State, #v1.stats_timer,
+ fun() -> emit_stats(State) end).
+
+emit_stats(State) ->
+ [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I
+ = infos(?SIMPLE_METRICS, State),
+ Infos = infos(?OTHER_METRICS, State),
+ rabbit_core_metrics:connection_stats(Pid, Infos),
+ rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
+ rabbit_event:notify(connection_stats, Infos ++ I),
+ State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
+ ensure_stats_timer(State1).
+
+%% 1.0 stub
+-spec become_1_0(non_neg_integer(), #v1{}) -> no_return().
+
+become_1_0(Id, State = #v1{sock = Sock}) ->
+ case code:is_loaded(rabbit_amqp1_0_reader) of
+ false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled);
+ _ -> Mode = case Id of
+ 0 -> amqp;
+ 3 -> sasl;
+ _ -> refuse_connection(
+ Sock, {unsupported_amqp1_0_protocol_id, Id},
+ {3, 1, 0, 0})
+ end,
+ F = fun (_Deb, Buf, BufLen, S) ->
+ {rabbit_amqp1_0_reader, init,
+ [Mode, pack_for_1_0(Buf, BufLen, S)]}
+ end,
+ State#v1{connection_state = {become, F}}
+ end.
+
+pack_for_1_0(Buf, BufLen, #v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ helper_sup = SupPid,
+ proxy_socket = ProxySocket}) ->
+ {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen, ProxySocket}.
+
+respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
+ log_hard_error(State, Channel, LogErr),
+ send_error_on_channel0_and_close(Channel, Protocol, Reason, State).
+
+send_error_on_channel0_and_close(Channel, Protocol, Reason, State) ->
+ {0, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ State1 = close_connection(terminate_channels(State)),
+ ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol),
+ State1.
+
+%%
+%% Publisher throttling
+%%
+
+blocked_by_message(#throttle{blocked_by = Reasons}) ->
+ %% we don't want to report internal flow as a reason here since
+ %% it is entirely transient
+ Reasons1 = sets:del_element(flow, Reasons),
+ RStr = string:join([format_blocked_by(R) || R <- sets:to_list(Reasons1)], " & "),
+ list_to_binary(rabbit_misc:format("low on ~s", [RStr])).
+
+format_blocked_by({resource, memory}) -> "memory";
+format_blocked_by({resource, disk}) -> "disk";
+format_blocked_by({resource, disc}) -> "disk".
+
+update_last_blocked_at(Throttle) ->
+ Throttle#throttle{last_blocked_at = erlang:monotonic_time()}.
+
+connection_blocked_message_sent(
+ #throttle{connection_blocked_message_sent = BS}) -> BS.
+
+should_send_blocked(Throttle = #throttle{blocked_by = Reasons}) ->
+ should_block(Throttle)
+ andalso
+ sets:size(sets:del_element(flow, Reasons)) =/= 0
+ andalso
+ not connection_blocked_message_sent(Throttle).
+
+should_send_unblocked(Throttle = #throttle{blocked_by = Reasons}) ->
+ connection_blocked_message_sent(Throttle)
+ andalso
+ sets:size(sets:del_element(flow, Reasons)) == 0.
+
+%% Returns true if we have a reason to block
+%% this connection.
+has_reasons_to_block(#throttle{blocked_by = Reasons}) ->
+ sets:size(Reasons) > 0.
+
+is_blocked_by_flow(#throttle{blocked_by = Reasons}) ->
+ sets:is_element(flow, Reasons).
+
+should_block(#throttle{should_block = Val}) -> Val.
+
+should_block_connection(Throttle) ->
+ should_block(Throttle) andalso has_reasons_to_block(Throttle).
+
+should_unblock_connection(Throttle) ->
+ not should_block_connection(Throttle).
+
+maybe_block(State = #v1{connection_state = CS, throttle = Throttle}) ->
+ case should_block_connection(Throttle) of
+ true ->
+ State1 = State#v1{connection_state = blocked,
+ throttle = update_last_blocked_at(Throttle)},
+ case CS of
+ running ->
+ ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater);
+ _ -> ok
+ end,
+ maybe_send_blocked_or_unblocked(State1);
+ false -> State
+ end.
+
+maybe_unblock(State = #v1{throttle = Throttle}) ->
+ case should_unblock_connection(Throttle) of
+ true ->
+ ok = rabbit_heartbeat:resume_monitor(State#v1.heartbeater),
+ State1 = State#v1{connection_state = running,
+ throttle = Throttle#throttle{should_block = false}},
+ maybe_send_unblocked(State1);
+ false -> State
+ end.
+
+maybe_send_unblocked(State = #v1{throttle = Throttle}) ->
+ case should_send_unblocked(Throttle) of
+ true ->
+ ok = send_unblocked(State),
+ State#v1{throttle =
+ Throttle#throttle{connection_blocked_message_sent = false}};
+ false -> State
+ end.
+
+maybe_send_blocked_or_unblocked(State = #v1{throttle = Throttle}) ->
+ case should_send_blocked(Throttle) of
+ true ->
+ ok = send_blocked(State, blocked_by_message(Throttle)),
+ State#v1{throttle =
+ Throttle#throttle{connection_blocked_message_sent = true}};
+ false -> maybe_send_unblocked(State)
+ end.
+
+publish_received(State = #v1{throttle = Throttle}) ->
+ case has_reasons_to_block(Throttle) of
+ false -> State;
+ true ->
+ Throttle1 = Throttle#throttle{should_block = true},
+ maybe_block(State#v1{throttle = Throttle1})
+ end.
+
+control_throttle(State = #v1{connection_state = CS,
+ throttle = #throttle{blocked_by = Reasons} = Throttle}) ->
+ Throttle1 = case credit_flow:blocked() of
+ true ->
+ Throttle#throttle{blocked_by = sets:add_element(flow, Reasons)};
+ false ->
+ Throttle#throttle{blocked_by = sets:del_element(flow, Reasons)}
+ end,
+ State1 = State#v1{throttle = Throttle1},
+ case CS of
+ running -> maybe_block(State1);
+ %% unblock or re-enable blocking
+ blocked -> maybe_block(maybe_unblock(State1));
+ _ -> State1
+ end.
+
+augment_connection_log_name(#connection{name = Name} = Connection) ->
+ case user_provided_connection_name(Connection) of
+ undefined ->
+ Connection;
+ UserSpecifiedName ->
+ LogName = <<Name/binary, " - ", UserSpecifiedName/binary>>,
+ rabbit_log_connection:info("Connection ~p (~s) has a client-provided name: ~s~n", [self(), Name, UserSpecifiedName]),
+ ?store_proc_name(LogName),
+ Connection#connection{log_name = LogName}
+ end.
+
+augment_infos_with_user_provided_connection_name(Infos, #v1{connection = Connection}) ->
+ case user_provided_connection_name(Connection) of
+ undefined ->
+ Infos;
+ UserProvidedConnectionName ->
+ [{user_provided_name, UserProvidedConnectionName} | Infos]
+ end.
+
+user_provided_connection_name(#connection{client_properties = ClientProperties}) ->
+ case rabbit_misc:table_lookup(ClientProperties, <<"connection_name">>) of
+ {longstr, UserSpecifiedName} ->
+ UserSpecifiedName;
+ _ ->
+ undefined
+ end.
+
+dynamic_connection_name(Default) ->
+ case rabbit_misc:get_proc_name() of
+ {ok, Name} ->
+ Name;
+ _ ->
+ Default
+ end.
+
+handle_uncontrolled_channel_close(ChPid) ->
+ rabbit_core_metrics:channel_closed(ChPid),
+ rabbit_event:notify(channel_closed, [{pid, ChPid}]).
+
+-spec get_client_value_detail(atom(), integer()) -> string().
+get_client_value_detail(channel_max, 0) ->
+ " (no limit)";
+get_client_value_detail(_Field, _ClientValue) ->
+ "".