diff options
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r-- | src/rabbit_reader.erl | 66 |
1 files changed, 37 insertions, 29 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3908b646..42af91a8 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -35,9 +35,8 @@ -define(CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). --define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation -%--------------------------------------------------------------------------- +%%-------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, connection_state, queue_collector, heartbeater, stats_timer, @@ -62,7 +61,7 @@ State#v1.connection_state =:= blocking orelse State#v1.connection_state =:= blocked)). -%%---------------------------------------------------------------------------- +%%-------------------------------------------------------------------------- -ifdef(use_specs). @@ -158,14 +157,15 @@ server_properties(Protocol) -> {copyright, ?COPYRIGHT_MESSAGE}, {information, ?INFORMATION_MESSAGE}]]], - %% Filter duplicated properties in favor of config file provided values + %% Filter duplicated properties in favour of config file provided values lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, NormalizedConfigServerProps). server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, - {<<"basic.nack">>, bool, true}]; + {<<"basic.nack">>, bool, true}, + {<<"consumer_cancel_notify">>, bool, true}]; server_capabilities(_) -> []. @@ -201,7 +201,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, vhost = none, - client_properties = none}, + client_properties = none, + capabilities = []}, callback = uninitialized_callback, recv_length = 0, recv_ref = none, @@ -564,7 +565,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, version_major = ProtocolMajor, version_minor = ProtocolMinor, server_properties = server_properties(Protocol), - mechanisms = auth_mechanisms_binary(), + mechanisms = auth_mechanisms_binary(Sock), locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), switch_callback(State#v1{connection = Connection#connection{ @@ -592,14 +593,14 @@ handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> HandleException = fun(R) -> - case ?IS_RUNNING(State) of - true -> send_exception(State, 0, R); - %% We don't trust the client at this point - force - %% them to wait for a bit so they can't DOS us with - %% repeated failed logins etc. - false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, R}) - end + case ?IS_RUNNING(State) of + true -> send_exception(State, 0, R); + %% We don't trust the client at this point - force + %% them to wait for a bit so they can't DOS us with + %% repeated failed logins etc. + false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({channel0_error, State#v1.connection_state, R}) + end end, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), @@ -616,7 +617,7 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, State0 = #v1{connection_state = starting, connection = Connection, sock = Sock}) -> - AuthMechanism = auth_mechanism_to_module(Mechanism), + AuthMechanism = auth_mechanism_to_module(Mechanism, Sock), Capabilities = case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of {table, Capabilities1} -> Capabilities1; @@ -641,14 +642,15 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, connection = Connection, sock = Sock, start_heartbeat_fun = SHF}) -> - if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> + ServerFrameMax = server_frame_max(), + if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE -> rabbit_misc:protocol_error( not_allowed, "frame_max=~w < ~w min size", [FrameMax, ?FRAME_MIN_SIZE]); - (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) -> + ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax -> rabbit_misc:protocol_error( not_allowed, "frame_max=~w > ~w max size", - [FrameMax, ?FRAME_MAX]); + [FrameMax, ServerFrameMax]); true -> Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, @@ -679,7 +681,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, State#v1{connection_state = running, connection = NewConnection}), rabbit_event:notify(connection_created, - infos(?CREATION_EVENT_KEYS, State1)), + [{type, network} | + infos(?CREATION_EVENT_KEYS, State1)]), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State1) end), State1; @@ -706,17 +709,23 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). +%% Compute frame_max for this instance. Could simply use 0, but breaks +%% QPid Java client. +server_frame_max() -> + {ok, FrameMax} = application:get_env(rabbit, frame_max), + FrameMax. + send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). -auth_mechanism_to_module(TypeBin) -> +auth_mechanism_to_module(TypeBin, Sock) -> case rabbit_registry:binary_to_type(TypeBin) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "unknown authentication mechanism '~s'", [TypeBin]); T -> - case {lists:member(T, auth_mechanisms()), + case {lists:member(T, auth_mechanisms(Sock)), rabbit_registry:lookup_module(auth_mechanism, T)} of {true, {ok, Module}} -> Module; @@ -727,15 +736,14 @@ auth_mechanism_to_module(TypeBin) -> end end. -auth_mechanisms() -> +auth_mechanisms(Sock) -> {ok, Configured} = application:get_env(auth_mechanisms), - [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism), - lists:member(Name, Configured)]. + [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism), + Module:should_offer(Sock), lists:member(Name, Configured)]. -auth_mechanisms_binary() -> +auth_mechanisms_binary(Sock) -> list_to_binary( - string:join( - [atom_to_list(A) || A <- auth_mechanisms()], " ")). + string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")). auth_phase(Response, State = #v1{auth_mechanism = AuthMechanism, @@ -757,7 +765,7 @@ auth_phase(Response, State#v1{auth_state = AuthState1}; {ok, User} -> Tune = #'connection.tune'{channel_max = 0, - frame_max = ?FRAME_MAX, + frame_max = server_frame_max(), heartbeat = 0}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, |