diff options
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r-- | src/rabbit_reader.erl | 819 |
1 files changed, 482 insertions, 337 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 6f12e22e..8106a46a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -10,8 +10,8 @@ %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. %% -module(rabbit_reader). @@ -23,7 +23,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/4, mainloop/2]). +-export([init/4, mainloop/2, recvloop/2]). -export([conserve_resources/3, server_properties/1]). @@ -37,21 +37,27 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state, conserve_resources, - last_blocked_by, last_blocked_at}). + ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun, + buf, buf_len, throttle}). + +-record(connection, {name, host, peer_host, port, peer_port, + protocol, user, timeout_sec, frame_max, vhost, + client_properties, capabilities, + auth_mechanism, auth_state}). + +-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at, + blocked_sent}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, channels]). --define(CREATION_EVENT_KEYS, [pid, name, address, port, peer_address, peer_port, - ssl, peer_cert_subject, peer_cert_issuer, - peer_cert_validity, auth_mechanism, - ssl_protocol, ssl_key_exchange, - ssl_cipher, ssl_hash, - protocol, user, vhost, timeout, frame_max, - client_properties]). +-define(CREATION_EVENT_KEYS, + [pid, name, port, peer_port, host, + peer_host, ssl, peer_cert_subject, peer_cert_issuer, + peer_cert_validity, auth_mechanism, ssl_protocol, + ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, + timeout, frame_max, client_properties]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -60,6 +66,10 @@ State#v1.connection_state =:= blocking orelse State#v1.connection_state =:= blocked)). +-define(IS_STOPPING(State), + (State#v1.connection_state =:= closing orelse + State#v1.connection_state =:= closed)). + %%-------------------------------------------------------------------------- -ifdef(use_specs). @@ -94,24 +104,24 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, +start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid, Collector, StartHeartbeatFun])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) -> +init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection( - Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, + Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> - ?MODULE:mainloop(Deb, State#v1{parent = Parent}). + mainloop(Deb, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -133,8 +143,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_resources(Pid, _Source, Conserve) -> - Pid ! {conserve_resources, Conserve}, +conserve_resources(Pid, Source, Conserve) -> + Pid ! {conserve_resources, Source, Conserve}, ok. server_properties(Protocol) -> @@ -169,84 +179,110 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, {<<"basic.nack">>, bool, true}, - {<<"consumer_cancel_notify">>, bool, true}]; + {<<"consumer_cancel_notify">>, bool, true}, + {<<"connection.blocked">>, bool, true}]; server_capabilities(_) -> []. +%%-------------------------------------------------------------------------- + log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). +socket_error(Reason) -> + log(error, "error on AMQP connection ~p: ~p (~s)~n", + [self(), Reason, rabbit_misc:format_inet_error(Reason)]). + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> case Fun(Sock) of {ok, Res} -> Res; - {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n", - [self(), Reason]), + {error, Reason} -> socket_error(Reason), + %% NB: this is tcp socket, even in case of ssl + rabbit_net:fast_close(Sock), exit(normal) end. -name(Sock) -> - socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end). - -start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, +start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - ConnStr = name(Sock), - log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]), + Name = case rabbit_net:connection_string(Sock, inbound) of + {ok, Str} -> Str; + {error, enotconn} -> rabbit_net:fast_close(Sock), + exit(normal); + {error, Reason} -> socket_error(Reason), + rabbit_net:fast_close(Sock), + exit(normal) + end, + log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]), ClientSock = socket_op(Sock, SockTransform), - erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), - handshake_timeout), + erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), + {PeerHost, PeerPort, Host, Port} = + socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), State = #v1{parent = Parent, sock = ClientSock, connection = #connection{ + name = list_to_binary(Name), + host = Host, + peer_host = PeerHost, + port = Port, + peer_port = PeerPort, protocol = none, user = none, timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, vhost = none, client_properties = none, - capabilities = []}, + capabilities = [], + auth_mechanism = none, + auth_state = none}, callback = uninitialized_callback, recv_len = 0, pending_recv = false, connection_state = pre_init, queue_collector = Collector, heartbeater = none, - channel_sup_sup_pid = ChannelSupSupPid, + ch_sup3_pid = ChSup3Pid, + channel_sup_sup_pid = none, start_heartbeat_fun = StartHeartbeatFun, buf = [], buf_len = 0, - auth_mechanism = none, - auth_state = none, - conserve_resources = false, - last_blocked_by = none, - last_blocked_at = never}, + throttle = #throttle{ + alarmed_by = [], + last_blocked_by = none, + last_blocked_at = never, + blocked_sent = false}}, try - ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end), - recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( - State, #v1.stats_timer), - handshake, 8)), - log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr]) + run({?MODULE, recvloop, + [Deb, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)]}), + log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of connection_closed_abruptly -> warning; _ -> error end, "closing AMQP connection ~p (~s):~n~p~n", - [self(), ConnStr, Ex]) + [self(), Name, Ex]) after - %% The reader is the controlling process and hence its - %% termination will close the socket. Furthermore, - %% gen_tcp:close/1 waits for pending output to be sent, which - %% results in unnecessary delays. However, to keep the - %% file_handle_cache accounting as accurate as possible it - %% would be good to close the socket immediately if we - %% can. But we can only do this for non-ssl sockets. - %% - rabbit_net:maybe_fast_close(ClientSock), + %% We don't call gen_tcp:close/1 here since it waits for + %% pending output to be sent, which results in unnecessary + %% delays. We could just terminate - the reader is the + %% controlling process and hence its termination will close + %% the socket. However, to keep the file_handle_cache + %% accounting as accurate as possible we ought to close the + %% socket w/o delay before termination. + rabbit_net:fast_close(ClientSock), + rabbit_networking:unregister_connection(self()), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. +run({M, F, A}) -> + try apply(M, F, A) + catch {become, MFA} -> run(MFA) + end. + recvloop(Deb, State = #v1{pending_recv = true}) -> mainloop(Deb, State); recvloop(Deb, State = #v1{connection_state = blocked}) -> @@ -276,18 +312,32 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> _ -> throw(connection_closed_abruptly) end; {inet_async, _Sock, _Ref, {error, Reason}} -> + maybe_emit_stats(State), throw({inet_error, Reason}); + {system, From, Request} -> + sys:handle_system_msg(Request, From, State#v1.parent, + ?MODULE, Deb, State); Other -> - handle_other(Other, Deb, State) + case handle_other(Other, State) of + stop -> ok; + NewState -> recvloop(Deb, NewState) + end end. -handle_other({conserve_resources, Conserve}, Deb, State) -> - recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve})); -handle_other({channel_closing, ChPid}, Deb, State) -> +handle_other({conserve_resources, Source, Conserve}, + State = #v1{throttle = Throttle = + #throttle{alarmed_by = CR}}) -> + CR1 = case Conserve of + true -> lists:usort([Source | CR]); + false -> CR -- [Source] + end, + Throttle1 = Throttle#throttle{alarmed_by = CR1}, + control_throttle(State#v1{throttle = Throttle1}); +handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), - mainloop(Deb, maybe_close(control_throttle(State))); -handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> + maybe_close(control_throttle(State)); +handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), %% this is what we are expected to do according to @@ -298,94 +348,133 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> %% ordinary error case. However, since this termination is %% initiated by our parent it is probably more important to exit %% quickly. + maybe_emit_stats(State), exit(Reason); -handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}}, - _Deb, _State) -> +handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) -> + maybe_emit_stats(State), throw(E); -handle_other({channel_exit, Channel, Reason}, Deb, State) -> - mainloop(Deb, handle_exception(State, Channel, Reason)); -handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) -> - mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); -handle_other(terminate_connection, _Deb, State) -> +handle_other({channel_exit, Channel, Reason}, State) -> + handle_exception(State, Channel, Reason); +handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) -> + handle_dependent_exit(ChPid, Reason, State); +handle_other(terminate_connection, State) -> + maybe_emit_stats(State), + stop; +handle_other(handshake_timeout, State) + when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) -> State; -handle_other(handshake_timeout, Deb, State) - when ?IS_RUNNING(State) orelse - State#v1.connection_state =:= closing orelse - State#v1.connection_state =:= closed -> - mainloop(Deb, State); -handle_other(handshake_timeout, _Deb, State) -> +handle_other(handshake_timeout, State) -> + maybe_emit_stats(State), throw({handshake_timeout, State#v1.callback}); -handle_other(timeout, Deb, State = #v1{connection_state = closed}) -> - mainloop(Deb, State); -handle_other(timeout, _Deb, #v1{connection_state = S}) -> - throw({timeout, S}); -handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> +handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> + State; +handle_other(heartbeat_timeout, State = #v1{connection_state = S}) -> + maybe_emit_stats(State), + throw({heartbeat_timeout, S}); +handle_other({'$gen_call', From, {shutdown, Explanation}}, State) -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), case ForceTermination of - force -> ok; - normal -> mainloop(Deb, NewState) + force -> stop; + normal -> NewState end; -handle_other({'$gen_call', From, info}, Deb, State) -> +handle_other({'$gen_call', From, info}, State) -> gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Deb, State); -handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> + State; +handle_other({'$gen_call', From, {info, Items}}, State) -> gen_server:reply(From, try {ok, infos(Items, State)} catch Error -> {error, Error} end), - mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) + State; +handle_other({'$gen_cast', force_event_refresh}, State) when ?IS_RUNNING(State) -> rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), - mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> + State; +handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. - mainloop(Deb, State); -handle_other(emit_stats, Deb, State) -> - mainloop(Deb, emit_stats(State)); -handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); -handle_other({bump_credit, Msg}, Deb, State) -> + State; +handle_other(ensure_stats, State) -> + ensure_stats_timer(State); +handle_other(emit_stats, State) -> + emit_stats(State); +handle_other({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), - recvloop(Deb, control_throttle(State)); -handle_other(Other, _Deb, _State) -> + control_throttle(State); +handle_other(Other, State) -> %% internal error -> something worth dying for + maybe_emit_stats(State), exit({unexpected_message, Other}). switch_callback(State, Callback, Length) -> State#v1{callback = Callback, recv_len = Length}. terminate(Explanation, State) when ?IS_RUNNING(State) -> - {normal, send_exception(State, 0, - rabbit_misc:amqp_error( - connection_forced, Explanation, [], none))}; + {normal, handle_exception(State, 0, + rabbit_misc:amqp_error( + connection_forced, Explanation, [], none))}; terminate(_Explanation, State) -> {force, State}. -control_throttle(State = #v1{connection_state = CS, - conserve_resources = Mem}) -> - case {CS, Mem orelse credit_flow:blocked()} of +control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> + IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse + credit_flow:blocked()), + case {CS, IsThrottled} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), - State#v1{connection_state = running}; - {blocked, true} -> update_last_blocked_by(State); + maybe_send_unblocked(State), + State#v1{connection_state = running, + throttle = Throttle#throttle{ + blocked_sent = false}}; + {blocked, true} -> State#v1{throttle = update_last_blocked_by( + Throttle)}; {_, _} -> State end. -maybe_block(State = #v1{connection_state = blocking}) -> +maybe_block(State = #v1{connection_state = blocking, + throttle = Throttle}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), - update_last_blocked_by(State#v1{connection_state = blocked, - last_blocked_at = erlang:now()}); + Sent = maybe_send_blocked(State), + State#v1{connection_state = blocked, + throttle = update_last_blocked_by( + Throttle#throttle{last_blocked_at = erlang:now(), + blocked_sent = Sent})}; maybe_block(State) -> State. -update_last_blocked_by(State = #v1{conserve_resources = true}) -> - State#v1{last_blocked_by = resource}; -update_last_blocked_by(State = #v1{conserve_resources = false}) -> - State#v1{last_blocked_by = flow}. +maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) -> + false; +maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, + connection = #connection{ + protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> + case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of + {bool, true} -> + RStr = string:join([atom_to_list(A) || A <- CR], " & "), + Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])), + ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, + Protocol), + true; + _ -> + false + end. + +maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) -> + ok; +maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol}, + sock = Sock}) -> + ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol). + +update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) -> + Throttle#throttle{last_blocked_by = flow}; +update_last_blocked_by(Throttle) -> + Throttle#throttle{last_blocked_by = resource}. + +%%-------------------------------------------------------------------------- +%% error handling / termination close_connection(State = #v1{queue_collector = Collector, connection = #connection{ @@ -405,29 +494,15 @@ close_connection(State = #v1{queue_collector = Collector, handle_dependent_exit(ChPid, Reason, State) -> case {channel_cleanup(ChPid), termination_kind(Reason)} of - {undefined, uncontrolled} -> - exit({abnormal_dependent_exit, ChPid, Reason}); - {_Channel, controlled} -> - maybe_close(control_throttle(State)); - {Channel, uncontrolled} -> - log(error, "AMQP connection ~p, channel ~p - error:~n~p~n", - [self(), Channel, Reason]), - maybe_close(handle_exception(control_throttle(State), - Channel, Reason)) - end. - -channel_cleanup(ChPid) -> - case get({ch_pid, ChPid}) of - undefined -> undefined; - {Channel, MRef} -> credit_flow:peer_down(ChPid), - erase({channel, Channel}), - erase({ch_pid, ChPid}), - erlang:demonitor(MRef, [flush]), - Channel + {undefined, controlled} -> State; + {undefined, uncontrolled} -> exit({abnormal_dependent_exit, + ChPid, Reason}); + {_Channel, controlled} -> maybe_close(control_throttle(State)); + {Channel, uncontrolled} -> State1 = handle_exception( + State, Channel, Reason), + maybe_close(control_throttle(State1)) end. -all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. - terminate_channels() -> NChannels = length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), @@ -481,78 +556,179 @@ maybe_close(State) -> termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. +handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> + log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", + [self(), closed, Channel, Reason]), + State; +handle_exception(State = #v1{connection = #connection{protocol = Protocol}, + connection_state = CS}, + Channel, Reason) + when ?IS_RUNNING(State) orelse CS =:= closing -> + log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", + [self(), CS, Channel, Reason]), + {0, CloseMethod} = + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + terminate_channels(), + State1 = close_connection(State), + ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), + State1; +handle_exception(State, Channel, Reason) -> + %% We don't trust the client at this point - force them to wait + %% for a bit so they can't DOS us with repeated failed logins etc. + timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({handshake_error, State#v1.connection_state, Channel, Reason}). + +%% we've "lost sync" with the client and hence must not accept any +%% more input +fatal_frame_error(Error, Type, Channel, Payload, State) -> + frame_error(Error, Type, Channel, Payload, State), + %% grace period to allow transmission of error + timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw(fatal_frame_error). + +frame_error(Error, Type, Channel, Payload, State) -> + {Str, Bin} = payload_snippet(Payload), + handle_exception(State, Channel, + rabbit_misc:amqp_error(frame_error, + "type ~p, ~s octets = ~p: ~p", + [Type, Str, Bin, Error], none)). + +unexpected_frame(Type, Channel, Payload, State) -> + {Str, Bin} = payload_snippet(Payload), + handle_exception(State, Channel, + rabbit_misc:amqp_error(unexpected_frame, + "type ~p, ~s octets = ~p", + [Type, Str, Bin], none)). + +payload_snippet(Payload) when size(Payload) =< 16 -> + {"all", Payload}; +payload_snippet(<<Snippet:16/binary, _/binary>>) -> + {"first 16", Snippet}. + +%%-------------------------------------------------------------------------- + +create_channel(Channel, State) -> + #v1{sock = Sock, queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + connection = #connection{name = Name, + protocol = Protocol, + frame_max = FrameMax, + user = User, + vhost = VHost, + capabilities = Capabilities}} = State, + {ok, _ChSupPid, {ChPid, AState}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, + Protocol, User, VHost, Capabilities, Collector}), + MRef = erlang:monitor(process, ChPid), + put({ch_pid, ChPid}, {Channel, MRef}), + put({channel, Channel}, {ChPid, AState}), + {ChPid, AState}. + +channel_cleanup(ChPid) -> + case get({ch_pid, ChPid}) of + undefined -> undefined; + {Channel, MRef} -> credit_flow:peer_down(ChPid), + erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + Channel + end. + +all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. + +%%-------------------------------------------------------------------------- + handle_frame(Type, 0, Payload, - State = #v1{connection_state = CS, - connection = #connection{protocol = Protocol}}) - when CS =:= closing; CS =:= closed -> + State = #v1{connection = #connection{protocol = Protocol}}) + when ?IS_STOPPING(State) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State end; -handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) - when CS =:= closing; CS =:= closed -> - State; handle_frame(Type, 0, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of - error -> throw({unknown_frame, 0, Type, Payload}); + error -> frame_error(unknown_frame, Type, 0, Payload, State); heartbeat -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); - Other -> throw({unexpected_frame_on_channel0, Other}) + _Other -> unexpected_frame(Type, 0, Payload, State) end; handle_frame(Type, Channel, Payload, - State = #v1{connection = #connection{protocol = Protocol}}) -> + State = #v1{connection = #connection{protocol = Protocol}}) + when ?IS_RUNNING(State) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of - error -> throw({unknown_frame, Channel, Type, Payload}); - heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State) - end. + error -> frame_error(unknown_frame, Type, Channel, Payload, State); + heartbeat -> unexpected_frame(Type, Channel, Payload, State); + Frame -> process_frame(Frame, Channel, State) + end; +handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) -> + State; +handle_frame(Type, Channel, Payload, State) -> + unexpected_frame(Type, Channel, Payload, State). process_frame(Frame, Channel, State) -> - case get({channel, Channel}) of - {ChPid, AState} -> - case process_channel_frame(Frame, ChPid, AState) of - {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {error, Reason} -> handle_exception(State, Channel, Reason) - end; - undefined when ?IS_RUNNING(State) -> - ok = create_channel(Channel, State), - process_frame(Frame, Channel, State); - undefined -> - throw({channel_frame_while_starting, - Channel, State#v1.connection_state, Frame}) + ChKey = {channel, Channel}, + {ChPid, AState} = case get(ChKey) of + undefined -> create_channel(Channel, State); + Other -> Other + end, + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, NewAState} -> + rabbit_channel:do(ChPid, Method), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, Content, NewAState} -> + rabbit_channel:do_flow(ChPid, Method, Content), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, control_throttle(State)); + {error, Reason} -> + handle_exception(State, Channel, Reason) end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), + %% This is not strictly necessary, but more obviously + %% correct. Also note that we do not need to call maybe_close/1 + %% since we cannot possibly be in the 'closing' state. control_throttle(State); -post_process_frame({method, MethodName, _}, _ChPid, - State = #v1{connection = #connection{ - protocol = Protocol}}) -> - case Protocol:method_has_content(MethodName) of - true -> erlang:bump_reductions(2000), - maybe_block(control_throttle(State)); - false -> control_throttle(State) - end; +post_process_frame({content_header, _, _, _, _}, _ChPid, State) -> + maybe_block(State); +post_process_frame({content_body, _}, _ChPid, State) -> + maybe_block(State); post_process_frame(_Frame, _ChPid, State) -> - control_throttle(State). + State. + +%%-------------------------------------------------------------------------- +%% We allow clients to exceed the frame size a little bit since quite +%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical. +-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE). + +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, + State = #v1{connection = #connection{frame_max = FrameMax}}) + when FrameMax /= 0 andalso + PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE -> + fatal_frame_error( + {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, + Type, Channel, <<>>, State); handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1)); -handle_input({frame_payload, Type, Channel, PayloadSize}, - PayloadAndMarker, State) -> - case PayloadAndMarker of - <<Payload:PayloadSize/binary, ?FRAME_END>> -> - switch_callback(handle_frame(Type, Channel, Payload, State), - frame_header, 7); - _ -> - throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) +handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> + <<Payload:PayloadSize/binary, EndMarker>> = Data, + case EndMarker of + ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), + switch_callback(State1, frame_header, 7); + _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker}, + Type, Channel, Payload, State) end; %% The two rules pertaining to version negotiation: @@ -582,8 +758,12 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); +%% ... and finally, the 1.0 spec is crystal clear! Note that the +handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) -> + become_1_0(Id, State); + handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> - refuse_connection(Sock, {bad_version, A, B, C, D}); + refuse_connection(Sock, {bad_version, {A, B, C, D}}); handle_input(handshake, Other, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_header, Other}); @@ -597,6 +777,7 @@ handle_input(Callback, Data, _State) -> start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Protocol, State = #v1{sock = Sock, connection = Connection}) -> + rabbit_networking:register_connection(self()), Start = #'connection.start'{ version_major = ProtocolMajor, version_minor = ProtocolMinor, @@ -610,10 +791,16 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, connection_state = starting}, frame_header, 7). -refuse_connection(Sock, Exception) -> - ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), +refuse_connection(Sock, Exception, {A, B, C, D}) -> + ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end), throw(Exception). +-ifdef(use_specs). +-spec(refuse_connection/2 :: (rabbit_net:socket(), any()) -> no_return()). +-endif. +refuse_connection(Sock, Exception) -> + refuse_connection(Sock, Exception, {0, 0, 9, 1}). + ensure_stats_timer(State = #v1{connection_state = running}) -> rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats); ensure_stats_timer(State) -> @@ -623,24 +810,14 @@ ensure_stats_timer(State) -> handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> - HandleException = - fun(R) -> - case ?IS_RUNNING(State) of - true -> send_exception(State, 0, R); - %% We don't trust the client at this point - force - %% them to wait for a bit so they can't DOS us with - %% repeated failed logins etc. - false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, R}) - end - end, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) catch exit:#amqp_error{method = none} = Reason -> - HandleException(Reason#amqp_error{method = MethodName}); + handle_exception(State, 0, Reason#amqp_error{method = MethodName}); Type:Reason -> - HandleException({Type, Reason, MethodName, erlang:get_stacktrace()}) + Stack = erlang:get_stacktrace(), + handle_exception(State, 0, {Type, Reason, MethodName, Stack}) end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, @@ -655,13 +832,13 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, {table, Capabilities1} -> Capabilities1; _ -> [] end, - State = State0#v1{auth_mechanism = AuthMechanism, - auth_state = AuthMechanism:init(Sock), - connection_state = securing, + State = State0#v1{connection_state = securing, connection = Connection#connection{ client_properties = ClientProperties, - capabilities = Capabilities}}, + capabilities = Capabilities, + auth_mechanism = {Mechanism, AuthMechanism}, + auth_state = AuthMechanism:init(Sock)}}, auth_phase(Response, State); handle_method0(#'connection.secure_ok'{response = Response}, @@ -687,7 +864,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), - ReceiveFun = fun() -> Parent ! timeout end, + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, @@ -699,32 +876,39 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, - connection = Connection = #connection{ - user = User, - protocol = Protocol}, - sock = Sock}) -> + connection = Connection = #connection{ + user = User, + protocol = Protocol}, + ch_sup3_pid = ChSup3Pid, + sock = Sock, + throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + Throttle1 = Throttle#throttle{alarmed_by = Conserve}, + {ok, ChannelSupSupPid} = + supervisor2:start_child( + ChSup3Pid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), State1 = control_throttle( - State#v1{connection_state = running, - connection = NewConnection, - conserve_resources = Conserve}), + State#v1{connection_state = running, + connection = NewConnection, + channel_sup_sup_pid = ChannelSupSupPid, + throttle = Throttle1}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), - rabbit_event:if_enabled(State1, #v1.stats_timer, - fun() -> emit_stats(State1) end), + maybe_emit_stats(State1), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, - State = #v1{connection_state = CS, - connection = #connection{protocol = Protocol}, + State = #v1{connection = #connection{protocol = Protocol}, sock = Sock}) - when CS =:= closing; CS =:= closed -> + when ?IS_STOPPING(State) -> %% We're already closed or closing, so we don't need to cleanup %% anything. ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), @@ -733,19 +917,20 @@ handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> self() ! terminate_connection, State; -handle_method0(_Method, State = #v1{connection_state = CS}) - when CS =:= closing; CS =:= closed -> +handle_method0(_Method, State) when ?IS_STOPPING(State) -> State; handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -%% Compute frame_max for this instance. Could simply use 0, but breaks -%% QPid Java client. server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. +server_heartbeat() -> + {ok, Heartbeat} = application:get_env(rabbit, heartbeat), + Heartbeat. + send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). @@ -777,115 +962,86 @@ auth_mechanisms_binary(Sock) -> string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")). auth_phase(Response, - State = #v1{auth_mechanism = AuthMechanism, - auth_state = AuthState, - connection = Connection = - #connection{protocol = Protocol}, + State = #v1{connection = Connection = + #connection{protocol = Protocol, + auth_mechanism = {Name, AuthMechanism}, + auth_state = AuthState}, sock = Sock}) -> case AuthMechanism:handle_response(Response, AuthState) of {refused, Msg, Args} -> rabbit_misc:protocol_error( access_refused, "~s login refused: ~s", - [proplists:get_value(name, AuthMechanism:description()), - io_lib:format(Msg, Args)]); + [Name, io_lib:format(Msg, Args)]); {protocol_error, Msg, Args} -> rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> Secure = #'connection.secure'{challenge = Challenge}, ok = send_on_channel0(Sock, Secure, Protocol), - State#v1{auth_state = AuthState1}; + State#v1{connection = Connection#connection{ + auth_state = AuthState1}}; {ok, User} -> Tune = #'connection.tune'{channel_max = 0, frame_max = server_frame_max(), - heartbeat = 0}, + heartbeat = server_heartbeat()}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, - connection = Connection#connection{user = User}} + connection = Connection#connection{user = User, + auth_state = none}} end. %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(pid, #v1{}) -> - self(); -i(name, #v1{sock = Sock}) -> - list_to_binary(name(Sock)); -i(address, #v1{sock = Sock}) -> - socket_info(fun rabbit_net:sockname/1, fun ({A, _}) -> A end, Sock); -i(port, #v1{sock = Sock}) -> - socket_info(fun rabbit_net:sockname/1, fun ({_, P}) -> P end, Sock); -i(peer_address, #v1{sock = Sock}) -> - socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock); -i(peer_port, #v1{sock = Sock}) -> - socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock); -i(ssl, #v1{sock = Sock}) -> - rabbit_net:is_ssl(Sock); -i(ssl_protocol, #v1{sock = Sock}) -> - ssl_info(fun ({P, _}) -> P end, Sock); -i(ssl_key_exchange, #v1{sock = Sock}) -> - ssl_info(fun ({_, {K, _, _}}) -> K end, Sock); -i(ssl_cipher, #v1{sock = Sock}) -> - ssl_info(fun ({_, {_, C, _}}) -> C end, Sock); -i(ssl_hash, #v1{sock = Sock}) -> - ssl_info(fun ({_, {_, _, H}}) -> H end, Sock); -i(peer_cert_issuer, #v1{sock = Sock}) -> - cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock); -i(peer_cert_subject, #v1{sock = Sock}) -> - cert_info(fun rabbit_ssl:peer_cert_subject/1, Sock); -i(peer_cert_validity, #v1{sock = Sock}) -> - cert_info(fun rabbit_ssl:peer_cert_validity/1, Sock); -i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; - SockStat =:= recv_cnt; - SockStat =:= send_oct; - SockStat =:= send_cnt; - SockStat =:= send_pend -> - socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end, - fun ([{_, I}]) -> I end); -i(state, #v1{connection_state = S}) -> - S; -i(last_blocked_by, #v1{last_blocked_by = By}) -> - By; -i(last_blocked_age, #v1{last_blocked_at = never}) -> +i(pid, #v1{}) -> self(); +i(SockStat, S) when SockStat =:= recv_oct; + SockStat =:= recv_cnt; + SockStat =:= send_oct; + SockStat =:= send_cnt; + SockStat =:= send_pend -> + socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end, + fun ([{_, I}]) -> I end, S); +i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock); +i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S); +i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S); +i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S); +i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S); +i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S); +i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); +i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); +i(state, #v1{connection_state = CS}) -> CS; +i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By; +i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) -> infinity; -i(last_blocked_age, #v1{last_blocked_at = T}) -> +i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) -> timer:now_diff(erlang:now(), T) / 1000000; -i(channels, #v1{}) -> - length(all_channels()); -i(protocol, #v1{connection = #connection{protocol = none}}) -> - none; -i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> - Protocol:version(); -i(auth_mechanism, #v1{auth_mechanism = none}) -> - none; -i(auth_mechanism, #v1{auth_mechanism = Mechanism}) -> - proplists:get_value(name, Mechanism:description()); -i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> - Username; -i(user, #v1{connection = #connection{user = none}}) -> - ''; -i(vhost, #v1{connection = #connection{vhost = VHost}}) -> - VHost; -i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> - Timeout; -i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> - FrameMax; -i(client_properties, #v1{connection = #connection{ - client_properties = ClientProperties}}) -> - ClientProperties; -i(Item, #v1{}) -> - throw({bad_argument, Item}). - -socket_info(Get, Select, Sock) -> - socket_info(fun() -> Get(Sock) end, Select). - -socket_info(Get, Select) -> - case Get() of +i(channels, #v1{}) -> length(all_channels()); +i(Item, #v1{connection = Conn}) -> ic(Item, Conn). + +ic(name, #connection{name = Name}) -> Name; +ic(host, #connection{host = Host}) -> Host; +ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost; +ic(port, #connection{port = Port}) -> Port; +ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort; +ic(protocol, #connection{protocol = none}) -> none; +ic(protocol, #connection{protocol = P}) -> P:version(); +ic(user, #connection{user = none}) -> ''; +ic(user, #connection{user = U}) -> U#user.username; +ic(vhost, #connection{vhost = VHost}) -> VHost; +ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; +ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; +ic(client_properties, #connection{client_properties = CP}) -> CP; +ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; +ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; +ic(Item, #connection{}) -> throw({bad_argument, Item}). + +socket_info(Get, Select, #v1{sock = Sock}) -> + case Get(Sock) of {ok, T} -> Select(T); {error, _} -> '' end. -ssl_info(F, Sock) -> +ssl_info(F, #v1{sock = Sock}) -> %% The first ok form is R14 %% The second is R13 - the extra term is exportability (by inspection, %% the docs are wrong) @@ -896,58 +1052,47 @@ ssl_info(F, Sock) -> {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}}) end. -cert_info(F, Sock) -> +cert_info(F, #v1{sock = Sock}) -> case rabbit_net:peercert(Sock) of nossl -> ''; {error, no_peercert} -> ''; {ok, Cert} -> list_to_binary(F(Cert)) end. -%%-------------------------------------------------------------------------- - -create_channel(Channel, State) -> - #v1{sock = Sock, queue_collector = Collector, - channel_sup_sup_pid = ChanSupSup, - connection = #connection{protocol = Protocol, - frame_max = FrameMax, - user = User, - vhost = VHost, - capabilities = Capabilities}} = State, - {ok, _ChSupPid, {ChPid, AState}} = - rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock), - Protocol, User, VHost, Capabilities, Collector}), - MRef = erlang:monitor(process, ChPid), - put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - ok. - -process_channel_frame(Frame, ChPid, AState) -> - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> {ok, NewAState}; - {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), - {ok, NewAState}; - {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( - ChPid, Method, Content), - {ok, NewAState}; - {error, Reason} -> {error, Reason} - end. - -handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> - State; -handle_exception(State, Channel, Reason) -> - send_exception(State, Channel, Reason). - -send_exception(State = #v1{connection = #connection{protocol = Protocol}}, - Channel, Reason) -> - {0, CloseMethod} = - rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - terminate_channels(), - State1 = close_connection(State), - ok = rabbit_writer:internal_send_command( - State1#v1.sock, 0, CloseMethod, Protocol), - State1. +maybe_emit_stats(State) -> + rabbit_event:if_enabled(State, #v1.stats_timer, + fun() -> emit_stats(State) end). emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). + +%% 1.0 stub +-ifdef(use_specs). +-spec(become_1_0/2 :: (non_neg_integer(), #v1{}) -> no_return()). +-endif. +become_1_0(Id, State = #v1{sock = Sock}) -> + case code:is_loaded(rabbit_amqp1_0_reader) of + false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled); + _ -> Mode = case Id of + 0 -> amqp; + 3 -> sasl; + _ -> refuse_connection( + Sock, {unsupported_amqp1_0_protocol_id, Id}, + {3, 1, 0, 0}) + end, + throw({become, {rabbit_amqp1_0_reader, init, + [Mode, pack_for_1_0(State)]}}) + end. + +pack_for_1_0(#v1{parent = Parent, + sock = Sock, + recv_len = RecvLen, + pending_recv = PendingRecv, + queue_collector = QueueCollector, + ch_sup3_pid = ChSup3Pid, + start_heartbeat_fun = SHF, + buf = Buf, + buf_len = BufLen}) -> + {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF, + Buf, BufLen}. |