summaryrefslogtreecommitdiff
path: root/src/rabbit_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r--src/rabbit_reader.erl66
1 files changed, 37 insertions, 29 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3908b646..42af91a8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -35,9 +35,8 @@
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
--define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation
-%---------------------------------------------------------------------------
+%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer,
@@ -62,7 +61,7 @@
State#v1.connection_state =:= blocking orelse
State#v1.connection_state =:= blocked)).
-%%----------------------------------------------------------------------------
+%%--------------------------------------------------------------------------
-ifdef(use_specs).
@@ -158,14 +157,15 @@ server_properties(Protocol) ->
{copyright, ?COPYRIGHT_MESSAGE},
{information, ?INFORMATION_MESSAGE}]]],
- %% Filter duplicated properties in favor of config file provided values
+ %% Filter duplicated properties in favour of config file provided values
lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
NormalizedConfigServerProps).
server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
- {<<"basic.nack">>, bool, true}];
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true}];
server_capabilities(_) ->
[].
@@ -201,7 +201,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
- client_properties = none},
+ client_properties = none,
+ capabilities = []},
callback = uninitialized_callback,
recv_length = 0,
recv_ref = none,
@@ -564,7 +565,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
server_properties = server_properties(Protocol),
- mechanisms = auth_mechanisms_binary(),
+ mechanisms = auth_mechanisms_binary(Sock),
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
switch_callback(State#v1{connection = Connection#connection{
@@ -592,14 +593,14 @@ handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
HandleException =
fun(R) ->
- case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, R);
- %% We don't trust the client at this point - force
- %% them to wait for a bit so they can't DOS us with
- %% repeated failed logins etc.
- false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state, R})
- end
+ case ?IS_RUNNING(State) of
+ true -> send_exception(State, 0, R);
+ %% We don't trust the client at this point - force
+ %% them to wait for a bit so they can't DOS us with
+ %% repeated failed logins etc.
+ false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, State#v1.connection_state, R})
+ end
end,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
@@ -616,7 +617,7 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
State0 = #v1{connection_state = starting,
connection = Connection,
sock = Sock}) ->
- AuthMechanism = auth_mechanism_to_module(Mechanism),
+ AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
Capabilities =
case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
{table, Capabilities1} -> Capabilities1;
@@ -641,14 +642,15 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
connection = Connection,
sock = Sock,
start_heartbeat_fun = SHF}) ->
- if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
+ ServerFrameMax = server_frame_max(),
+ if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
rabbit_misc:protocol_error(
not_allowed, "frame_max=~w < ~w min size",
[FrameMax, ?FRAME_MIN_SIZE]);
- (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) ->
+ ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax ->
rabbit_misc:protocol_error(
not_allowed, "frame_max=~w > ~w max size",
- [FrameMax, ?FRAME_MAX]);
+ [FrameMax, ServerFrameMax]);
true ->
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
@@ -679,7 +681,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
State#v1{connection_state = running,
connection = NewConnection}),
rabbit_event:notify(connection_created,
- infos(?CREATION_EVENT_KEYS, State1)),
+ [{type, network} |
+ infos(?CREATION_EVENT_KEYS, State1)]),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State1) end),
State1;
@@ -706,17 +709,23 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
+%% Compute frame_max for this instance. Could simply use 0, but breaks
+%% QPid Java client.
+server_frame_max() ->
+ {ok, FrameMax} = application:get_env(rabbit, frame_max),
+ FrameMax.
+
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
-auth_mechanism_to_module(TypeBin) ->
+auth_mechanism_to_module(TypeBin, Sock) ->
case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
rabbit_misc:protocol_error(
command_invalid, "unknown authentication mechanism '~s'",
[TypeBin]);
T ->
- case {lists:member(T, auth_mechanisms()),
+ case {lists:member(T, auth_mechanisms(Sock)),
rabbit_registry:lookup_module(auth_mechanism, T)} of
{true, {ok, Module}} ->
Module;
@@ -727,15 +736,14 @@ auth_mechanism_to_module(TypeBin) ->
end
end.
-auth_mechanisms() ->
+auth_mechanisms(Sock) ->
{ok, Configured} = application:get_env(auth_mechanisms),
- [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism),
- lists:member(Name, Configured)].
+ [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
+ Module:should_offer(Sock), lists:member(Name, Configured)].
-auth_mechanisms_binary() ->
+auth_mechanisms_binary(Sock) ->
list_to_binary(
- string:join(
- [atom_to_list(A) || A <- auth_mechanisms()], " ")).
+ string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
State = #v1{auth_mechanism = AuthMechanism,
@@ -757,7 +765,7 @@ auth_phase(Response,
State#v1{auth_state = AuthState1};
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
- frame_max = ?FRAME_MAX,
+ frame_max = server_frame_max(),
heartbeat = 0},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,