path: root/src/rabbit_reader.erl
diff options
Diffstat (limited to 'src/rabbit_reader.erl')
1 files changed, 361 insertions, 266 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f2a903dc..252f81a3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,18 +33,19 @@
--export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
+-export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/1, mainloop/3]).
+-export([init/4, mainloop/2]).
+-export([conserve_memory/2, server_properties/0]).
@@ -57,13 +58,18 @@
--record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector}).
+-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
+ connection_state, queue_collector, heartbeater, stats_timer,
+ channel_sup_sup_pid, start_heartbeat_fun}).
+-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
+ send_pend, state, channels]).
+-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port,
+ protocol, user, vhost, timeout, frame_max,
+ client_properties]).
- [pid, address, port, peer_address, peer_port,
- recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
- state, channels, user, vhost, timeout, frame_max, client_properties]).
%% connection lifecycle
@@ -101,6 +107,17 @@
%% -> log error, mark channel as closing, *running*
%% handshake_timeout -> ignore, *running*
%% heartbeat timeout -> *throw*
+%% conserve_memory=true -> *blocking*
+%% blocking:
+%% conserve_memory=true -> *blocking*
+%% conserve_memory=false -> *running*
+%% receive a method frame for a content-bearing method
+%% -> process, stop receiving, *blocked*
+%% same as 'running'
+%% blocked:
+%% conserve_memory=true -> *blocked*
+%% conserve_memory=false -> resume receiving, *running*
+%% same as 'running'
%% closing:
%% socket close -> *terminate*
%% receive connection.close -> send connection.close_ok,
@@ -134,35 +151,60 @@
%% TODO: refactor the code so that the above is obvious
+ (State#v1.connection_state =:= running orelse
+ State#v1.connection_state =:= blocking orelse
+ State#v1.connection_state =:= blocked)).
--spec(info_keys/0 :: () -> [info_key()]).
--spec(info/1 :: (pid()) -> [info()]).
--spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-type(start_heartbeat_fun() ::
+ fun ((rabbit_networking:socket(), non_neg_integer()) ->
+ rabbit_heartbeat:heartbeaters())).
+-spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) ->
+ rabbit_types:ok(pid())).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
+-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
+-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(server_properties/0 :: () -> amqp_table()).
+-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+%% These specs only exists to add no_return() to keep dialyzer happy
+-spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()).
+-spec(start_connection/7 ::
+ (pid(), pid(), pid(), start_heartbeat_fun(), any(),
+ rabbit_networking:socket(),
+ fun ((rabbit_networking:socket()) ->
+ rabbit_types:ok_or_error2(
+ rabbit_networking:socket(), any()))) -> no_return()).
-start_link() ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid,
+ Collector, StartHeartbeatFun])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent) ->
+init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
{go, Sock, SockTransform} ->
- start_connection(Parent, Deb, Sock, SockTransform)
+ start_connection(
+ Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ SockTransform)
system_continue(Parent, Deb, State) ->
- ?MODULE:mainloop(Parent, Deb, State).
+ ?MODULE:mainloop(Deb, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
@@ -181,32 +223,12 @@ info(Pid, Items) ->
{error, Error} -> throw(Error)
-setup_profiling() ->
- Value = rabbit_misc:get_config(profiling_enabled, false),
- case Value of
- once ->
- rabbit_log:info("Enabling profiling for this connection, "
- "and disabling for subsequent.~n"),
- rabbit_misc:set_config(profiling_enabled, false),
- fprof:trace(start);
- true ->
- rabbit_log:info("Enabling profiling for this connection.~n"),
- fprof:trace(start);
- false ->
- ok
- end,
- Value.
+emit_stats(Pid) ->
+ gen_server:cast(Pid, emit_stats).
-teardown_profiling(Value) ->
- case Value of
- false ->
- ok;
- _ ->
- rabbit_log:info("Completing profiling for this connection.~n"),
- fprof:trace(stop),
- fprof:profile(),
- fprof:analyse([{dest, []}, {cols, 100}])
- end.
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
+ ok.
server_properties() ->
{ok, Product} = application:get_key(rabbit, id),
@@ -230,7 +252,8 @@ socket_op(Sock, Fun) ->
-start_connection(Parent, Deb, Sock, SockTransform) ->
+start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+ Sock, SockTransform) ->
process_flag(trap_exit, true),
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
PeerAddressS = inet_parse:ntoa(PeerAddress),
@@ -239,22 +262,29 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
- ProfilingValue = setup_profiling(),
- {ok, Collector} = rabbit_reader_queue_collector:start_link(),
- mainloop(Parent, Deb, switch_callback(
- #v1{sock = ClientSock,
- connection = #connection{
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
- client_properties = none},
- callback = uninitialized_callback,
- recv_ref = none,
- connection_state = pre_init,
- queue_collector = Collector},
- handshake, 8))
+ mainloop(Deb, switch_callback(
+ #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ protocol = none,
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
+ client_properties = none},
+ callback = uninitialized_callback,
+ recv_length = 0,
+ recv_ref = none,
+ connection_state = pre_init,
+ queue_collector = Collector,
+ heartbeater = none,
+ stats_timer =
+ rabbit_event:init_stats_timer(),
+ channel_sup_sup_pid = ChannelSupSupPid,
+ start_heartbeat_fun = StartHeartbeatFun
+ },
+ handshake, 8))
Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
@@ -271,21 +301,18 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% output to be sent, which results in unnecessary delays.
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue),
- rabbit_reader_queue_collector:shutdown(Collector),
- rabbit_misc:unlink_and_capture_exit(Collector)
+ rabbit_event:notify(connection_closed, [{pid, self()}])
-mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
+mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
{inet_async, Sock, Ref, {ok, Data}} ->
{State1, Callback1, Length1} =
handle_input(State#v1.callback, Data,
State#v1{recv_ref = none}),
- mainloop(Parent, Deb,
- switch_callback(State1, Callback1, Length1));
+ mainloop(Deb, switch_callback(State1, Callback1, Length1));
{inet_async, Sock, Ref, {error, closed}} ->
if State#v1.connection_state =:= closed ->
@@ -294,6 +321,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
+ {conserve_memory, Conserve} ->
+ mainloop(Deb, internal_conserve_memory(Conserve, State));
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -308,17 +337,17 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
- {channel_exit, Channel, Reason} ->
- mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
- {'EXIT', Pid, Reason} ->
- mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
+ {channel_exit, ChannelOrFrPid, Reason} ->
+ mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
+ {'DOWN', _MRef, process, ChSupPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
terminate_connection ->
handshake_timeout ->
- if State#v1.connection_state =:= running orelse
+ if ?IS_RUNNING(State) orelse
State#v1.connection_state =:= closing orelse
State#v1.connection_state =:= closed ->
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
true ->
throw({handshake_timeout, State#v1.callback})
@@ -329,16 +358,21 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
gen_server:reply(From, ok),
case ForceTermination of
force -> ok;
- normal -> mainloop(Parent, Deb, NewState)
+ normal -> mainloop(Deb, NewState)
{'$gen_call', From, info} ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
{'$gen_call', From, {info, Items}} ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
+ {'$gen_cast', emit_stats} ->
+ internal_emit_stats(State),
+ mainloop(Deb, State#v1{stats_timer =
+ rabbit_event:reset_stats_timer_after(
+ State#v1.stats_timer)});
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -347,21 +381,44 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit({unexpected_message, Other})
-switch_callback(OldState, NewCallback, Length) ->
+switch_callback(State = #v1{connection_state = blocked,
+ heartbeater = Heartbeater}, Callback, Length) ->
+ ok = rabbit_heartbeat:pause_monitor(Heartbeater),
+ State#v1{callback = Callback, recv_length = Length, recv_ref = none};
+switch_callback(State, Callback, Length) ->
Ref = inet_op(fun () -> rabbit_net:async_recv(
- OldState#v1.sock, Length, infinity) end),
- OldState#v1{callback = NewCallback,
- recv_ref = Ref}.
+ State#v1.sock, Length, infinity) end),
+ State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}.
-terminate(Explanation, State = #v1{connection_state = running}) ->
+terminate(Explanation, State) when ?IS_RUNNING(State) ->
{normal, send_exception(State, 0,
connection_forced, Explanation, [], none))};
terminate(_Explanation, State) ->
{force, State}.
-close_connection(State = #v1{connection = #connection{
+internal_conserve_memory(true, State = #v1{connection_state = running}) ->
+ State#v1{connection_state = blocking};
+internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
+ State#v1{connection_state = running};
+internal_conserve_memory(false, State = #v1{connection_state = blocked,
+ heartbeater = Heartbeater,
+ callback = Callback,
+ recv_length = Length,
+ recv_ref = none}) ->
+ ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ switch_callback(State#v1{connection_state = running}, Callback, Length);
+internal_conserve_memory(_Conserve, State) ->
+ State.
+close_connection(State = #v1{queue_collector = Collector,
+ connection = #connection{
timeout_sec = TimeoutSec}}) ->
+ %% The spec says "Exclusive queues may only be accessed by the
+ %% current connection, and are deleted when that connection
+ %% closes." This does not strictly imply synchrony, but in
+ %% practice it seems to be what people assume.
+ rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
TimeoutMillisec =
@@ -376,30 +433,45 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
+handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
+ {channel, Channel} = get({ch_fr_pid, ChFrPid}),
+ handle_exception(State, Channel, Reason);
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
-handle_dependent_exit(Pid, normal, State) ->
- erase({chpid, Pid}),
- maybe_close(State);
-handle_dependent_exit(Pid, Reason, State) ->
- case channel_cleanup(Pid) of
- undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> maybe_close(handle_exception(State, Channel, Reason))
+handle_dependent_exit(ChSupPid, Reason, State) ->
+ case termination_kind(Reason) of
+ controlled ->
+ case erase({ch_sup_pid, ChSupPid}) of
+ undefined -> ok;
+ {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
+ end,
+ maybe_close(State);
+ uncontrolled ->
+ case channel_cleanup(ChSupPid) of
+ undefined ->
+ exit({abnormal_dependent_exit, ChSupPid, Reason});
+ Channel ->
+ maybe_close(handle_exception(State, Channel, Reason))
+ end
-channel_cleanup(Pid) ->
- case get({chpid, Pid}) of
- undefined -> undefined;
- {channel, Channel} -> erase({channel, Channel}),
- erase({chpid, Pid}),
- Channel
+channel_cleanup(ChSupPid) ->
+ case get({ch_sup_pid, ChSupPid}) of
+ undefined -> undefined;
+ {{channel, Channel}, ChFr} -> erase({channel, Channel}),
+ erase(ChFr),
+ erase({ch_sup_pid, ChSupPid}),
+ Channel
-all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
+all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
+ {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
terminate_channels() ->
- NChannels = length([exit(Pid, normal) || Pid <- all_channels()]),
+ NChannels =
+ length([rabbit_framing_channel:shutdown(ChFrPid)
+ || ChFrPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -417,14 +489,15 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
- {'EXIT', Pid, Reason} ->
- case channel_cleanup(Pid) of
+ {'DOWN', _MRef, process, ChSupPid, Reason} ->
+ case channel_cleanup(ChSupPid) of
undefined ->
- exit({abnormal_dependent_exit, Pid, Reason});
+ exit({abnormal_dependent_exit, ChSupPid, Reason});
Channel ->
- case Reason of
- normal -> ok;
- _ ->
+ case termination_kind(Reason) of
+ controlled ->
+ ok;
+ uncontrolled ->
"connection ~p, channel ~p - "
"error while terminating:~n~p~n",
@@ -437,24 +510,28 @@ wait_for_channel_termination(N, TimerRef) ->
maybe_close(State = #v1{connection_state = closing,
- queue_collector = Collector}) ->
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
case all_channels() of
[] ->
- %% Spec says "Exclusive queues may only be accessed by the current
- %% connection, and are deleted when that connection closes."
- %% This does not strictly imply synchrony, but in practice it seems
- %% to be what people assume.
- rabbit_reader_queue_collector:delete_all(Collector),
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
- close_connection(State);
+ NewState = close_connection(State),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ NewState;
_ -> State
maybe_close(State) ->
-handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
+termination_kind(normal) -> controlled;
+termination_kind(shutdown) -> controlled;
+termination_kind({shutdown, _Term}) -> controlled;
+termination_kind(_) -> uncontrolled.
+handle_frame(Type, 0, Payload,
+ State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload) of
+ case analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -462,31 +539,38 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->
-handle_frame(Type, 0, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, 0, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
-handle_frame(Type, Channel, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, Channel, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
- {chpid, ChPid} ->
+ {ch_fr_pid, ChFrPid} ->
+ ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
case AnalyzedFrame of
{method, 'channel.close', _} ->
- erase({channel, Channel});
- _ -> ok
- end,
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
- State;
+ erase({channel, Channel}),
+ State;
+ {method, MethodName, _} ->
+ case (State#v1.connection_state =:= blocking andalso
+ Protocol:method_has_content(MethodName)) of
+ true -> State#v1{connection_state = blocked};
+ false -> State
+ end;
+ _ ->
+ State
+ end;
closing ->
%% According to the spec, after sending a
%% channel.close we must ignore all frames except
@@ -506,32 +590,37 @@ handle_frame(Type, Channel, Payload, State) ->
undefined ->
- case State#v1.connection_state of
- running -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
- Other -> throw({channel_frame_while_starting,
- Channel, Other, AnalyzedFrame})
+ case ?IS_RUNNING(State) of
+ true -> ok = send_to_new_channel(
+ Channel, AnalyzedFrame, State),
+ State;
+ false -> throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state,
+ AnalyzedFrame})
-analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) ->
- {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields};
-analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) ->
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
{content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body) ->
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
-analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
-analyze_frame(_Type, _Body) ->
+analyze_frame(_Type, _Body, _Protocol) ->
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
%%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]),
- {State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1};
+ {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1};
handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) ->
case PayloadAndMarker of
@@ -543,54 +632,76 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
throw({bad_payload, PayloadAndMarker})
-handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
- State = #v1{sock = Sock, connection = Connection}) ->
- case check_version({ProtocolMajor, ProtocolMinor},
- true ->
- ok = send_on_channel0(
- Sock,
- #'connection.start'{
- version_major = ?PROTOCOL_VERSION_MAJOR,
- version_minor = ?PROTOCOL_VERSION_MINOR,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> }),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT},
- connection_state = starting},
- frame_header, 7};
- false ->
- throw({bad_version, ProtocolMajor, ProtocolMinor})
- end;
+%% The two rules pertaining to version negotiation:
+%% * If the server cannot support the protocol specified in the
+%% protocol header, it MUST respond with a valid protocol header and
+%% then close the socket connection.
+%% * The server MUST provide a protocol version that is lower than or
+%% equal to that requested by the client in the protocol header.
+handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) ->
+ start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
+%% This is the protocol header for 0-9, which we can safely treat as
+%% though it were 0-9-1.
+handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) ->
+ start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
+%% This is what most clients send for 0-8. The 0-8 spec, confusingly,
+%% defines the version as 8-0.
+handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+%% The 0-8 spec as on the AMQP web site actually has this as the
+%% protocol header; some libraries e.g., py-amqplib, send it when they
+%% want 0-8.
+handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, A, B, C, D});
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> rabbit_net:send(
- Sock, <<"AMQP",1,1,
- throw({bad_header, Other});
+ refuse_connection(Sock, {bad_header, Other});
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
-%% the 0-8 spec, confusingly, defines the version as 8-0
-adjust_version({8,0}) -> {0,8};
-adjust_version(Version) -> Version.
-check_version(ClientVersion, ServerVersion) ->
- {ClientMajor, ClientMinor} = adjust_version(ClientVersion),
- {ServerMajor, ServerMinor} = adjust_version(ServerVersion),
- ClientMajor > ServerMajor
- orelse
- (ClientMajor == ServerMajor andalso
- ClientMinor >= ServerMinor).
+%% Offer a protocol version to the client. Connection.start only
+%% includes a major and minor version number, Luckily 0-9 and 0-9-1
+%% are similar enough that clients will be happy with either.
+start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
+ Protocol,
+ State = #v1{sock = Sock, connection = Connection}) ->
+ Start = #'connection.start'{ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = <<"PLAIN AMQPLAIN">>,
+ locales = <<"en_US">> },
+ ok = send_on_channel0(Sock, Start, Protocol),
+ {State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7}.
+refuse_connection(Sock, Exception) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
+ throw(Exception).
+ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) ->
+ Self = self(),
+ State#v1{stats_timer = rabbit_event:ensure_stats_timer_after(
+ StatsTimer,
+ fun() -> emit_stats(Self) end)}.
-handle_method0(MethodName, FieldsBin, State) ->
+handle_method0(MethodName, FieldsBin,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
- handle_method0(rabbit_framing:decode_method_fields(
- MethodName, FieldsBin),
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
catch exit:Reason ->
CompleteReason = case Reason of
@@ -598,13 +709,14 @@ handle_method0(MethodName, FieldsBin, State) ->
Reason#amqp_error{method = MethodName};
OtherReason -> OtherReason
- case State#v1.connection_state of
- running -> send_exception(State, 0, CompleteReason);
+ case ?IS_RUNNING(State) of
+ true -> send_exception(State, 0, CompleteReason);
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
- Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, Other, CompleteReason})
+ false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, State#v1.connection_state,
+ CompleteReason})
@@ -612,14 +724,14 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
State = #v1{connection_state = starting,
- connection = Connection,
+ connection = Connection =
+ #connection{protocol = Protocol},
sock = Sock}) ->
User = rabbit_access_control:check_login(Mechanism, Response),
- ok = send_on_channel0(
- Sock,
- #'connection.tune'{channel_max = 0,
+ Tune = #'connection.tune'{channel_max = 0,
frame_max = ?FRAME_MAX,
- heartbeat = 0}),
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{
user = User,
@@ -628,7 +740,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
- sock = Sock}) ->
+ sock = Sock,
+ start_heartbeat_fun = SHF}) ->
if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
not_allowed, "frame_max=~w < ~w min size",
@@ -638,51 +751,42 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ Heartbeater = SHF(Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}}
+ frame_max = FrameMax},
+ heartbeater = Heartbeater}
-handle_method0(#''{virtual_host = VHostPath,
- insist = Insist},
+handle_method0(#''{virtual_host = VHostPath},
State = #v1{connection_state = opening,
connection = Connection = #connection{
- user = User},
+ user = User,
+ protocol = Protocol},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
- State = #v1{connection_state = running}) ->
+ ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
+ State1 = internal_conserve_memory(
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State#v1{connection_state = running,
+ connection = NewConnection}),
+ rabbit_event:notify(connection_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
+ State1;
+handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
-handle_method0(#'connection.close'{}, State = #v1{connection_state = CS})
+ State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol},
+ sock = Sock})
when CS =:= closing; CS =:= closed ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
State = #v1{connection_state = closed}) ->
@@ -695,23 +799,8 @@ handle_method0(_Method, #v1{connection_state = S}) ->
channel_error, "unexpected method in connection state ~w", [S]).
-send_on_channel0(Sock, Method) ->
- ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
+send_on_channel0(Sock, Method, Protocol) ->
+ ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -745,6 +834,10 @@ i(state, #v1{connection_state = S}) ->
i(channels, #v1{}) ->
+i(protocol, #v1{connection = #connection{protocol = none}}) ->
+ none;
+i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
+ Protocol:version();
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
i(user, #v1{connection = #connection{user = none}}) ->
@@ -763,19 +856,22 @@ i(Item, #v1{}) ->
-send_to_new_channel(Channel, AnalyzedFrame,
- State = #v1{queue_collector = Collector}) ->
- #v1{sock = Sock, connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
- ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector]),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
+send_to_new_channel(Channel, AnalyzedFrame, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ {ok, ChSupPid, ChFrPid} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {Protocol, Sock, Channel, FrameMax,
+ self(), Username, VHost, Collector}),
+ erlang:monitor(process, ChSupPid),
+ put({channel, Channel}, {ch_fr_pid, ChFrPid}),
+ put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
+ put({ch_fr_pid, ChFrPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
@@ -788,25 +884,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
log_channel_error(CS, Channel, Reason),
send_exception(State, Channel, Reason).
-send_exception(State, Channel, Reason) ->
- {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason),
+send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+ Channel, Reason) ->
+ {ShouldClose, CloseChannel, CloseMethod} =
+ map_exception(Channel, Reason, Protocol),
NewState = case ShouldClose of
true -> terminate_channels(),
false -> close_channel(Channel, State)
ok = rabbit_writer:internal_send_command(
- NewState#v1.sock, CloseChannel, CloseMethod),
+ NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
-map_exception(Channel, Reason) ->
+map_exception(Channel, Reason, Protocol) ->
{SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
- lookup_amqp_exception(Reason),
+ lookup_amqp_exception(Reason, Protocol),
ShouldClose = SuggestedClose or (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
- none -> {0, 0};
- _ -> rabbit_framing:method_id(FailedMethod)
+ none -> {0, 0};
+ _ -> Protocol:method_id(FailedMethod)
{CloseChannel, CloseMethod} =
case ShouldClose of
@@ -821,22 +919,16 @@ map_exception(Channel, Reason) ->
{ShouldClose, CloseChannel, CloseMethod}.
-%% FIXME: this clause can go when we move to AMQP spec >=8.1
-lookup_amqp_exception(#amqp_error{name = precondition_failed,
- explanation = Expl,
- method = Method}) ->
- ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl),
- {false, 406, ExplBin, Method};
lookup_amqp_exception(#amqp_error{name = Name,
explanation = Expl,
- method = Method}) ->
- {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name),
+ method = Method},
+ Protocol) ->
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name),
ExplBin = amqp_exception_explanation(Text, Expl),
{ShouldClose, Code, ExplBin, Method};
-lookup_amqp_exception(Other) ->
+lookup_amqp_exception(Other, Protocol) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {ShouldClose, Code, Text} =
- rabbit_framing:lookup_amqp_exception(internal_error),
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error),
{ShouldClose, Code, Text, none}.
amqp_exception_explanation(Text, Expl) ->
@@ -845,3 +937,6 @@ amqp_exception_explanation(Text, Expl) ->
if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
true -> CompleteTextBin
+internal_emit_stats(State) ->
+ rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)).