path: root/src/rabbit_reader.erl
diff options
Diffstat (limited to 'src/rabbit_reader.erl')
1 files changed, 119 insertions, 101 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3e03ae0c..9603faf5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -41,7 +41,7 @@
@@ -53,9 +53,7 @@
-define(CLOSING_TIMEOUT, 1).
-%% set to zero once QPid fix their negotiation
--define(FRAME_MAX, 131072).
--define(CHANNEL_MAX, 0).
+-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation
@@ -64,8 +62,8 @@
[pid, address, port, peer_address, peer_port,
- recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
- state, channels, user, vhost, timeout, frame_max, client_properties]).
+ recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels,
+ protocol, user, vhost, timeout, frame_max, client_properties]).
%% connection lifecycle
@@ -140,11 +138,11 @@
--spec(info_keys/0 :: () -> [info_key()]).
--spec(info/1 :: (pid()) -> [info()]).
--spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
+-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(server_properties/0 :: () -> amqp_table()).
+-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
@@ -242,7 +240,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
ProfilingValue = setup_profiling(),
- {ok, Collector} = rabbit_reader_queue_collector:start_link(),
+ {ok, Collector} = rabbit_queue_collector:start_link(),
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -251,7 +249,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
- client_properties = none},
+ client_properties = none,
+ protocol = none},
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init,
@@ -274,7 +273,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% gen_tcp:close(ClientSock),
- rabbit_reader_queue_collector:shutdown(Collector),
+ rabbit_queue_collector:shutdown(Collector),
@@ -439,24 +438,28 @@ wait_for_channel_termination(N, TimerRef) ->
maybe_close(State = #v1{connection_state = closing,
- queue_collector = Collector}) ->
+ queue_collector = Collector,
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
case all_channels() of
[] ->
%% Spec says "Exclusive queues may only be accessed by the current
%% connection, and are deleted when that connection closes."
%% This does not strictly imply synchrony, but in practice it seems
%% to be what people assume.
- rabbit_reader_queue_collector:delete_all(Collector),
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ rabbit_queue_collector:delete_all(Collector),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
_ -> State
maybe_close(State) ->
-handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
+handle_frame(Type, 0, Payload,
+ State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload) of
+ case analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -464,16 +467,18 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->
-handle_frame(Type, 0, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, 0, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
-handle_frame(Type, Channel, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, Channel, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
@@ -516,15 +521,20 @@ handle_frame(Type, Channel, Payload, State) ->
-analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) ->
- {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields};
-analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) ->
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
{content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body) ->
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
{content_body, Body};
-analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
-analyze_frame(_Type, _Body) ->
+analyze_frame(_Type, _Body, _Protocol) ->
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
@@ -549,20 +559,21 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
%% * The server MUST provide a protocol version that is lower than or
%% equal to that requested by the client in the protocol header.
-%% We support 0-9-1 and 0-9, so by the first rule, we must close the
-%% connection if we're sent anything else. Then, we must send that
-%% version in the Connection.start method.
-handle_input(handshake, <<"AMQP",0,0,9,1>>, State) ->
- %% 0-9-1 style protocol header.
- protocol_negotiate(0, 9, 1, State);
-handle_input(handshake, <<"AMQP",1,1,0,9>>, State) ->
- %% 0-8 and 0-9 style protocol header; we support only 0-9
- protocol_negotiate(0, 9, 0, State);
+handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) ->
+ start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
+handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) ->
+ start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
+%% the 0-8 spec, confusingly, defines the version as 8-0
+handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, A, B, C, D});
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> rabbit_net:send(
- Sock, <<"AMQP",0,0,9,1>>) end),
- throw({bad_header, Other});
+ refuse_connection(Sock, {bad_header, Other});
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
@@ -570,27 +581,31 @@ handle_input(Callback, Data, _State) ->
%% Offer a protocol version to the client. Connection.start only
%% includes a major and minor version number, Luckily 0-9 and 0-9-1
%% are similar enough that clients will be happy with either.
-protocol_negotiate(ProtocolMajor, ProtocolMinor, _ProtocolRevision,
- State = #v1{sock = Sock, connection = Connection}) ->
- ok = send_on_channel0(
- Sock,
- #'connection.start'{
- version_major = ProtocolMajor,
- version_minor = ProtocolMinor,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> }),
+start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
+ Protocol,
+ State = #v1{sock = Sock, connection = Connection}) ->
+ Start = #'connection.start'{ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = <<"PLAIN AMQPLAIN">>,
+ locales = <<"en_US">> },
+ ok = send_on_channel0(Sock, Start, Protocol),
{State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT},
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
connection_state = starting},
frame_header, 7}.
+refuse_connection(Sock, Exception) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
+ throw(Exception).
-handle_method0(MethodName, FieldsBin, State) ->
+handle_method0(MethodName, FieldsBin,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
- handle_method0(rabbit_framing:decode_method_fields(
- MethodName, FieldsBin),
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
catch exit:Reason ->
CompleteReason = case Reason of
@@ -612,34 +627,31 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
State = #v1{connection_state = starting,
- connection = Connection,
+ connection = Connection =
+ #connection{protocol = Protocol},
sock = Sock}) ->
User = rabbit_access_control:check_login(Mechanism, Response),
- ok = send_on_channel0(
- Sock,
- #'connection.tune'{channel_max = ?CHANNEL_MAX,
+ Tune = #'connection.tune'{channel_max = 0,
frame_max = ?FRAME_MAX,
- heartbeat = 0}),
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{
user = User,
client_properties = ClientProperties}};
-handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
- frame_max = FrameMax,
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
sock = Sock}) ->
- if (FrameMax =< ?FRAME_MIN_SIZE) or
+ if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "frame_max=~w < ~w min size",
+ [FrameMax, ?FRAME_MIN_SIZE]);
(?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) ->
- mistuned, "peer sent tune_ok with invalid frame_max", []);
- %% If we have a channel_max limit that the client wishes to
- %% exceed, die as per spec. Not currently a problem, so we ignore
- %% the client's channel_max parameter.
-%% (?CHANNEL_MAX /= 0) and (ChannelMax > ?CHANNEL_MAX) ->
-%% rabbit_misc:protocol_error(
-%% mistuned, "peer sent tune_ok with invalid channel_max");
+ not_allowed, "frame_max=~w > ~w max size",
+ [FrameMax, ?FRAME_MAX]);
true ->
rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
State#v1{connection_state = opening,
@@ -647,27 +659,31 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
timeout_sec = ClientHeartbeat,
frame_max = FrameMax}}
handle_method0(#''{virtual_host = VHostPath},
State = #v1{connection_state = opening,
connection = Connection = #connection{
- user = User},
+ user = User,
+ protocol = Protocol},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{deprecated_known_hosts = <<>>}),
+ ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
State#v1{connection_state = running,
connection = NewConnection};
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
-handle_method0(#'connection.close'{}, State = #v1{connection_state = CS})
+ State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol},
+ sock = Sock})
when CS =:= closing; CS =:= closed ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
State = #v1{connection_state = closed}) ->
@@ -680,8 +696,8 @@ handle_method0(_Method, #v1{connection_state = S}) ->
channel_error, "unexpected method in connection state ~w", [S]).
-send_on_channel0(Sock, Method) ->
- ok = rabbit_writer:internal_send_command(Sock, 0, Method).
+send_on_channel0(Sock, Method, Protocol) ->
+ ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -715,6 +731,10 @@ i(state, #v1{connection_state = S}) ->
i(channels, #v1{}) ->
+i(protocol, #v1{connection = #connection{protocol = none}}) ->
+ none;
+i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
+ Protocol:version();
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
i(user, #v1{connection = #connection{user = none}}) ->
@@ -738,11 +758,13 @@ send_to_new_channel(Channel, AnalyzedFrame,
#v1{sock = Sock, connection = #connection{
frame_max = FrameMax,
user = #user{username = Username},
- vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
- ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector]),
+ vhost = VHost,
+ protocol = Protocol}} = State,
+ {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol),
+ {ok, ChPid} = rabbit_framing_channel:start_link(
+ fun rabbit_channel:start_link/6,
+ [Channel, self(), WriterPid, Username, VHost, Collector],
+ Protocol),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
@@ -758,25 +780,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
log_channel_error(CS, Channel, Reason),
send_exception(State, Channel, Reason).
-send_exception(State, Channel, Reason) ->
- {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason),
+send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+ Channel, Reason) ->
+ {ShouldClose, CloseChannel, CloseMethod} =
+ map_exception(Channel, Reason, Protocol),
NewState = case ShouldClose of
true -> terminate_channels(),
false -> close_channel(Channel, State)
ok = rabbit_writer:internal_send_command(
- NewState#v1.sock, CloseChannel, CloseMethod),
+ NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
-map_exception(Channel, Reason) ->
+map_exception(Channel, Reason, Protocol) ->
{SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
- lookup_amqp_exception(Reason),
+ lookup_amqp_exception(Reason, Protocol),
ShouldClose = SuggestedClose or (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
- none -> {0, 0};
- _ -> rabbit_framing:method_id(FailedMethod)
+ none -> {0, 0};
+ _ -> Protocol:method_id(FailedMethod)
{CloseChannel, CloseMethod} =
case ShouldClose of
@@ -791,22 +815,16 @@ map_exception(Channel, Reason) ->
{ShouldClose, CloseChannel, CloseMethod}.
-%% FIXME: this clause can go when we move to AMQP spec >=8.1
-lookup_amqp_exception(#amqp_error{name = precondition_failed,
- explanation = Expl,
- method = Method}) ->
- ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl),
- {false, 406, ExplBin, Method};
lookup_amqp_exception(#amqp_error{name = Name,
explanation = Expl,
- method = Method}) ->
- {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name),
+ method = Method},
+ Protocol) ->
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name),
ExplBin = amqp_exception_explanation(Text, Expl),
{ShouldClose, Code, ExplBin, Method};
-lookup_amqp_exception(Other) ->
+lookup_amqp_exception(Other, Protocol) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {ShouldClose, Code, Text} =
- rabbit_framing:lookup_amqp_exception(internal_error),
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error),
{ShouldClose, Code, Text, none}.
amqp_exception_explanation(Text, Expl) ->