diff options
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r-- | src/rabbit_reader.erl | 220 |
1 files changed, 119 insertions, 101 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3e03ae0c..9603faf5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,7 +41,7 @@ -export([server_properties/0]). --export([analyze_frame/2]). +-export([analyze_frame/3]). -import(gen_tcp). -import(fprof). @@ -53,9 +53,7 @@ -define(CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). -%% set to zero once QPid fix their negotiation --define(FRAME_MAX, 131072). --define(CHANNEL_MAX, 0). +-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation %--------------------------------------------------------------------------- @@ -64,8 +62,8 @@ -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, - recv_oct, recv_cnt, send_oct, send_cnt, send_pend, - state, channels, user, vhost, timeout, frame_max, client_properties]). + recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels, + protocol, user, vhost, timeout, frame_max, client_properties]). %% connection lifecycle %% @@ -140,11 +138,11 @@ -ifdef(use_specs). --spec(info_keys/0 :: () -> [info_key()]). --spec(info/1 :: (pid()) -> [info()]). --spec(info/2 :: (pid(), [info_key()]) -> [info()]). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (pid()) -> [rabbit_types:info()]). +-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). --spec(server_properties/0 :: () -> amqp_table()). +-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). -endif. @@ -242,7 +240,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), - {ok, Collector} = rabbit_reader_queue_collector:start_link(), + {ok, Collector} = rabbit_queue_collector:start_link(), try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, @@ -251,7 +249,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, vhost = none, - client_properties = none}, + client_properties = none, + protocol = none}, callback = uninitialized_callback, recv_ref = none, connection_state = pre_init, @@ -274,7 +273,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), - rabbit_reader_queue_collector:shutdown(Collector), + rabbit_queue_collector:shutdown(Collector), rabbit_misc:unlink_and_capture_exit(Collector) end, done. @@ -439,24 +438,28 @@ wait_for_channel_termination(N, TimerRef) -> end. maybe_close(State = #v1{connection_state = closing, - queue_collector = Collector}) -> + queue_collector = Collector, + connection = #connection{protocol = Protocol}, + sock = Sock}) -> case all_channels() of [] -> %% Spec says "Exclusive queues may only be accessed by the current %% connection, and are deleted when that connection closes." %% This does not strictly imply synchrony, but in practice it seems %% to be what people assume. - rabbit_reader_queue_collector:delete_all(Collector), - ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + rabbit_queue_collector:delete_all(Collector), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), close_connection(State); _ -> State end; maybe_close(State) -> State. -handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) +handle_frame(Type, 0, Payload, + State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}}) when CS =:= closing; CS =:= closed -> - case analyze_frame(Type, Payload) of + case analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State @@ -464,16 +467,18 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> State; -handle_frame(Type, 0, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, 0, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); Other -> throw({unexpected_frame_on_channel0, Other}) end; -handle_frame(Type, Channel, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, Channel, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> @@ -516,15 +521,20 @@ handle_frame(Type, Channel, Payload, State) -> end end. -analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) -> - {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields}; -analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) -> +analyze_frame(?FRAME_METHOD, + <<ClassId:16, MethodId:16, MethodFields/binary>>, + Protocol) -> + MethodName = Protocol:lookup_method_name({ClassId, MethodId}), + {method, MethodName, MethodFields}; +analyze_frame(?FRAME_HEADER, + <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, + _Protocol) -> {content_header, ClassId, Weight, BodySize, Properties}; -analyze_frame(?FRAME_BODY, Body) -> +analyze_frame(?FRAME_BODY, Body, _Protocol) -> {content_body, Body}; -analyze_frame(?FRAME_HEARTBEAT, <<>>) -> +analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> heartbeat; -analyze_frame(_Type, _Body) -> +analyze_frame(_Type, _Body, _Protocol) -> error. handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> @@ -549,20 +559,21 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat %% %% * The server MUST provide a protocol version that is lower than or %% equal to that requested by the client in the protocol header. -%% -%% We support 0-9-1 and 0-9, so by the first rule, we must close the -%% connection if we're sent anything else. Then, we must send that -%% version in the Connection.start method. -handle_input(handshake, <<"AMQP",0,0,9,1>>, State) -> - %% 0-9-1 style protocol header. - protocol_negotiate(0, 9, 1, State); -handle_input(handshake, <<"AMQP",1,1,0,9>>, State) -> - %% 0-8 and 0-9 style protocol header; we support only 0-9 - protocol_negotiate(0, 9, 0, State); +handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) -> + start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); + +handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) -> + start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); + +%% the 0-8 spec, confusingly, defines the version as 8-0 +handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> + start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); + +handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_version, A, B, C, D}); + handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> rabbit_net:send( - Sock, <<"AMQP",0,0,9,1>>) end), - throw({bad_header, Other}); + refuse_connection(Sock, {bad_header, Other}); handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). @@ -570,27 +581,31 @@ handle_input(Callback, Data, _State) -> %% Offer a protocol version to the client. Connection.start only %% includes a major and minor version number, Luckily 0-9 and 0-9-1 %% are similar enough that clients will be happy with either. -protocol_negotiate(ProtocolMajor, ProtocolMinor, _ProtocolRevision, - State = #v1{sock = Sock, connection = Connection}) -> - ok = send_on_channel0( - Sock, - #'connection.start'{ - version_major = ProtocolMajor, - version_minor = ProtocolMinor, - server_properties = server_properties(), - mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }), +start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, + Protocol, + State = #v1{sock = Sock, connection = Connection}) -> + Start = #'connection.start'{ version_major = ProtocolMajor, + version_minor = ProtocolMinor, + server_properties = server_properties(), + mechanisms = <<"PLAIN AMQPLAIN">>, + locales = <<"en_US">> }, + ok = send_on_channel0(Sock, Start, Protocol), {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT}, + timeout_sec = ?NORMAL_TIMEOUT, + protocol = Protocol}, connection_state = starting}, frame_header, 7}. +refuse_connection(Sock, Exception) -> + ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), + throw(Exception). + %%-------------------------------------------------------------------------- -handle_method0(MethodName, FieldsBin, State) -> +handle_method0(MethodName, FieldsBin, + State = #v1{connection = #connection{protocol = Protocol}}) -> try - handle_method0(rabbit_framing:decode_method_fields( - MethodName, FieldsBin), + handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) catch exit:Reason -> CompleteReason = case Reason of @@ -612,34 +627,31 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response, client_properties = ClientProperties}, State = #v1{connection_state = starting, - connection = Connection, + connection = Connection = + #connection{protocol = Protocol}, sock = Sock}) -> User = rabbit_access_control:check_login(Mechanism, Response), - ok = send_on_channel0( - Sock, - #'connection.tune'{channel_max = ?CHANNEL_MAX, + Tune = #'connection.tune'{channel_max = 0, frame_max = ?FRAME_MAX, - heartbeat = 0}), + heartbeat = 0}, + ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{ user = User, client_properties = ClientProperties}}; -handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, - frame_max = FrameMax, +handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, sock = Sock}) -> - if (FrameMax =< ?FRAME_MIN_SIZE) or + if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> + rabbit_misc:protocol_error( + not_allowed, "frame_max=~w < ~w min size", + [FrameMax, ?FRAME_MIN_SIZE]); (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) -> rabbit_misc:protocol_error( - mistuned, "peer sent tune_ok with invalid frame_max", []); - %% If we have a channel_max limit that the client wishes to - %% exceed, die as per spec. Not currently a problem, so we ignore - %% the client's channel_max parameter. -%% (?CHANNEL_MAX /= 0) and (ChannelMax > ?CHANNEL_MAX) -> -%% rabbit_misc:protocol_error( -%% mistuned, "peer sent tune_ok with invalid channel_max"); + not_allowed, "frame_max=~w > ~w max size", + [FrameMax, ?FRAME_MAX]); true -> rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), State#v1{connection_state = opening, @@ -647,27 +659,31 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, timeout_sec = ClientHeartbeat, frame_max = FrameMax}} end; + handle_method0(#'connection.open'{virtual_host = VHostPath}, + State = #v1{connection_state = opening, connection = Connection = #connection{ - user = User}, + user = User, + protocol = Protocol}, sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, - ok = send_on_channel0( - Sock, - #'connection.open_ok'{deprecated_known_hosts = <<>>}), + ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), State#v1{connection_state = running, connection = NewConnection}; handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); -handle_method0(#'connection.close'{}, State = #v1{connection_state = CS}) +handle_method0(#'connection.close'{}, + State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}, + sock = Sock}) when CS =:= closing; CS =:= closed -> %% We're already closed or closing, so we don't need to cleanup %% anything. - ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> @@ -680,8 +696,8 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -send_on_channel0(Sock, Method) -> - ok = rabbit_writer:internal_send_command(Sock, 0, Method). +send_on_channel0(Sock, Method, Protocol) -> + ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). %%-------------------------------------------------------------------------- @@ -715,6 +731,10 @@ i(state, #v1{connection_state = S}) -> S; i(channels, #v1{}) -> length(all_channels()); +i(protocol, #v1{connection = #connection{protocol = none}}) -> + none; +i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> + Protocol:version(); i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> @@ -738,11 +758,13 @@ send_to_new_channel(Channel, AnalyzedFrame, #v1{sock = Sock, connection = #connection{ frame_max = FrameMax, user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector]), + vhost = VHost, + protocol = Protocol}} = State, + {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol), + {ok, ChPid} = rabbit_framing_channel:start_link( + fun rabbit_channel:start_link/6, + [Channel, self(), WriterPid, Username, VHost, Collector], + Protocol), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). @@ -758,25 +780,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> log_channel_error(CS, Channel, Reason), send_exception(State, Channel, Reason). -send_exception(State, Channel, Reason) -> - {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason), +send_exception(State = #v1{connection = #connection{protocol = Protocol}}, + Channel, Reason) -> + {ShouldClose, CloseChannel, CloseMethod} = + map_exception(Channel, Reason, Protocol), NewState = case ShouldClose of true -> terminate_channels(), close_connection(State); false -> close_channel(Channel, State) end, ok = rabbit_writer:internal_send_command( - NewState#v1.sock, CloseChannel, CloseMethod), + NewState#v1.sock, CloseChannel, CloseMethod, Protocol), NewState. -map_exception(Channel, Reason) -> +map_exception(Channel, Reason, Protocol) -> {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = - lookup_amqp_exception(Reason), + lookup_amqp_exception(Reason, Protocol), ShouldClose = SuggestedClose or (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; - none -> {0, 0}; - _ -> rabbit_framing:method_id(FailedMethod) + none -> {0, 0}; + _ -> Protocol:method_id(FailedMethod) end, {CloseChannel, CloseMethod} = case ShouldClose of @@ -791,22 +815,16 @@ map_exception(Channel, Reason) -> end, {ShouldClose, CloseChannel, CloseMethod}. -%% FIXME: this clause can go when we move to AMQP spec >=8.1 -lookup_amqp_exception(#amqp_error{name = precondition_failed, - explanation = Expl, - method = Method}) -> - ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), - {false, 406, ExplBin, Method}; lookup_amqp_exception(#amqp_error{name = Name, explanation = Expl, - method = Method}) -> - {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), + method = Method}, + Protocol) -> + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), ExplBin = amqp_exception_explanation(Text, Expl), {ShouldClose, Code, ExplBin, Method}; -lookup_amqp_exception(Other) -> +lookup_amqp_exception(Other, Protocol) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = - rabbit_framing:lookup_amqp_exception(internal_error), + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), {ShouldClose, Code, Text, none}. amqp_exception_explanation(Text, Expl) -> |