summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-12-05 18:29:19 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-12-05 18:29:19 +0000
commit25f17a80601d6f407298c2fa8f959422f8e90085 (patch)
tree9ba46770090e34ab432d87fef1cc7c3cd928def7
parentb41498b6d1bdcd4660bb4076e1768dac2466b9de (diff)
parent80fd513bda062aae194e07f5795dcbf286d7a4b0 (diff)
downloadrabbitmq-server-25f17a80601d6f407298c2fa8f959422f8e90085.tar.gz
merge in default
-rw-r--r--docs/rabbitmq.config.example5
-rw-r--r--docs/rabbitmqctl.1.xml4
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--src/rabbit_reader.erl155
-rw-r--r--src/rabbit_upgrade.erl9
5 files changed, 108 insertions, 67 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index 91402649..c0d6cc70 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -138,6 +138,11 @@
%%
%% {frame_max, 131072},
+ %% Set the max permissible number of channels per connection.
+ %% 0 means "no limit".
+ %%
+ %% {channel_max, 128},
+
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6410235d..d2a3f7c7 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1430,6 +1430,10 @@
<listitem><para>Maximum frame size (bytes).</para></listitem>
</varlistentry>
<varlistentry>
+ <term>channel_max</term>
+ <listitem><para>Maximum number of channels on this connection.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>client_properties</term>
<listitem><para>Informational properties transmitted by the client
during connection establishment.</para></listitem>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index a10bad45..29f06e79 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -25,6 +25,7 @@
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client
{frame_max, 131072},
+ {channel_max, 0},
{heartbeat, 580},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 65536},
@@ -52,6 +53,7 @@
{nodelay, true},
{linger, {true, 0}},
{exit_on_close, false}]},
+ {halt_on_upgrade_failure, true},
{hipe_compile, false},
%% see bug 24513 for how this list was created
{hipe_modules,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2221ca79..144aa8a9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -32,6 +32,7 @@
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+-define(CHANNEL_MIN, 1).
%%--------------------------------------------------------------------------
@@ -40,7 +41,7 @@
stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
- protocol, user, timeout_sec, frame_max, vhost,
+ protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
auth_mechanism, auth_state}).
@@ -55,7 +56,7 @@
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -605,17 +606,26 @@ create_channel(Channel, State) ->
connection = #connection{name = Name,
protocol = Protocol,
frame_max = FrameMax,
+ channel_max = ChannelMax,
user = User,
vhost = VHost,
capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ChPid, AState}.
+ N = length(all_channels()),
+ case ChannelMax == 0 orelse N < ChannelMax of
+ true -> {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
+ Protocol, User, VHost, Capabilities,
+ Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ok, {ChPid, AState}};
+ false -> {error, rabbit_misc:amqp_error(
+ not_allowed, "number of channels opened (~w) has "
+ "reached the negotiated channel_max (~w)",
+ [N, ChannelMax], 'none')}
+ end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
@@ -663,24 +673,28 @@ handle_frame(Type, Channel, Payload, State) ->
process_frame(Frame, Channel, State) ->
ChKey = {channel, Channel},
- {ChPid, AState} = case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> Other
- end,
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, NewAState} ->
- rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, Content, NewAState} ->
- rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State));
- {error, Reason} ->
- handle_exception(State, Channel, Reason)
+ case (case get(ChKey) of
+ undefined -> create_channel(Channel, State);
+ Other -> {ok, Other}
+ end) of
+ {error, Error} ->
+ handle_exception(State, Channel, Error);
+ {ok, {ChPid, AState}} ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
+ end
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -837,38 +851,33 @@ handle_method0(#'connection.secure_ok'{response = Response},
State = #v1{connection_state = securing}) ->
auth_phase(Response, State);
-handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
- heartbeat = ClientHeartbeat},
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
+ channel_max = ChannelMax,
+ heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
helper_sup = SupPid,
sock = Sock}) ->
- ServerFrameMax = server_frame_max(),
- if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w < ~w min size",
- [FrameMax, ?FRAME_MIN_SIZE]);
- ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w > ~w max size",
- [FrameMax, ServerFrameMax]);
- true ->
- {ok, Collector} =
- rabbit_connection_helper_sup:start_queue_collector(SupPid),
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
- Parent = self(),
- ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater =
- rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
- SendFun, ClientHeartbeat, ReceiveFun),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax},
- queue_collector = Collector,
- heartbeater = Heartbeater}
- end;
+ ok = validate_negotiated_integer_value(
+ frame_max, ?FRAME_MIN_SIZE, FrameMax),
+ ok = validate_negotiated_integer_value(
+ channel_max, ?CHANNEL_MIN, ChannelMax),
+ {ok, Collector} =
+ rabbit_connection_helper_sup:start_queue_collector(SupPid),
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
+ Parent = self(),
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
+ Heartbeater =
+ rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
+ SendFun, ClientHeartbeat, ReceiveFun),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ timeout_sec = ClientHeartbeat},
+ queue_collector = Collector,
+ heartbeater = Heartbeater};
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
@@ -916,13 +925,28 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-server_frame_max() ->
- {ok, FrameMax} = application:get_env(rabbit, frame_max),
- FrameMax.
+validate_negotiated_integer_value(Field, Min, ClientValue) ->
+ ServerValue = get_env(Field),
+ if ClientValue /= 0 andalso ClientValue < Min ->
+ fail_negotiation(Field, min, ServerValue, ClientValue);
+ ServerValue /= 0 andalso ClientValue > ServerValue ->
+ fail_negotiation(Field, max, ServerValue, ClientValue);
+ true ->
+ ok
+ end.
+
+fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
+ {S1, S2} = case MinOrMax of
+ min -> {lower, minimum};
+ max -> {higher, maximum}
+ end,
+ rabbit_misc:protocol_error(
+ not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)",
+ [Field, ClientValue, S1, S2, ServerValue], 'connection.tune').
-server_heartbeat() ->
- {ok, Heartbeat} = application:get_env(rabbit, heartbeat),
- Heartbeat.
+get_env(Key) ->
+ {ok, Value} = application:get_env(rabbit, Key),
+ Value.
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -988,9 +1012,9 @@ auth_phase(Response,
State#v1{connection = Connection#connection{
auth_state = AuthState1}};
{ok, User} ->
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = server_frame_max(),
- heartbeat = server_heartbeat()},
+ Tune = #'connection.tune'{frame_max = get_env(frame_max),
+ channel_max = get_env(channel_max),
+ heartbeat = get_env(heartbeat)},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{user = User,
@@ -1042,6 +1066,7 @@ ic(user, #connection{user = U}) -> U#user.username;
ic(vhost, #connection{vhost = VHost}) -> VHost;
ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 1047b823..c1f142d7 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -191,9 +191,14 @@ die(Msg, Args) ->
%% straight out into do_boot, generating an erl_crash.dump
%% and displaying any error message in a confusing way.
error_logger:error_msg(Msg, Args),
- io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args),
+ Str = rabbit_misc:format(
+ "~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args),
+ io:format(Str),
error_logger:logfile(close),
- halt(1).
+ case application:get_env(rabbit, halt_on_upgrade_failure) of
+ {ok, false} -> throw({upgrade_error, Str});
+ _ -> halt(1) %% i.e. true or undefined
+ end.
primary_upgrade(Upgrades, Nodes) ->
Others = Nodes -- [node()],