summaryrefslogtreecommitdiff
path: root/src/rabbit_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r--src/rabbit_reader.erl819
1 files changed, 482 insertions, 337 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 6f12e22e..8106a46a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -10,8 +10,8 @@
%%
%% The Original Code is RabbitMQ.
%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
%%
-module(rabbit_reader).
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2]).
+-export([init/4, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -37,21 +37,27 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, conserve_resources,
- last_blocked_by, last_blocked_at}).
+ ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun,
+ buf, buf_len, throttle}).
+
+-record(connection, {name, host, peer_host, port, peer_port,
+ protocol, user, timeout_sec, frame_max, vhost,
+ client_properties, capabilities,
+ auth_mechanism, auth_state}).
+
+-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at,
+ blocked_sent}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
channels]).
--define(CREATION_EVENT_KEYS, [pid, name, address, port, peer_address, peer_port,
- ssl, peer_cert_subject, peer_cert_issuer,
- peer_cert_validity, auth_mechanism,
- ssl_protocol, ssl_key_exchange,
- ssl_cipher, ssl_hash,
- protocol, user, vhost, timeout, frame_max,
- client_properties]).
+-define(CREATION_EVENT_KEYS,
+ [pid, name, port, peer_port, host,
+ peer_host, ssl, peer_cert_subject, peer_cert_issuer,
+ peer_cert_validity, auth_mechanism, ssl_protocol,
+ ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
+ timeout, frame_max, client_properties]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -60,6 +66,10 @@
State#v1.connection_state =:= blocking orelse
State#v1.connection_state =:= blocked)).
+-define(IS_STOPPING(State),
+ (State#v1.connection_state =:= closing orelse
+ State#v1.connection_state =:= closed)).
+
%%--------------------------------------------------------------------------
-ifdef(use_specs).
@@ -94,24 +104,24 @@
%%--------------------------------------------------------------------------
-start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid,
+start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid,
Collector, StartHeartbeatFun])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(
- Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock,
SockTransform)
end.
system_continue(Parent, Deb, State) ->
- ?MODULE:mainloop(Deb, State#v1{parent = Parent}).
+ mainloop(Deb, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -133,8 +143,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_resources(Pid, _Source, Conserve) ->
- Pid ! {conserve_resources, Conserve},
+conserve_resources(Pid, Source, Conserve) ->
+ Pid ! {conserve_resources, Source, Conserve},
ok.
server_properties(Protocol) ->
@@ -169,84 +179,110 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
{<<"basic.nack">>, bool, true},
- {<<"consumer_cancel_notify">>, bool, true}];
+ {<<"consumer_cancel_notify">>, bool, true},
+ {<<"connection.blocked">>, bool, true}];
server_capabilities(_) ->
[].
+%%--------------------------------------------------------------------------
+
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
+socket_error(Reason) ->
+ log(error, "error on AMQP connection ~p: ~p (~s)~n",
+ [self(), Reason, rabbit_misc:format_inet_error(Reason)]).
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
case Fun(Sock) of
{ok, Res} -> Res;
- {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
- [self(), Reason]),
+ {error, Reason} -> socket_error(Reason),
+ %% NB: this is tcp socket, even in case of ssl
+ rabbit_net:fast_close(Sock),
exit(normal)
end.
-name(Sock) ->
- socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end).
-
-start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
- ConnStr = name(Sock),
- log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
+ Name = case rabbit_net:connection_string(Sock, inbound) of
+ {ok, Str} -> Str;
+ {error, enotconn} -> rabbit_net:fast_close(Sock),
+ exit(normal);
+ {error, Reason} -> socket_error(Reason),
+ rabbit_net:fast_close(Sock),
+ exit(normal)
+ end,
+ log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
ClientSock = socket_op(Sock, SockTransform),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
- handshake_timeout),
+ erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
+ {PeerHost, PeerPort, Host, Port} =
+ socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
State = #v1{parent = Parent,
sock = ClientSock,
connection = #connection{
+ name = list_to_binary(Name),
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
protocol = none,
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
- capabilities = []},
+ capabilities = [],
+ auth_mechanism = none,
+ auth_state = none},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
connection_state = pre_init,
queue_collector = Collector,
heartbeater = none,
- channel_sup_sup_pid = ChannelSupSupPid,
+ ch_sup3_pid = ChSup3Pid,
+ channel_sup_sup_pid = none,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
- auth_mechanism = none,
- auth_state = none,
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never},
+ throttle = #throttle{
+ alarmed_by = [],
+ last_blocked_by = none,
+ last_blocked_at = never,
+ blocked_sent = false}},
try
- ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
- recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)),
- log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
+ run({?MODULE, recvloop,
+ [Deb, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
+ log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
connection_closed_abruptly -> warning;
_ -> error
end, "closing AMQP connection ~p (~s):~n~p~n",
- [self(), ConnStr, Ex])
+ [self(), Name, Ex])
after
- %% The reader is the controlling process and hence its
- %% termination will close the socket. Furthermore,
- %% gen_tcp:close/1 waits for pending output to be sent, which
- %% results in unnecessary delays. However, to keep the
- %% file_handle_cache accounting as accurate as possible it
- %% would be good to close the socket immediately if we
- %% can. But we can only do this for non-ssl sockets.
- %%
- rabbit_net:maybe_fast_close(ClientSock),
+ %% We don't call gen_tcp:close/1 here since it waits for
+ %% pending output to be sent, which results in unnecessary
+ %% delays. We could just terminate - the reader is the
+ %% controlling process and hence its termination will close
+ %% the socket. However, to keep the file_handle_cache
+ %% accounting as accurate as possible we ought to close the
+ %% socket w/o delay before termination.
+ rabbit_net:fast_close(ClientSock),
+ rabbit_networking:unregister_connection(self()),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
+run({M, F, A}) ->
+ try apply(M, F, A)
+ catch {become, MFA} -> run(MFA)
+ end.
+
recvloop(Deb, State = #v1{pending_recv = true}) ->
mainloop(Deb, State);
recvloop(Deb, State = #v1{connection_state = blocked}) ->
@@ -276,18 +312,32 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
_ -> throw(connection_closed_abruptly)
end;
{inet_async, _Sock, _Ref, {error, Reason}} ->
+ maybe_emit_stats(State),
throw({inet_error, Reason});
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, State#v1.parent,
+ ?MODULE, Deb, State);
Other ->
- handle_other(Other, Deb, State)
+ case handle_other(Other, State) of
+ stop -> ok;
+ NewState -> recvloop(Deb, NewState)
+ end
end.
-handle_other({conserve_resources, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
-handle_other({channel_closing, ChPid}, Deb, State) ->
+handle_other({conserve_resources, Source, Conserve},
+ State = #v1{throttle = Throttle =
+ #throttle{alarmed_by = CR}}) ->
+ CR1 = case Conserve of
+ true -> lists:usort([Source | CR]);
+ false -> CR -- [Source]
+ end,
+ Throttle1 = Throttle#throttle{alarmed_by = CR1},
+ control_throttle(State#v1{throttle = Throttle1});
+handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(control_throttle(State)));
-handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
+ maybe_close(control_throttle(State));
+handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
%% this is what we are expected to do according to
@@ -298,94 +348,133 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
%% ordinary error case. However, since this termination is
%% initiated by our parent it is probably more important to exit
%% quickly.
+ maybe_emit_stats(State),
exit(Reason);
-handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
- _Deb, _State) ->
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) ->
+ maybe_emit_stats(State),
throw(E);
-handle_other({channel_exit, Channel, Reason}, Deb, State) ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
-handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
-handle_other(terminate_connection, _Deb, State) ->
+handle_other({channel_exit, Channel, Reason}, State) ->
+ handle_exception(State, Channel, Reason);
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
+ handle_dependent_exit(ChPid, Reason, State);
+handle_other(terminate_connection, State) ->
+ maybe_emit_stats(State),
+ stop;
+handle_other(handshake_timeout, State)
+ when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
State;
-handle_other(handshake_timeout, Deb, State)
- when ?IS_RUNNING(State) orelse
- State#v1.connection_state =:= closing orelse
- State#v1.connection_state =:= closed ->
- mainloop(Deb, State);
-handle_other(handshake_timeout, _Deb, State) ->
+handle_other(handshake_timeout, State) ->
+ maybe_emit_stats(State),
throw({handshake_timeout, State#v1.callback});
-handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
- mainloop(Deb, State);
-handle_other(timeout, _Deb, #v1{connection_state = S}) ->
- throw({timeout, S});
-handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
+handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
+ State;
+handle_other(heartbeat_timeout, State = #v1{connection_state = S}) ->
+ maybe_emit_stats(State),
+ throw({heartbeat_timeout, S});
+handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
case ForceTermination of
- force -> ok;
- normal -> mainloop(Deb, NewState)
+ force -> stop;
+ normal -> NewState
end;
-handle_other({'$gen_call', From, info}, Deb, State) ->
+handle_other({'$gen_call', From, info}, State) ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Deb, State);
-handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
+ State;
+handle_other({'$gen_call', From, {info, Items}}, State) ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State)
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State)
when ?IS_RUNNING(State) ->
rabbit_event:notify(connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
- mainloop(Deb, State);
-handle_other(emit_stats, Deb, State) ->
- mainloop(Deb, emit_stats(State));
-handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
- sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
-handle_other({bump_credit, Msg}, Deb, State) ->
+ State;
+handle_other(ensure_stats, State) ->
+ ensure_stats_timer(State);
+handle_other(emit_stats, State) ->
+ emit_stats(State);
+handle_other({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
- recvloop(Deb, control_throttle(State));
-handle_other(Other, _Deb, _State) ->
+ control_throttle(State);
+handle_other(Other, State) ->
%% internal error -> something worth dying for
+ maybe_emit_stats(State),
exit({unexpected_message, Other}).
switch_callback(State, Callback, Length) ->
State#v1{callback = Callback, recv_len = Length}.
terminate(Explanation, State) when ?IS_RUNNING(State) ->
- {normal, send_exception(State, 0,
- rabbit_misc:amqp_error(
- connection_forced, Explanation, [], none))};
+ {normal, handle_exception(State, 0,
+ rabbit_misc:amqp_error(
+ connection_forced, Explanation, [], none))};
terminate(_Explanation, State) ->
{force, State}.
-control_throttle(State = #v1{connection_state = CS,
- conserve_resources = Mem}) ->
- case {CS, Mem orelse credit_flow:blocked()} of
+control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
+ IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse
+ credit_flow:blocked()),
+ case {CS, IsThrottled} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
- State#v1{connection_state = running};
- {blocked, true} -> update_last_blocked_by(State);
+ maybe_send_unblocked(State),
+ State#v1{connection_state = running,
+ throttle = Throttle#throttle{
+ blocked_sent = false}};
+ {blocked, true} -> State#v1{throttle = update_last_blocked_by(
+ Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking}) ->
+maybe_block(State = #v1{connection_state = blocking,
+ throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- update_last_blocked_by(State#v1{connection_state = blocked,
- last_blocked_at = erlang:now()});
+ Sent = maybe_send_blocked(State),
+ State#v1{connection_state = blocked,
+ throttle = update_last_blocked_by(
+ Throttle#throttle{last_blocked_at = erlang:now(),
+ blocked_sent = Sent})};
maybe_block(State) ->
State.
-update_last_blocked_by(State = #v1{conserve_resources = true}) ->
- State#v1{last_blocked_by = resource};
-update_last_blocked_by(State = #v1{conserve_resources = false}) ->
- State#v1{last_blocked_by = flow}.
+maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) ->
+ false;
+maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
+ connection = #connection{
+ protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+ RStr = string:join([atom_to_list(A) || A <- CR], " & "),
+ Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])),
+ ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
+ Protocol),
+ true;
+ _ ->
+ false
+ end.
+
+maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) ->
+ ok;
+maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
+ ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol).
+
+update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) ->
+ Throttle#throttle{last_blocked_by = flow};
+update_last_blocked_by(Throttle) ->
+ Throttle#throttle{last_blocked_by = resource}.
+
+%%--------------------------------------------------------------------------
+%% error handling / termination
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
@@ -405,29 +494,15 @@ close_connection(State = #v1{queue_collector = Collector,
handle_dependent_exit(ChPid, Reason, State) ->
case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, uncontrolled} ->
- exit({abnormal_dependent_exit, ChPid, Reason});
- {_Channel, controlled} ->
- maybe_close(control_throttle(State));
- {Channel, uncontrolled} ->
- log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
- maybe_close(handle_exception(control_throttle(State),
- Channel, Reason))
- end.
-
-channel_cleanup(ChPid) ->
- case get({ch_pid, ChPid}) of
- undefined -> undefined;
- {Channel, MRef} -> credit_flow:peer_down(ChPid),
- erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- erlang:demonitor(MRef, [flush]),
- Channel
+ {undefined, controlled} -> State;
+ {undefined, uncontrolled} -> exit({abnormal_dependent_exit,
+ ChPid, Reason});
+ {_Channel, controlled} -> maybe_close(control_throttle(State));
+ {Channel, uncontrolled} -> State1 = handle_exception(
+ State, Channel, Reason),
+ maybe_close(control_throttle(State1))
end.
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
-
terminate_channels() ->
NChannels =
length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
@@ -481,78 +556,179 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
+ log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
+ [self(), closed, Channel, Reason]),
+ State;
+handle_exception(State = #v1{connection = #connection{protocol = Protocol},
+ connection_state = CS},
+ Channel, Reason)
+ when ?IS_RUNNING(State) orelse CS =:= closing ->
+ log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
+ [self(), CS, Channel, Reason]),
+ {0, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ terminate_channels(),
+ State1 = close_connection(State),
+ ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
+ State1;
+handle_exception(State, Channel, Reason) ->
+ %% We don't trust the client at this point - force them to wait
+ %% for a bit so they can't DOS us with repeated failed logins etc.
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({handshake_error, State#v1.connection_state, Channel, Reason}).
+
+%% we've "lost sync" with the client and hence must not accept any
+%% more input
+fatal_frame_error(Error, Type, Channel, Payload, State) ->
+ frame_error(Error, Type, Channel, Payload, State),
+ %% grace period to allow transmission of error
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw(fatal_frame_error).
+
+frame_error(Error, Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(frame_error,
+ "type ~p, ~s octets = ~p: ~p",
+ [Type, Str, Bin, Error], none)).
+
+unexpected_frame(Type, Channel, Payload, State) ->
+ {Str, Bin} = payload_snippet(Payload),
+ handle_exception(State, Channel,
+ rabbit_misc:amqp_error(unexpected_frame,
+ "type ~p, ~s octets = ~p",
+ [Type, Str, Bin], none)).
+
+payload_snippet(Payload) when size(Payload) =< 16 ->
+ {"all", Payload};
+payload_snippet(<<Snippet:16/binary, _/binary>>) ->
+ {"first 16", Snippet}.
+
+%%--------------------------------------------------------------------------
+
+create_channel(Channel, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{name = Name,
+ protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
+ Protocol, User, VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ChPid, AState}.
+
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ Channel
+ end.
+
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+
+%%--------------------------------------------------------------------------
+
handle_frame(Type, 0, Payload,
- State = #v1{connection_state = CS,
- connection = #connection{protocol = Protocol}})
- when CS =:= closing; CS =:= closed ->
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_STOPPING(State) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
end;
-handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
- when CS =:= closing; CS =:= closed ->
- State;
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> throw({unknown_frame, 0, Type, Payload});
+ error -> frame_error(unknown_frame, Type, 0, Payload, State);
heartbeat -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
- Other -> throw({unexpected_frame_on_channel0, Other})
+ _Other -> unexpected_frame(Type, 0, Payload, State)
end;
handle_frame(Type, Channel, Payload,
- State = #v1{connection = #connection{protocol = Protocol}}) ->
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_RUNNING(State) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
- error -> throw({unknown_frame, Channel, Type, Payload});
- heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
- end.
+ error -> frame_error(unknown_frame, Type, Channel, Payload, State);
+ heartbeat -> unexpected_frame(Type, Channel, Payload, State);
+ Frame -> process_frame(Frame, Channel, State)
+ end;
+handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) ->
+ State;
+handle_frame(Type, Channel, Payload, State) ->
+ unexpected_frame(Type, Channel, Payload, State).
process_frame(Frame, Channel, State) ->
- case get({channel, Channel}) of
- {ChPid, AState} ->
- case process_channel_frame(Frame, ChPid, AState) of
- {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {error, Reason} -> handle_exception(State, Channel, Reason)
- end;
- undefined when ?IS_RUNNING(State) ->
- ok = create_channel(Channel, State),
- process_frame(Frame, Channel, State);
- undefined ->
- throw({channel_frame_while_starting,
- Channel, State#v1.connection_state, Frame})
+ ChKey = {channel, Channel},
+ {ChPid, AState} = case get(ChKey) of
+ undefined -> create_channel(Channel, State);
+ Other -> Other
+ end,
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
+ %% This is not strictly necessary, but more obviously
+ %% correct. Also note that we do not need to call maybe_close/1
+ %% since we cannot possibly be in the 'closing' state.
control_throttle(State);
-post_process_frame({method, MethodName, _}, _ChPid,
- State = #v1{connection = #connection{
- protocol = Protocol}}) ->
- case Protocol:method_has_content(MethodName) of
- true -> erlang:bump_reductions(2000),
- maybe_block(control_throttle(State));
- false -> control_throttle(State)
- end;
+post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
+ maybe_block(State);
+post_process_frame({content_body, _}, _ChPid, State) ->
+ maybe_block(State);
post_process_frame(_Frame, _ChPid, State) ->
- control_throttle(State).
+ State.
+
+%%--------------------------------------------------------------------------
+%% We allow clients to exceed the frame size a little bit since quite
+%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
+-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
+
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
+ State = #v1{connection = #connection{frame_max = FrameMax}})
+ when FrameMax /= 0 andalso
+ PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
+ fatal_frame_error(
+ {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
+ Type, Channel, <<>>, State);
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
PayloadSize + 1));
-handle_input({frame_payload, Type, Channel, PayloadSize},
- PayloadAndMarker, State) ->
- case PayloadAndMarker of
- <<Payload:PayloadSize/binary, ?FRAME_END>> ->
- switch_callback(handle_frame(Type, Channel, Payload, State),
- frame_header, 7);
- _ ->
- throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
+handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
+ <<Payload:PayloadSize/binary, EndMarker>> = Data,
+ case EndMarker of
+ ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
+ switch_callback(State1, frame_header, 7);
+ _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
+ Type, Channel, Payload, State)
end;
%% The two rules pertaining to version negotiation:
@@ -582,8 +758,12 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+%% ... and finally, the 1.0 spec is crystal clear! Note that the
+handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) ->
+ become_1_0(Id, State);
+
handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_version, A, B, C, D});
+ refuse_connection(Sock, {bad_version, {A, B, C, D}});
handle_input(handshake, Other, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
@@ -597,6 +777,7 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
+ rabbit_networking:register_connection(self()),
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
@@ -610,10 +791,16 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
connection_state = starting},
frame_header, 7).
-refuse_connection(Sock, Exception) ->
- ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
+refuse_connection(Sock, Exception, {A, B, C, D}) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end),
throw(Exception).
+-ifdef(use_specs).
+-spec(refuse_connection/2 :: (rabbit_net:socket(), any()) -> no_return()).
+-endif.
+refuse_connection(Sock, Exception) ->
+ refuse_connection(Sock, Exception, {0, 0, 9, 1}).
+
ensure_stats_timer(State = #v1{connection_state = running}) ->
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
ensure_stats_timer(State) ->
@@ -623,24 +810,14 @@ ensure_stats_timer(State) ->
handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- HandleException =
- fun(R) ->
- case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, R);
- %% We don't trust the client at this point - force
- %% them to wait for a bit so they can't DOS us with
- %% repeated failed logins etc.
- false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state, R})
- end
- end,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
catch exit:#amqp_error{method = none} = Reason ->
- HandleException(Reason#amqp_error{method = MethodName});
+ handle_exception(State, 0, Reason#amqp_error{method = MethodName});
Type:Reason ->
- HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
+ Stack = erlang:get_stacktrace(),
+ handle_exception(State, 0, {Type, Reason, MethodName, Stack})
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
@@ -655,13 +832,13 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
{table, Capabilities1} -> Capabilities1;
_ -> []
end,
- State = State0#v1{auth_mechanism = AuthMechanism,
- auth_state = AuthMechanism:init(Sock),
- connection_state = securing,
+ State = State0#v1{connection_state = securing,
connection =
Connection#connection{
client_properties = ClientProperties,
- capabilities = Capabilities}},
+ capabilities = Capabilities,
+ auth_mechanism = {Mechanism, AuthMechanism},
+ auth_state = AuthMechanism:init(Sock)}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -687,7 +864,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
- ReceiveFun = fun() -> Parent ! timeout end,
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
@@ -699,32 +876,39 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
- connection = Connection = #connection{
- user = User,
- protocol = Protocol},
- sock = Sock}) ->
+ connection = Connection = #connection{
+ user = User,
+ protocol = Protocol},
+ ch_sup3_pid = ChSup3Pid,
+ sock = Sock,
+ throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ Throttle1 = Throttle#throttle{alarmed_by = Conserve},
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ ChSup3Pid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
State1 = control_throttle(
- State#v1{connection_state = running,
- connection = NewConnection,
- conserve_resources = Conserve}),
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ throttle = Throttle1}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
- rabbit_event:if_enabled(State1, #v1.stats_timer,
- fun() -> emit_stats(State1) end),
+ maybe_emit_stats(State1),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
- State = #v1{connection_state = CS,
- connection = #connection{protocol = Protocol},
+ State = #v1{connection = #connection{protocol = Protocol},
sock = Sock})
- when CS =:= closing; CS =:= closed ->
+ when ?IS_STOPPING(State) ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
@@ -733,19 +917,20 @@ handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
self() ! terminate_connection,
State;
-handle_method0(_Method, State = #v1{connection_state = CS})
- when CS =:= closing; CS =:= closed ->
+handle_method0(_Method, State) when ?IS_STOPPING(State) ->
State;
handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-%% Compute frame_max for this instance. Could simply use 0, but breaks
-%% QPid Java client.
server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.
+server_heartbeat() ->
+ {ok, Heartbeat} = application:get_env(rabbit, heartbeat),
+ Heartbeat.
+
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -777,115 +962,86 @@ auth_mechanisms_binary(Sock) ->
string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
- State = #v1{auth_mechanism = AuthMechanism,
- auth_state = AuthState,
- connection = Connection =
- #connection{protocol = Protocol},
+ State = #v1{connection = Connection =
+ #connection{protocol = Protocol,
+ auth_mechanism = {Name, AuthMechanism},
+ auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Msg, Args} ->
rabbit_misc:protocol_error(
access_refused, "~s login refused: ~s",
- [proplists:get_value(name, AuthMechanism:description()),
- io_lib:format(Msg, Args)]);
+ [Name, io_lib:format(Msg, Args)]);
{protocol_error, Msg, Args} ->
rabbit_misc:protocol_error(syntax_error, Msg, Args);
{challenge, Challenge, AuthState1} ->
Secure = #'connection.secure'{challenge = Challenge},
ok = send_on_channel0(Sock, Secure, Protocol),
- State#v1{auth_state = AuthState1};
+ State#v1{connection = Connection#connection{
+ auth_state = AuthState1}};
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
frame_max = server_frame_max(),
- heartbeat = 0},
+ heartbeat = server_heartbeat()},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
- connection = Connection#connection{user = User}}
+ connection = Connection#connection{user = User,
+ auth_state = none}}
end.
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(pid, #v1{}) ->
- self();
-i(name, #v1{sock = Sock}) ->
- list_to_binary(name(Sock));
-i(address, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:sockname/1, fun ({A, _}) -> A end, Sock);
-i(port, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:sockname/1, fun ({_, P}) -> P end, Sock);
-i(peer_address, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock);
-i(peer_port, #v1{sock = Sock}) ->
- socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
-i(ssl, #v1{sock = Sock}) ->
- rabbit_net:is_ssl(Sock);
-i(ssl_protocol, #v1{sock = Sock}) ->
- ssl_info(fun ({P, _}) -> P end, Sock);
-i(ssl_key_exchange, #v1{sock = Sock}) ->
- ssl_info(fun ({_, {K, _, _}}) -> K end, Sock);
-i(ssl_cipher, #v1{sock = Sock}) ->
- ssl_info(fun ({_, {_, C, _}}) -> C end, Sock);
-i(ssl_hash, #v1{sock = Sock}) ->
- ssl_info(fun ({_, {_, _, H}}) -> H end, Sock);
-i(peer_cert_issuer, #v1{sock = Sock}) ->
- cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
-i(peer_cert_subject, #v1{sock = Sock}) ->
- cert_info(fun rabbit_ssl:peer_cert_subject/1, Sock);
-i(peer_cert_validity, #v1{sock = Sock}) ->
- cert_info(fun rabbit_ssl:peer_cert_validity/1, Sock);
-i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
- SockStat =:= recv_cnt;
- SockStat =:= send_oct;
- SockStat =:= send_cnt;
- SockStat =:= send_pend ->
- socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end,
- fun ([{_, I}]) -> I end);
-i(state, #v1{connection_state = S}) ->
- S;
-i(last_blocked_by, #v1{last_blocked_by = By}) ->
- By;
-i(last_blocked_age, #v1{last_blocked_at = never}) ->
+i(pid, #v1{}) -> self();
+i(SockStat, S) when SockStat =:= recv_oct;
+ SockStat =:= recv_cnt;
+ SockStat =:= send_oct;
+ SockStat =:= send_cnt;
+ SockStat =:= send_pend ->
+ socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
+ fun ([{_, I}]) -> I end, S);
+i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
+i(ssl_protocol, S) -> ssl_info(fun ({P, _}) -> P end, S);
+i(ssl_key_exchange, S) -> ssl_info(fun ({_, {K, _, _}}) -> K end, S);
+i(ssl_cipher, S) -> ssl_info(fun ({_, {_, C, _}}) -> C end, S);
+i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
+i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
+i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
+i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
+i(state, #v1{connection_state = CS}) -> CS;
+i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
infinity;
-i(last_blocked_age, #v1{last_blocked_at = T}) ->
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
timer:now_diff(erlang:now(), T) / 1000000;
-i(channels, #v1{}) ->
- length(all_channels());
-i(protocol, #v1{connection = #connection{protocol = none}}) ->
- none;
-i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
- Protocol:version();
-i(auth_mechanism, #v1{auth_mechanism = none}) ->
- none;
-i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
- proplists:get_value(name, Mechanism:description());
-i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
- Username;
-i(user, #v1{connection = #connection{user = none}}) ->
- '';
-i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
- VHost;
-i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
- Timeout;
-i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
- FrameMax;
-i(client_properties, #v1{connection = #connection{
- client_properties = ClientProperties}}) ->
- ClientProperties;
-i(Item, #v1{}) ->
- throw({bad_argument, Item}).
-
-socket_info(Get, Select, Sock) ->
- socket_info(fun() -> Get(Sock) end, Select).
-
-socket_info(Get, Select) ->
- case Get() of
+i(channels, #v1{}) -> length(all_channels());
+i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
+
+ic(name, #connection{name = Name}) -> Name;
+ic(host, #connection{host = Host}) -> Host;
+ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost;
+ic(port, #connection{port = Port}) -> Port;
+ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort;
+ic(protocol, #connection{protocol = none}) -> none;
+ic(protocol, #connection{protocol = P}) -> P:version();
+ic(user, #connection{user = none}) -> '';
+ic(user, #connection{user = U}) -> U#user.username;
+ic(vhost, #connection{vhost = VHost}) -> VHost;
+ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
+ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(client_properties, #connection{client_properties = CP}) -> CP;
+ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
+ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
+ic(Item, #connection{}) -> throw({bad_argument, Item}).
+
+socket_info(Get, Select, #v1{sock = Sock}) ->
+ case Get(Sock) of
{ok, T} -> Select(T);
{error, _} -> ''
end.
-ssl_info(F, Sock) ->
+ssl_info(F, #v1{sock = Sock}) ->
%% The first ok form is R14
%% The second is R13 - the extra term is exportability (by inspection,
%% the docs are wrong)
@@ -896,58 +1052,47 @@ ssl_info(F, Sock) ->
{ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}})
end.
-cert_info(F, Sock) ->
+cert_info(F, #v1{sock = Sock}) ->
case rabbit_net:peercert(Sock) of
nossl -> '';
{error, no_peercert} -> '';
{ok, Cert} -> list_to_binary(F(Cert))
end.
-%%--------------------------------------------------------------------------
-
-create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- ok.
-
-process_channel_frame(Frame, ChPid, AState) ->
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
- end.
-
-handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
- State;
-handle_exception(State, Channel, Reason) ->
- send_exception(State, Channel, Reason).
-
-send_exception(State = #v1{connection = #connection{protocol = Protocol}},
- Channel, Reason) ->
- {0, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
- State1 = close_connection(State),
- ok = rabbit_writer:internal_send_command(
- State1#v1.sock, 0, CloseMethod, Protocol),
- State1.
+maybe_emit_stats(State) ->
+ rabbit_event:if_enabled(State, #v1.stats_timer,
+ fun() -> emit_stats(State) end).
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
+
+%% 1.0 stub
+-ifdef(use_specs).
+-spec(become_1_0/2 :: (non_neg_integer(), #v1{}) -> no_return()).
+-endif.
+become_1_0(Id, State = #v1{sock = Sock}) ->
+ case code:is_loaded(rabbit_amqp1_0_reader) of
+ false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled);
+ _ -> Mode = case Id of
+ 0 -> amqp;
+ 3 -> sasl;
+ _ -> refuse_connection(
+ Sock, {unsupported_amqp1_0_protocol_id, Id},
+ {3, 1, 0, 0})
+ end,
+ throw({become, {rabbit_amqp1_0_reader, init,
+ [Mode, pack_for_1_0(State)]}})
+ end.
+
+pack_for_1_0(#v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ queue_collector = QueueCollector,
+ ch_sup3_pid = ChSup3Pid,
+ start_heartbeat_fun = SHF,
+ buf = Buf,
+ buf_len = BufLen}) ->
+ {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF,
+ Buf, BufLen}.