diff options
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r-- | src/rabbit_reader.erl | 64 |
1 files changed, 40 insertions, 24 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e6e94e28..2ac24f97 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -27,7 +27,6 @@ -export([conserve_resources/3, server_properties/1]). --define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). @@ -43,7 +42,7 @@ -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, connected_at}). -record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}). @@ -55,7 +54,7 @@ peer_host, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, - timeout, frame_max, channel_max, client_properties]). + timeout, frame_max, channel_max, client_properties, connected_at]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -189,10 +188,10 @@ server_capabilities(_) -> log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). socket_error(Reason) when is_atom(Reason) -> - log(error, "error on AMQP connection ~p: ~s~n", + log(error, "Error on AMQP connection ~p: ~s~n", [self(), rabbit_misc:format_inet_error(Reason)]); socket_error(Reason) -> - log(error, "error on AMQP connection ~p:~n~p~n", [self(), Reason]). + log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -216,8 +215,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> exit(normal) end, log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]), + {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), ClientSock = socket_op(Sock, SockTransform), - erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), + erlang:send_after(HandshakeTimeout, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), ?store_proc_name(list_to_binary(Name)), @@ -231,13 +231,14 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> peer_port = PeerPort, protocol = none, user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, + timeout_sec = (HandshakeTimeout / 1000), frame_max = ?FRAME_MIN_SIZE, vhost = none, client_properties = none, capabilities = [], auth_mechanism = none, - auth_state = none}, + auth_state = none, + connected_at = rabbit_misc:now_to_ms(os:timestamp())}, callback = uninitialized_callback, recv_len = 0, pending_recv = false, @@ -410,7 +411,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) rabbit_event:notify( connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), - State; + rabbit_event:init_stats_timer(State, #v1.stats_timer); handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. State; @@ -548,21 +549,27 @@ wait_for_channel_termination(0, TimerRef, State) -> end; _ -> State end; -wait_for_channel_termination(N, TimerRef, State) -> +wait_for_channel_termination(N, TimerRef, + State = #v1{connection_state = CS, + connection = #connection{ + name = ConnName, + user = User, + vhost = VHost}}) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> {Channel, State1} = channel_cleanup(ChPid, State), case {Channel, termination_kind(Reason)} of - {undefined, _} -> exit({abnormal_dependent_exit, - ChPid, Reason}); - {_, controlled} -> wait_for_channel_termination( - N-1, TimerRef, State1); - {_, uncontrolled} -> log(error, - "AMQP connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]), - wait_for_channel_termination( - N-1, TimerRef, State1) + {undefined, _} -> + exit({abnormal_dependent_exit, ChPid, Reason}); + {_, controlled} -> + wait_for_channel_termination(N-1, TimerRef, State1); + {_, uncontrolled} -> + log(error, "Error on AMQP connection ~p (~s, vhost: '~s'," + " user: '~s', state: ~p), channel ~p:" + "error while terminating:~n~p~n", + [self(), ConnName, VHost, User#user.username, + CS, Channel, Reason]), + wait_for_channel_termination(N-1, TimerRef, State1) end; cancel_wait -> exit(channel_termination_timeout) @@ -581,16 +588,24 @@ maybe_close(State) -> termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. +log_hard_error(#v1{connection_state = CS, + connection = #connection{ + name = ConnName, + user = User, + vhost = VHost}}, Channel, Reason) -> + log(error, + "Error on AMQP connection ~p (~s, vhost: '~s'," + " user: '~s', state: ~p), channel ~p:~n~p~n", + [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]). + handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> - log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", - [self(), closed, Channel, Reason]), + log_hard_error(State, Channel, Reason), State; handle_exception(State = #v1{connection = #connection{protocol = Protocol}, connection_state = CS}, Channel, Reason) when ?IS_RUNNING(State) orelse CS =:= closing -> - log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", - [self(), CS, Channel, Reason]), + log_hard_error(State, Channel, Reason), {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), State1 = close_connection(terminate_channels(State)), @@ -1130,6 +1145,7 @@ ic(channel_max, #connection{channel_max = ChMax}) -> ChMax; ic(client_properties, #connection{client_properties = CP}) -> CP; ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; +ic(connected_at, #connection{connected_at = T}) -> T; ic(Item, #connection{}) -> throw({bad_argument, Item}). socket_info(Get, Select, #v1{sock = Sock}) -> |