summaryrefslogtreecommitdiff
path: root/src/rabbit_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r--src/rabbit_reader.erl1217
1 files changed, 0 insertions, 1217 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
deleted file mode 100644
index 2033dd14..00000000
--- a/src/rabbit_reader.erl
+++ /dev/null
@@ -1,1217 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
-%%
-
--module(rabbit_reader).
--include("rabbit_framing.hrl").
--include("rabbit.hrl").
-
--export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/2,
- shutdown/2]).
-
--export([system_continue/3, system_terminate/4, system_code_change/4]).
-
--export([init/2, mainloop/4, recvloop/4]).
-
--export([conserve_resources/3, server_properties/1]).
-
--define(NORMAL_TIMEOUT, 3).
--define(CLOSING_TIMEOUT, 30).
--define(CHANNEL_TERMINATION_TIMEOUT, 3).
--define(SILENT_CLOSE_DELAY, 3).
--define(CHANNEL_MIN, 1).
-
-%%--------------------------------------------------------------------------
-
--record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
- connection_state, helper_sup, queue_collector, heartbeater,
- stats_timer, channel_sup_sup_pid, channel_count, throttle}).
-
--record(connection, {name, host, peer_host, port, peer_port,
- protocol, user, timeout_sec, frame_max, channel_max, vhost,
- client_properties, capabilities,
- auth_mechanism, auth_state, connected_at}).
-
--record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}).
-
--define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, channels]).
-
--define(CREATION_EVENT_KEYS,
- [pid, name, port, peer_port, host,
- peer_host, ssl, peer_cert_subject, peer_cert_issuer,
- peer_cert_validity, auth_mechanism, ssl_protocol,
- ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, channel_max, client_properties, connected_at]).
-
--define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
-
--define(IS_RUNNING(State),
- (State#v1.connection_state =:= running orelse
- State#v1.connection_state =:= blocking orelse
- State#v1.connection_state =:= blocked)).
-
--define(IS_STOPPING(State),
- (State#v1.connection_state =:= closing orelse
- State#v1.connection_state =:= closed)).
-
-%%--------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())).
--spec(info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(info/1 :: (pid()) -> rabbit_types:infos()).
--spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
--spec(force_event_refresh/2 :: (pid(), reference()) -> 'ok').
--spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok').
--spec(server_properties/1 :: (rabbit_types:protocol()) ->
- rabbit_framing:amqp_table()).
-
-%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/2 :: (pid(), pid()) -> no_return()).
--spec(start_connection/5 ::
- (pid(), pid(), any(), rabbit_net:socket(),
- fun ((rabbit_net:socket()) ->
- rabbit_types:ok_or_error2(
- rabbit_net:socket(), any()))) -> no_return()).
-
--spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()).
--spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
--spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) ->
- any()).
--spec(system_terminate/4 :: (_,_,_,_) -> none()).
-
--endif.
-
-%%--------------------------------------------------------------------------
-
-start_link(HelperSup) ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup])}.
-
-shutdown(Pid, Explanation) ->
- gen_server:call(Pid, {shutdown, Explanation}, infinity).
-
-init(Parent, HelperSup) ->
- Deb = sys:debug_options([]),
- receive
- {go, Sock, SockTransform} ->
- start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
- end.
-
-system_continue(Parent, Deb, {Buf, BufLen, State}) ->
- mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}).
-
-system_terminate(Reason, _Parent, _Deb, _State) ->
- exit(Reason).
-
-system_code_change(Misc, _Module, _OldVsn, _Extra) ->
- {ok, Misc}.
-
-info_keys() -> ?INFO_KEYS.
-
-info(Pid) ->
- gen_server:call(Pid, info, infinity).
-
-info(Pid, Items) ->
- case gen_server:call(Pid, {info, Items}, infinity) of
- {ok, Res} -> Res;
- {error, Error} -> throw(Error)
- end.
-
-force_event_refresh(Pid, Ref) ->
- gen_server:cast(Pid, {force_event_refresh, Ref}).
-
-conserve_resources(Pid, Source, Conserve) ->
- Pid ! {conserve_resources, Source, Conserve},
- ok.
-
-server_properties(Protocol) ->
- {ok, Product} = application:get_key(rabbit, id),
- {ok, Version} = application:get_key(rabbit, vsn),
-
- %% Get any configuration-specified server properties
- {ok, RawConfigServerProps} = application:get_env(rabbit,
- server_properties),
-
- %% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms
- %% from the config and merge them with the generated built-in properties
- NormalizedConfigServerProps =
- [{<<"capabilities">>, table, server_capabilities(Protocol)} |
- [case X of
- {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
- longstr,
- maybe_list_to_binary(Value)};
- {BinKey, Type, Value} -> {BinKey, Type, Value}
- end || X <- RawConfigServerProps ++
- [{product, Product},
- {version, Version},
- {cluster_name, rabbit_nodes:cluster_name()},
- {platform, "Erlang/OTP"},
- {copyright, ?COPYRIGHT_MESSAGE},
- {information, ?INFORMATION_MESSAGE}]]],
-
- %% Filter duplicated properties in favour of config file provided values
- lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
- NormalizedConfigServerProps).
-
-maybe_list_to_binary(V) when is_binary(V) -> V;
-maybe_list_to_binary(V) when is_list(V) -> list_to_binary(V).
-
-server_capabilities(rabbit_framing_amqp_0_9_1) ->
- [{<<"publisher_confirms">>, bool, true},
- {<<"exchange_exchange_bindings">>, bool, true},
- {<<"basic.nack">>, bool, true},
- {<<"consumer_cancel_notify">>, bool, true},
- {<<"connection.blocked">>, bool, true},
- {<<"consumer_priorities">>, bool, true},
- {<<"authentication_failure_close">>, bool, true},
- {<<"per_consumer_qos">>, bool, true}];
-server_capabilities(_) ->
- [].
-
-%%--------------------------------------------------------------------------
-
-log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
-
-socket_error(Reason) when is_atom(Reason) ->
- log(error, "Error on AMQP connection ~p: ~s~n",
- [self(), rabbit_misc:format_inet_error(Reason)]);
-socket_error(Reason) ->
- log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]).
-
-inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-
-socket_op(Sock, Fun) ->
- case Fun(Sock) of
- {ok, Res} -> Res;
- {error, Reason} -> socket_error(Reason),
- %% NB: this is tcp socket, even in case of ssl
- rabbit_net:fast_close(Sock),
- exit(normal)
- end.
-
-start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
- process_flag(trap_exit, true),
- Name = case rabbit_net:connection_string(Sock, inbound) of
- {ok, Str} -> Str;
- {error, enotconn} -> rabbit_net:fast_close(Sock),
- exit(normal);
- {error, Reason} -> socket_error(Reason),
- rabbit_net:fast_close(Sock),
- exit(normal)
- end,
- log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
- {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
- ClientSock = socket_op(Sock, SockTransform),
- erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
- {PeerHost, PeerPort, Host, Port} =
- socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
- ?store_proc_name(list_to_binary(Name)),
- State = #v1{parent = Parent,
- sock = ClientSock,
- connection = #connection{
- name = list_to_binary(Name),
- host = Host,
- peer_host = PeerHost,
- port = Port,
- peer_port = PeerPort,
- protocol = none,
- user = none,
- timeout_sec = (HandshakeTimeout / 1000),
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
- client_properties = none,
- capabilities = [],
- auth_mechanism = none,
- auth_state = none,
- connected_at = rabbit_misc:now_to_ms(os:timestamp())},
- callback = uninitialized_callback,
- recv_len = 0,
- pending_recv = false,
- connection_state = pre_init,
- queue_collector = undefined, %% started on tune-ok
- helper_sup = HelperSup,
- heartbeater = none,
- channel_sup_sup_pid = none,
- channel_count = 0,
- throttle = #throttle{
- alarmed_by = [],
- last_blocked_by = none,
- last_blocked_at = never}},
- try
- run({?MODULE, recvloop,
- [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)]}),
- log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
- catch
- Ex -> log(case Ex of
- connection_closed_abruptly -> warning;
- _ -> error
- end, "closing AMQP connection ~p (~s):~n~p~n",
- [self(), Name, Ex])
- after
- %% We don't call gen_tcp:close/1 here since it waits for
- %% pending output to be sent, which results in unnecessary
- %% delays. We could just terminate - the reader is the
- %% controlling process and hence its termination will close
- %% the socket. However, to keep the file_handle_cache
- %% accounting as accurate as possible we ought to close the
- %% socket w/o delay before termination.
- rabbit_net:fast_close(ClientSock),
- rabbit_networking:unregister_connection(self()),
- rabbit_event:notify(connection_closed, [{pid, self()}])
- end,
- done.
-
-run({M, F, A}) ->
- try apply(M, F, A)
- catch {become, MFA} -> run(MFA)
- end.
-
-recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
- mainloop(Deb, Buf, BufLen, State);
-recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
- mainloop(Deb, Buf, BufLen, State);
-recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
- throw({become, F(Deb, Buf, BufLen, State)});
-recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
- when BufLen < RecvLen ->
- case rabbit_net:setopts(Sock, [{active, once}]) of
- ok -> mainloop(Deb, Buf, BufLen,
- State#v1{pending_recv = true});
- {error, Reason} -> stop(Reason, State)
- end;
-recvloop(Deb, [B], _BufLen, State) ->
- {Rest, State1} = handle_input(State#v1.callback, B, State),
- recvloop(Deb, [Rest], size(Rest), State1);
-recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
- {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
- Data = list_to_binary(lists:reverse(DataLRev)),
- {<<>>, State1} = handle_input(State#v1.callback, Data, State),
- recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).
-
-binlist_split(0, L, Acc) ->
- {L, Acc};
-binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
- {H, T} = split_binary(Acc0, -Len),
- {[H|L], [T|Acc]};
-binlist_split(Len, [H|T], Acc) ->
- binlist_split(Len - size(H), T, [H|Acc]).
-
-mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
- case rabbit_net:recv(Sock) of
- {data, Data} ->
- recvloop(Deb, [Data | Buf], BufLen + size(Data),
- State#v1{pending_recv = false});
- closed when State#v1.connection_state =:= closed ->
- ok;
- closed ->
- stop(closed, State);
- {error, Reason} ->
- stop(Reason, State);
- {other, {system, From, Request}} ->
- sys:handle_system_msg(Request, From, State#v1.parent,
- ?MODULE, Deb, {Buf, BufLen, State});
- {other, Other} ->
- case handle_other(Other, State) of
- stop -> ok;
- NewState -> recvloop(Deb, Buf, BufLen, NewState)
- end
- end.
-
-stop(closed, State) -> maybe_emit_stats(State),
- throw(connection_closed_abruptly);
-stop(Reason, State) -> maybe_emit_stats(State),
- throw({inet_error, Reason}).
-
-handle_other({conserve_resources, Source, Conserve},
- State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) ->
- CR1 = case Conserve of
- true -> lists:usort([Source | CR]);
- false -> CR -- [Source]
- end,
- State1 = control_throttle(
- State#v1{throttle = Throttle#throttle{alarmed_by = CR1}}),
- case {blocked_by_alarm(State), blocked_by_alarm(State1)} of
- {false, true} -> ok = send_blocked(State1);
- {true, false} -> ok = send_unblocked(State1);
- {_, _} -> ok
- end,
- State1;
-handle_other({channel_closing, ChPid}, State) ->
- ok = rabbit_channel:ready_for_close(ChPid),
- {_, State1} = channel_cleanup(ChPid, State),
- maybe_close(control_throttle(State1));
-handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
- terminate(io_lib:format("broker forced connection closure "
- "with reason '~w'", [Reason]), State),
- %% this is what we are expected to do according to
- %% http://www.erlang.org/doc/man/sys.html
- %%
- %% If we wanted to be *really* nice we should wait for a while for
- %% clients to close the socket at their end, just as we do in the
- %% ordinary error case. However, since this termination is
- %% initiated by our parent it is probably more important to exit
- %% quickly.
- maybe_emit_stats(State),
- exit(Reason);
-handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) ->
- maybe_emit_stats(State),
- throw(E);
-handle_other({channel_exit, Channel, Reason}, State) ->
- handle_exception(State, Channel, Reason);
-handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
- handle_dependent_exit(ChPid, Reason, State);
-handle_other(terminate_connection, State) ->
- maybe_emit_stats(State),
- stop;
-handle_other(handshake_timeout, State)
- when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
- State;
-handle_other(handshake_timeout, State) ->
- maybe_emit_stats(State),
- throw({handshake_timeout, State#v1.callback});
-handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
- State;
-handle_other(heartbeat_timeout, State = #v1{connection_state = S}) ->
- maybe_emit_stats(State),
- throw({heartbeat_timeout, S});
-handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
- {ForceTermination, NewState} = terminate(Explanation, State),
- gen_server:reply(From, ok),
- case ForceTermination of
- force -> stop;
- normal -> NewState
- end;
-handle_other({'$gen_call', From, info}, State) ->
- gen_server:reply(From, infos(?INFO_KEYS, State)),
- State;
-handle_other({'$gen_call', From, {info, Items}}, State) ->
- gen_server:reply(From, try {ok, infos(Items, State)}
- catch Error -> {error, Error}
- end),
- State;
-handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
- when ?IS_RUNNING(State) ->
- rabbit_event:notify(
- connection_created,
- [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref),
- rabbit_event:init_stats_timer(State, #v1.stats_timer);
-handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
- %% Ignore, we will emit a created event once we start running.
- State;
-handle_other(ensure_stats, State) ->
- ensure_stats_timer(State);
-handle_other(emit_stats, State) ->
- emit_stats(State);
-handle_other({bump_credit, Msg}, State) ->
- credit_flow:handle_bump_msg(Msg),
- control_throttle(State);
-handle_other(Other, State) ->
- %% internal error -> something worth dying for
- maybe_emit_stats(State),
- exit({unexpected_message, Other}).
-
-switch_callback(State, Callback, Length) ->
- State#v1{callback = Callback, recv_len = Length}.
-
-terminate(Explanation, State) when ?IS_RUNNING(State) ->
- {normal, handle_exception(State, 0,
- rabbit_misc:amqp_error(
- connection_forced, Explanation, [], none))};
-terminate(_Explanation, State) ->
- {force, State}.
-
-control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
- IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse
- credit_flow:blocked()),
- case {CS, IsThrottled} of
- {running, true} -> State#v1{connection_state = blocking};
- {blocking, false} -> State#v1{connection_state = running};
- {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
- State#v1.heartbeater),
- State#v1{connection_state = running};
- {blocked, true} -> State#v1{throttle = update_last_blocked_by(
- Throttle)};
- {_, _} -> State
- end.
-
-maybe_block(State = #v1{connection_state = blocking,
- throttle = Throttle}) ->
- ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- State1 = State#v1{connection_state = blocked,
- throttle = update_last_blocked_by(
- Throttle#throttle{
- last_blocked_at = erlang:now()})},
- case {blocked_by_alarm(State), blocked_by_alarm(State1)} of
- {false, true} -> ok = send_blocked(State1);
- {_, _} -> ok
- end,
- State1;
-maybe_block(State) ->
- State.
-
-
-blocked_by_alarm(#v1{connection_state = blocked,
- throttle = #throttle{alarmed_by = CR}})
- when CR =/= [] ->
- true;
-blocked_by_alarm(#v1{}) ->
- false.
-
-send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
- connection = #connection{protocol = Protocol,
- capabilities = Capabilities},
- sock = Sock}) ->
- case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
- {bool, true} ->
- RStr = string:join([atom_to_list(A) || A <- CR], " & "),
- Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])),
- ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
- Protocol);
- _ ->
- ok
- end.
-
-send_unblocked(#v1{connection = #connection{protocol = Protocol,
- capabilities = Capabilities},
- sock = Sock}) ->
- case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
- {bool, true} ->
- ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol);
- _ ->
- ok
- end.
-
-update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) ->
- Throttle#throttle{last_blocked_by = flow};
-update_last_blocked_by(Throttle) ->
- Throttle#throttle{last_blocked_by = resource}.
-
-%%--------------------------------------------------------------------------
-%% error handling / termination
-
-close_connection(State = #v1{queue_collector = Collector,
- connection = #connection{
- timeout_sec = TimeoutSec}}) ->
- %% The spec says "Exclusive queues may only be accessed by the
- %% current connection, and are deleted when that connection
- %% closes." This does not strictly imply synchrony, but in
- %% practice it seems to be what people assume.
- rabbit_queue_collector:delete_all(Collector),
- %% We terminate the connection after the specified interval, but
- %% no later than ?CLOSING_TIMEOUT seconds.
- erlang:send_after((if TimeoutSec > 0 andalso
- TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec;
- true -> ?CLOSING_TIMEOUT
- end) * 1000, self(), terminate_connection),
- State#v1{connection_state = closed}.
-
-handle_dependent_exit(ChPid, Reason, State) ->
- {Channel, State1} = channel_cleanup(ChPid, State),
- case {Channel, termination_kind(Reason)} of
- {undefined, controlled} -> State1;
- {undefined, uncontrolled} -> exit({abnormal_dependent_exit,
- ChPid, Reason});
- {_, controlled} -> maybe_close(control_throttle(State1));
- {_, uncontrolled} -> State2 = handle_exception(
- State1, Channel, Reason),
- maybe_close(control_throttle(State2))
- end.
-
-terminate_channels(#v1{channel_count = 0} = State) ->
- State;
-terminate_channels(#v1{channel_count = ChannelCount} = State) ->
- lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
- Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount,
- TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
- wait_for_channel_termination(ChannelCount, TimerRef, State).
-
-wait_for_channel_termination(0, TimerRef, State) ->
- case erlang:cancel_timer(TimerRef) of
- false -> receive
- cancel_wait -> State
- end;
- _ -> State
- end;
-wait_for_channel_termination(N, TimerRef,
- State = #v1{connection_state = CS,
- connection = #connection{
- name = ConnName,
- user = User,
- vhost = VHost}}) ->
- receive
- {'DOWN', _MRef, process, ChPid, Reason} ->
- {Channel, State1} = channel_cleanup(ChPid, State),
- case {Channel, termination_kind(Reason)} of
- {undefined, _} ->
- exit({abnormal_dependent_exit, ChPid, Reason});
- {_, controlled} ->
- wait_for_channel_termination(N-1, TimerRef, State1);
- {_, uncontrolled} ->
- log(error, "Error on AMQP connection ~p (~s, vhost: '~s',"
- " user: '~s', state: ~p), channel ~p:"
- "error while terminating:~n~p~n",
- [self(), ConnName, VHost, User#user.username,
- CS, Channel, Reason]),
- wait_for_channel_termination(N-1, TimerRef, State1)
- end;
- cancel_wait ->
- exit(channel_termination_timeout)
- end.
-
-maybe_close(State = #v1{connection_state = closing,
- channel_count = 0,
- connection = #connection{protocol = Protocol},
- sock = Sock}) ->
- NewState = close_connection(State),
- ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
- NewState;
-maybe_close(State) ->
- State.
-
-termination_kind(normal) -> controlled;
-termination_kind(_) -> uncontrolled.
-
-log_hard_error(#v1{connection_state = CS,
- connection = #connection{
- name = ConnName,
- user = User,
- vhost = VHost}}, Channel, Reason) ->
- log(error,
- "Error on AMQP connection ~p (~s, vhost: '~s',"
- " user: '~s', state: ~p), channel ~p:~n~p~n",
- [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]).
-
-handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
- log_hard_error(State, Channel, Reason),
- State;
-handle_exception(State = #v1{connection = #connection{protocol = Protocol},
- connection_state = CS},
- Channel, Reason)
- when ?IS_RUNNING(State) orelse CS =:= closing ->
- log_hard_error(State, Channel, Reason),
- {0, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- State1 = close_connection(terminate_channels(State)),
- ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
- State1;
-handle_exception(State, Channel, Reason) ->
- %% We don't trust the client at this point - force them to wait
- %% for a bit so they can't DOS us with repeated failed logins etc.
- timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({handshake_error, State#v1.connection_state, Channel, Reason}).
-
-%% we've "lost sync" with the client and hence must not accept any
-%% more input
-fatal_frame_error(Error, Type, Channel, Payload, State) ->
- frame_error(Error, Type, Channel, Payload, State),
- %% grace period to allow transmission of error
- timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw(fatal_frame_error).
-
-frame_error(Error, Type, Channel, Payload, State) ->
- {Str, Bin} = payload_snippet(Payload),
- handle_exception(State, Channel,
- rabbit_misc:amqp_error(frame_error,
- "type ~p, ~s octets = ~p: ~p",
- [Type, Str, Bin, Error], none)).
-
-unexpected_frame(Type, Channel, Payload, State) ->
- {Str, Bin} = payload_snippet(Payload),
- handle_exception(State, Channel,
- rabbit_misc:amqp_error(unexpected_frame,
- "type ~p, ~s octets = ~p",
- [Type, Str, Bin], none)).
-
-payload_snippet(Payload) when size(Payload) =< 16 ->
- {"all", Payload};
-payload_snippet(<<Snippet:16/binary, _/binary>>) ->
- {"first 16", Snippet}.
-
-%%--------------------------------------------------------------------------
-
-create_channel(_Channel,
- #v1{channel_count = ChannelCount,
- connection = #connection{channel_max = ChannelMax}})
- when ChannelMax /= 0 andalso ChannelCount >= ChannelMax ->
- {error, rabbit_misc:amqp_error(
- not_allowed, "number of channels opened (~w) has reached the "
- "negotiated channel_max (~w)",
- [ChannelCount, ChannelMax], 'none')};
-create_channel(Channel,
- #v1{sock = Sock,
- queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- channel_count = ChannelCount,
- connection =
- #connection{name = Name,
- protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State) ->
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}.
-
-channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
- case get({ch_pid, ChPid}) of
- undefined -> {undefined, State};
- {Channel, MRef} -> credit_flow:peer_down(ChPid),
- erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- erlang:demonitor(MRef, [flush]),
- {Channel, State#v1{channel_count = ChannelCount - 1}}
- end.
-
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
-
-%%--------------------------------------------------------------------------
-
-handle_frame(Type, 0, Payload,
- State = #v1{connection = #connection{protocol = Protocol}})
- when ?IS_STOPPING(State) ->
- case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- {method, MethodName, FieldsBin} ->
- handle_method0(MethodName, FieldsBin, State);
- _Other -> State
- end;
-handle_frame(Type, 0, Payload,
- State = #v1{connection = #connection{protocol = Protocol}}) ->
- case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> frame_error(unknown_frame, Type, 0, Payload, State);
- heartbeat -> State;
- {method, MethodName, FieldsBin} ->
- handle_method0(MethodName, FieldsBin, State);
- _Other -> unexpected_frame(Type, 0, Payload, State)
- end;
-handle_frame(Type, Channel, Payload,
- State = #v1{connection = #connection{protocol = Protocol}})
- when ?IS_RUNNING(State) ->
- case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> frame_error(unknown_frame, Type, Channel, Payload, State);
- heartbeat -> unexpected_frame(Type, Channel, Payload, State);
- Frame -> process_frame(Frame, Channel, State)
- end;
-handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) ->
- State;
-handle_frame(Type, Channel, Payload, State) ->
- unexpected_frame(Type, Channel, Payload, State).
-
-process_frame(Frame, Channel, State) ->
- ChKey = {channel, Channel},
- case (case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> {ok, Other, State}
- end) of
- {error, Error} ->
- handle_exception(State, Channel, Error);
- {ok, {ChPid, AState}, State1} ->
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State1);
- {ok, Method, NewAState} ->
- rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State1);
- {ok, Method, Content, NewAState} ->
- rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State1));
- {error, Reason} ->
- handle_exception(State1, Channel, Reason)
- end
- end.
-
-post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
- {_, State1} = channel_cleanup(ChPid, State),
- %% This is not strictly necessary, but more obviously
- %% correct. Also note that we do not need to call maybe_close/1
- %% since we cannot possibly be in the 'closing' state.
- control_throttle(State1);
-post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
- maybe_block(State);
-post_process_frame({content_body, _}, _ChPid, State) ->
- maybe_block(State);
-post_process_frame(_Frame, _ChPid, State) ->
- State.
-
-%%--------------------------------------------------------------------------
-
-%% We allow clients to exceed the frame size a little bit since quite
-%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
--define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
-
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>,
- State = #v1{connection = #connection{frame_max = FrameMax}})
- when FrameMax /= 0 andalso
- PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
- fatal_frame_error(
- {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
- Type, Channel, <<>>, State);
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32,
- Payload:PayloadSize/binary, ?FRAME_END,
- Rest/binary>>,
- State) ->
- {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))};
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
- State) ->
- {Rest, ensure_stats_timer(
- switch_callback(State,
- {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1))};
-handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
- <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
- case EndMarker of
- ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
- {Rest, switch_callback(State1, frame_header, 7)};
- _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
- Type, Channel, Payload, State)
- end;
-handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
- {Rest, handshake({A, B, C, D}, State)};
-handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_header, Other});
-handle_input(Callback, Data, _State) ->
- throw({bad_input, Callback, Data}).
-
-%% The two rules pertaining to version negotiation:
-%%
-%% * If the server cannot support the protocol specified in the
-%% protocol header, it MUST respond with a valid protocol header and
-%% then close the socket connection.
-%%
-%% * The server MUST provide a protocol version that is lower than or
-%% equal to that requested by the client in the protocol header.
-handshake({0, 0, 9, 1}, State) ->
- start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
-
-%% This is the protocol header for 0-9, which we can safely treat as
-%% though it were 0-9-1.
-handshake({1, 1, 0, 9}, State) ->
- start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
-
-%% This is what most clients send for 0-8. The 0-8 spec, confusingly,
-%% defines the version as 8-0.
-handshake({1, 1, 8, 0}, State) ->
- start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
-
-%% The 0-8 spec as on the AMQP web site actually has this as the
-%% protocol header; some libraries e.g., py-amqplib, send it when they
-%% want 0-8.
-handshake({1, 1, 9, 1}, State) ->
- start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
-
-%% ... and finally, the 1.0 spec is crystal clear!
-handshake({Id, 1, 0, 0}, State) ->
- become_1_0(Id, State);
-
-handshake(Vsn, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_version, Vsn}).
-
-%% Offer a protocol version to the client. Connection.start only
-%% includes a major and minor version number, Luckily 0-9 and 0-9-1
-%% are similar enough that clients will be happy with either.
-start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
- Protocol,
- State = #v1{sock = Sock, connection = Connection}) ->
- rabbit_networking:register_connection(self()),
- Start = #'connection.start'{
- version_major = ProtocolMajor,
- version_minor = ProtocolMinor,
- server_properties = server_properties(Protocol),
- mechanisms = auth_mechanisms_binary(Sock),
- locales = <<"en_US">> },
- ok = send_on_channel0(Sock, Start, Protocol),
- switch_callback(State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT,
- protocol = Protocol},
- connection_state = starting},
- frame_header, 7).
-
-refuse_connection(Sock, Exception, {A, B, C, D}) ->
- ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end),
- throw(Exception).
-
--ifdef(use_specs).
--spec(refuse_connection/2 :: (rabbit_net:socket(), any()) -> no_return()).
--endif.
-refuse_connection(Sock, Exception) ->
- refuse_connection(Sock, Exception, {0, 0, 9, 1}).
-
-ensure_stats_timer(State = #v1{connection_state = running}) ->
- rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
-ensure_stats_timer(State) ->
- State.
-
-%%--------------------------------------------------------------------------
-
-handle_method0(MethodName, FieldsBin,
- State = #v1{connection = #connection{protocol = Protocol}}) ->
- try
- handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
- State)
- catch throw:{inet_error, E} when E =:= closed; E =:= enotconn ->
- maybe_emit_stats(State),
- throw(connection_closed_abruptly);
- exit:#amqp_error{method = none} = Reason ->
- handle_exception(State, 0, Reason#amqp_error{method = MethodName});
- Type:Reason ->
- Stack = erlang:get_stacktrace(),
- handle_exception(State, 0, {Type, Reason, MethodName, Stack})
- end.
-
-handle_method0(#'connection.start_ok'{mechanism = Mechanism,
- response = Response,
- client_properties = ClientProperties},
- State0 = #v1{connection_state = starting,
- connection = Connection,
- sock = Sock}) ->
- AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
- Capabilities =
- case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
- {table, Capabilities1} -> Capabilities1;
- _ -> []
- end,
- State = State0#v1{connection_state = securing,
- connection =
- Connection#connection{
- client_properties = ClientProperties,
- capabilities = Capabilities,
- auth_mechanism = {Mechanism, AuthMechanism},
- auth_state = AuthMechanism:init(Sock)}},
- auth_phase(Response, State);
-
-handle_method0(#'connection.secure_ok'{response = Response},
- State = #v1{connection_state = securing}) ->
- auth_phase(Response, State);
-
-handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
- channel_max = ChannelMax,
- heartbeat = ClientHeartbeat},
- State = #v1{connection_state = tuning,
- connection = Connection,
- helper_sup = SupPid,
- sock = Sock}) ->
- ok = validate_negotiated_integer_value(
- frame_max, ?FRAME_MIN_SIZE, FrameMax),
- ok = validate_negotiated_integer_value(
- channel_max, ?CHANNEL_MIN, ChannelMax),
- {ok, Collector} = rabbit_connection_helper_sup:start_queue_collector(
- SupPid, Connection#connection.name),
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
- Parent = self(),
- ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater = rabbit_heartbeat:start(
- SupPid, Sock, Connection#connection.name,
- ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- frame_max = FrameMax,
- channel_max = ChannelMax,
- timeout_sec = ClientHeartbeat},
- queue_collector = Collector,
- heartbeater = Heartbeater};
-
-handle_method0(#'connection.open'{virtual_host = VHostPath},
- State = #v1{connection_state = opening,
- connection = Connection = #connection{
- user = User,
- protocol = Protocol},
- helper_sup = SupPid,
- sock = Sock,
- throttle = Throttle}) ->
- ok = rabbit_access_control:check_vhost_access(User, VHostPath, Sock),
- NewConnection = Connection#connection{vhost = VHostPath},
- ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
- Throttle1 = Throttle#throttle{alarmed_by = Conserve},
- {ok, ChannelSupSupPid} =
- rabbit_connection_helper_sup:start_channel_sup_sup(SupPid),
- State1 = control_throttle(
- State#v1{connection_state = running,
- connection = NewConnection,
- channel_sup_sup_pid = ChannelSupSupPid,
- throttle = Throttle1}),
- rabbit_event:notify(connection_created,
- [{type, network} |
- infos(?CREATION_EVENT_KEYS, State1)]),
- maybe_emit_stats(State1),
- State1;
-handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
- lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
- maybe_close(State#v1{connection_state = closing});
-handle_method0(#'connection.close'{},
- State = #v1{connection = #connection{protocol = Protocol},
- sock = Sock})
- when ?IS_STOPPING(State) ->
- %% We're already closed or closing, so we don't need to cleanup
- %% anything.
- ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
- State;
-handle_method0(#'connection.close_ok'{},
- State = #v1{connection_state = closed}) ->
- self() ! terminate_connection,
- State;
-handle_method0(_Method, State) when ?IS_STOPPING(State) ->
- State;
-handle_method0(_Method, #v1{connection_state = S}) ->
- rabbit_misc:protocol_error(
- channel_error, "unexpected method in connection state ~w", [S]).
-
-validate_negotiated_integer_value(Field, Min, ClientValue) ->
- ServerValue = get_env(Field),
- if ClientValue /= 0 andalso ClientValue < Min ->
- fail_negotiation(Field, min, ServerValue, ClientValue);
- ServerValue /= 0 andalso (ClientValue =:= 0 orelse
- ClientValue > ServerValue) ->
- fail_negotiation(Field, max, ServerValue, ClientValue);
- true ->
- ok
- end.
-
-%% keep dialyzer happy
--spec fail_negotiation(atom(), 'min' | 'max', integer(), integer()) ->
- no_return().
-fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
- {S1, S2} = case MinOrMax of
- min -> {lower, minimum};
- max -> {higher, maximum}
- end,
- rabbit_misc:protocol_error(
- not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)",
- [Field, ClientValue, S1, S2, ServerValue], 'connection.tune').
-
-get_env(Key) ->
- {ok, Value} = application:get_env(rabbit, Key),
- Value.
-
-send_on_channel0(Sock, Method, Protocol) ->
- ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
-
-auth_mechanism_to_module(TypeBin, Sock) ->
- case rabbit_registry:binary_to_type(TypeBin) of
- {error, not_found} ->
- rabbit_misc:protocol_error(
- command_invalid, "unknown authentication mechanism '~s'",
- [TypeBin]);
- T ->
- case {lists:member(T, auth_mechanisms(Sock)),
- rabbit_registry:lookup_module(auth_mechanism, T)} of
- {true, {ok, Module}} ->
- Module;
- _ ->
- rabbit_misc:protocol_error(
- command_invalid,
- "invalid authentication mechanism '~s'", [T])
- end
- end.
-
-auth_mechanisms(Sock) ->
- {ok, Configured} = application:get_env(auth_mechanisms),
- [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
- Module:should_offer(Sock), lists:member(Name, Configured)].
-
-auth_mechanisms_binary(Sock) ->
- list_to_binary(
- string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
-
-auth_phase(Response,
- State = #v1{connection = Connection =
- #connection{protocol = Protocol,
- auth_mechanism = {Name, AuthMechanism},
- auth_state = AuthState},
- sock = Sock}) ->
- case AuthMechanism:handle_response(Response, AuthState) of
- {refused, Msg, Args} ->
- auth_fail(Msg, Args, Name, State);
- {protocol_error, Msg, Args} ->
- rabbit_misc:protocol_error(syntax_error, Msg, Args);
- {challenge, Challenge, AuthState1} ->
- Secure = #'connection.secure'{challenge = Challenge},
- ok = send_on_channel0(Sock, Secure, Protocol),
- State#v1{connection = Connection#connection{
- auth_state = AuthState1}};
- {ok, User = #user{username = Username}} ->
- case rabbit_access_control:check_user_loopback(Username, Sock) of
- ok -> ok;
- not_allowed -> auth_fail("user '~s' can only connect via "
- "localhost", [Username], Name, State)
- end,
- Tune = #'connection.tune'{frame_max = get_env(frame_max),
- channel_max = get_env(channel_max),
- heartbeat = get_env(heartbeat)},
- ok = send_on_channel0(Sock, Tune, Protocol),
- State#v1{connection_state = tuning,
- connection = Connection#connection{user = User,
- auth_state = none}}
- end.
-
--ifdef(use_specs).
--spec(auth_fail/4 :: (string(), [any()], binary(), #v1{}) -> no_return()).
--endif.
-auth_fail(Msg, Args, AuthName,
- State = #v1{connection = #connection{protocol = Protocol,
- capabilities = Capabilities}}) ->
- AmqpError = rabbit_misc:amqp_error(
- access_refused, "~s login refused: ~s",
- [AuthName, io_lib:format(Msg, Args)], none),
- case rabbit_misc:table_lookup(Capabilities,
- <<"authentication_failure_close">>) of
- {bool, true} ->
- SafeMsg = io_lib:format(
- "Login was refused using authentication "
- "mechanism ~s. For details see the broker "
- "logfile.", [AuthName]),
- AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg},
- {0, CloseMethod} = rabbit_binary_generator:map_exception(
- 0, AmqpError1, Protocol),
- ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol);
- _ -> ok
- end,
- rabbit_misc:protocol_error(AmqpError).
-
-%%--------------------------------------------------------------------------
-
-infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-
-i(pid, #v1{}) -> self();
-i(SockStat, S) when SockStat =:= recv_oct;
- SockStat =:= recv_cnt;
- SockStat =:= send_oct;
- SockStat =:= send_cnt;
- SockStat =:= send_pend ->
- socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
- fun ([{_, I}]) -> I end, S);
-i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
-i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S);
-i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S);
-i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S);
-i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
-i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
-i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
-i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
-i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount;
-i(state, #v1{connection_state = ConnectionState,
- throttle = #throttle{alarmed_by = Alarms,
- last_blocked_by = WasBlockedBy,
- last_blocked_at = T}}) ->
- case Alarms =:= [] andalso %% not throttled by resource alarms
- (credit_flow:blocked() %% throttled by flow now
- orelse %% throttled by flow recently
- (WasBlockedBy =:= flow andalso T =/= never andalso
- timer:now_diff(erlang:now(), T) < 5000000)) of
- true -> flow;
- false -> ConnectionState
- end;
-i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
-
-ic(name, #connection{name = Name}) -> Name;
-ic(host, #connection{host = Host}) -> Host;
-ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost;
-ic(port, #connection{port = Port}) -> Port;
-ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort;
-ic(protocol, #connection{protocol = none}) -> none;
-ic(protocol, #connection{protocol = P}) -> P:version();
-ic(user, #connection{user = none}) -> '';
-ic(user, #connection{user = U}) -> U#user.username;
-ic(vhost, #connection{vhost = VHost}) -> VHost;
-ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
-ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
-ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
-ic(client_properties, #connection{client_properties = CP}) -> CP;
-ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
-ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
-ic(connected_at, #connection{connected_at = T}) -> T;
-ic(Item, #connection{}) -> throw({bad_argument, Item}).
-
-socket_info(Get, Select, #v1{sock = Sock}) ->
- case Get(Sock) of
- {ok, T} -> Select(T);
- {error, _} -> ''
- end.
-
-ssl_info(F, #v1{sock = Sock}) ->
- %% The first ok form is R14
- %% The second is R13 - the extra term is exportability (by inspection,
- %% the docs are wrong)
- case rabbit_net:ssl_info(Sock) of
- nossl -> '';
- {error, _} -> '';
- {ok, {P, {K, C, H}}} -> F({P, {K, C, H}});
- {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}})
- end.
-
-cert_info(F, #v1{sock = Sock}) ->
- case rabbit_net:peercert(Sock) of
- nossl -> '';
- {error, no_peercert} -> '';
- {ok, Cert} -> list_to_binary(F(Cert))
- end.
-
-maybe_emit_stats(State) ->
- rabbit_event:if_enabled(State, #v1.stats_timer,
- fun() -> emit_stats(State) end).
-
-emit_stats(State) ->
- Infos = infos(?STATISTICS_KEYS, State),
- rabbit_event:notify(connection_stats, Infos),
- State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
- %% If we emit an event which looks like we are in flow control, it's not a
- %% good idea for it to be our last even if we go idle. Keep emitting
- %% events, either we stay busy or we drop out of flow control.
- case proplists:get_value(state, Infos) of
- flow -> ensure_stats_timer(State1);
- _ -> State1
- end.
-
-%% 1.0 stub
--ifdef(use_specs).
--spec(become_1_0/2 :: (non_neg_integer(), #v1{}) -> no_return()).
--endif.
-become_1_0(Id, State = #v1{sock = Sock}) ->
- case code:is_loaded(rabbit_amqp1_0_reader) of
- false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled);
- _ -> Mode = case Id of
- 0 -> amqp;
- 3 -> sasl;
- _ -> refuse_connection(
- Sock, {unsupported_amqp1_0_protocol_id, Id},
- {3, 1, 0, 0})
- end,
- F = fun (_Deb, Buf, BufLen, S) ->
- {rabbit_amqp1_0_reader, init,
- [Mode, pack_for_1_0(Buf, BufLen, S)]}
- end,
- State#v1{connection_state = {become, F}}
- end.
-
-pack_for_1_0(Buf, BufLen, #v1{parent = Parent,
- sock = Sock,
- recv_len = RecvLen,
- pending_recv = PendingRecv,
- helper_sup = SupPid}) ->
- {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}.