summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-06 16:30:31 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-06 16:30:31 +0000
commit81b96105117379db9030472c1cb2789eed22cf71 (patch)
treeb4de7a885b8a1139676e334af8e0302a1dfb1636
parent220b62f2b8ea599dc33497243dc3584a8ff0fa6e (diff)
parent89141b248b0a8d842a91a1415274a59d9df9b249 (diff)
downloadrabbitmq-server-81b96105117379db9030472c1cb2789eed22cf71.tar.gz
Merge bug 25942
-rw-r--r--codegen.py8
-rw-r--r--docs/rabbitmq.config.example5
-rw-r--r--docs/rabbitmqctl.1.xml28
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--src/credit_flow.erl21
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl55
-rw-r--r--src/rabbit_binary_parser.erl18
-rw-r--r--src/rabbit_channel.erl20
-rw-r--r--src/rabbit_control_main.erl9
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_node_monitor.erl6
-rw-r--r--src/rabbit_reader.erl285
-rw-r--r--src/rabbit_upgrade_functions.erl15
-rw-r--r--src/rabbit_vhost.erl20
-rw-r--r--src/rabbit_writer.erl2
16 files changed, 323 insertions, 175 deletions
diff --git a/codegen.py b/codegen.py
index 842549cf..91fa1154 100644
--- a/codegen.py
+++ b/codegen.py
@@ -187,7 +187,7 @@ def genErl(spec):
elif type == 'table':
return p+'Len:32/unsigned, '+p+'Tab:'+p+'Len/binary'
- def genFieldPostprocessing(packed):
+ def genFieldPostprocessing(packed, hasContent):
for f in packed:
type = erlType(f.domain)
if type == 'bit':
@@ -199,6 +199,10 @@ def genErl(spec):
elif type == 'table':
print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \
(f.index, f.index)
+ # We skip the check on content-bearing methods for
+ # speed. This is a sanity check, not a security thing.
+ elif type == 'shortstr' and not hasContent:
+ print " rabbit_binary_parser:assert_utf8(F%d)," % (f.index)
else:
pass
@@ -214,7 +218,7 @@ def genErl(spec):
restSeparator = ''
recordConstructorExpr = '#%s{%s}' % (m.erlangName(), fieldMapList(m.arguments))
print "decode_method_fields(%s, <<%s>>) ->" % (m.erlangName(), binaryPattern)
- genFieldPostprocessing(packedFields)
+ genFieldPostprocessing(packedFields, m.hasContent)
print " %s;" % (recordConstructorExpr,)
def genDecodeProperties(c):
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index 91402649..c0d6cc70 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -138,6 +138,11 @@
%%
%% {frame_max, 131072},
+ %% Set the max permissible number of channels per connection.
+ %% 0 means "no limit".
+ %%
+ %% {channel_max, 128},
+
%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6ec7ee07..d2a3f7c7 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1135,6 +1135,13 @@
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>consumer_utilisation</term>
+ <listitem><para>Fraction of the time (between 0.0 and 1.0)
+ that the queue is able to immediately deliver messages to
+ consumers. This can be less than 1.0 if consumers are limited
+ by network congestion or prefetch count.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>memory</term>
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
@@ -1390,24 +1397,9 @@
</varlistentry>
<varlistentry>
- <term>last_blocked_by</term>
- <listitem><para>The reason for which this connection
- was last blocked. One of 'resource' - due to a memory
- or disk alarm, 'flow' - due to internal flow control, or
- 'none' if the connection was never
- blocked.</para></listitem>
- </varlistentry>
- <varlistentry>
- <term>last_blocked_age</term>
- <listitem><para>Time, in seconds, since this
- connection was last blocked, or
- 'infinity'.</para></listitem>
- </varlistentry>
-
- <varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
- <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
+ <command>opening</command>, <command>running</command>, <command>flow</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
</varlistentry>
<varlistentry>
<term>channels</term>
@@ -1438,6 +1430,10 @@
<listitem><para>Maximum frame size (bytes).</para></listitem>
</varlistentry>
<varlistentry>
+ <term>channel_max</term>
+ <listitem><para>Maximum number of channels on this connection.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>client_properties</term>
<listitem><para>Informational properties transmitted by the client
during connection establishment.</para></listitem>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index f0fee96a..29f06e79 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -25,6 +25,7 @@
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client
{frame_max, 131072},
+ {channel_max, 0},
{heartbeat, 580},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 65536},
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index d48d649e..39a257ac 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -30,7 +30,7 @@
-define(DEFAULT_CREDIT, {200, 50}).
--export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
+-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]).
-export([peer_down/1]).
%%----------------------------------------------------------------------------
@@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of
_ -> true
end.
+state() -> case blocked() of
+ true -> flow;
+ false -> case get(credit_blocked_at) of
+ undefined -> running;
+ B -> Diff = timer:now_diff(erlang:now(), B),
+ case Diff < 5000000 of
+ true -> flow;
+ false -> running
+ end
+ end
+ end.
+
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
%% doesn't really matter; at some point later we will drain
@@ -128,7 +140,12 @@ grant(To, Quantity) ->
true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
+block(From) ->
+ case blocked() of
+ false -> put(credit_blocked_at, erlang:now());
+ true -> ok
+ end,
+ ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8306f134..6b1e00b7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -115,7 +115,8 @@
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 ::
(rabbit_types:amqqueue())
- -> [{pid(), rabbit_types:ctag(), boolean()}]).
+ -> [{pid(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table()}]).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 886db8d1..cb59edd9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,7 @@
backing_queue,
backing_queue_state,
active_consumers,
+ consumer_use,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -95,11 +96,12 @@
messages_unacknowledged,
messages,
consumers,
+ consumer_utilisation,
memory,
slave_pids,
synchronised_slave_pids,
backing_queue_status,
- status
+ state
]).
-define(CREATION_EVENT_KEYS,
@@ -149,6 +151,7 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
active_consumers = priority_queue:new(),
+ consumer_use = {inactive, now_micros(), 0, 0.0},
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
@@ -486,7 +489,7 @@ deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- {false, State};
+ {false, update_consumer_use(State, inactive)};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -533,6 +536,29 @@ deliver_msg_to_consumer0(DeliverFun,
unsent_message_count = Count + 1}),
{Stop, State1}.
+update_consumer_use(State = #q{consumer_use = CUInfo}, Use) ->
+ State#q{consumer_use = update_consumer_use1(CUInfo, Use)}.
+
+update_consumer_use1({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_consumer_use1({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_consumer_use1({active, Since, Avg}, inactive) ->
+ Now = now_micros(),
+ {inactive, Now, Now - Since, Avg};
+update_consumer_use1({inactive, Since, Active, Avg}, active) ->
+ Now = now_micros(),
+ {active, Now, consumer_use_avg(Active, Now - Since, Avg)}.
+
+consumer_use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ Ratio = Active / Time,
+ Weight = erlang:min(1, Time / 1000000),
+ case Avg of
+ undefined -> Ratio;
+ _ -> Ratio * Weight + Avg * (1 - Weight)
+ end.
+
confirm_messages([], State) ->
State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
@@ -725,7 +751,8 @@ unblock(State, C = #cr{limiter = Limiter}) ->
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
- State1 = State#q{active_consumers = AC1},
+ State1 = update_consumer_use(State#q{active_consumers = AC1},
+ active),
[notify_decorators(
consumer_unblocked, [{consumer_tag, CTag}], State1) ||
{_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
@@ -1035,6 +1062,16 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
+i(consumer_utilisation, #q{consumer_use = ConsumerUse}) ->
+ case consumer_count() of
+ 0 -> '';
+ _ -> case ConsumerUse of
+ {active, Since, Avg} ->
+ consumer_use_avg(now_micros() - Since, 0, Avg);
+ {inactive, Since, Active, Avg} ->
+ consumer_use_avg(Active, now_micros() - Since, Avg)
+ end
+ end;
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1052,8 +1089,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
-i(status, #q{status = Status}) ->
- Status;
+i(state, #q{status = running}) -> credit_flow:state();
+i(state, #q{status = State}) -> State;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -1074,7 +1111,10 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+ ExtraKs = [K || {K, _} <- Extra],
+ Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State),
+ not lists:member(K, ExtraKs)],
+ rabbit_event:notify(queue_stats, Extra ++ Infos).
emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) ->
rabbit_event:notify(consumer_created,
@@ -1537,7 +1577,8 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(
State, #q.stats_timer,
- fun () -> emit_stats(State, [{idle_since, now()}]) end),
+ fun () -> emit_stats(State, [{idle_since, now()},
+ {consumer_utilisation, ''}]) end),
State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
#q.stats_timer),
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 2d703033..f65d8ea7 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -20,6 +20,7 @@
-export([parse_table/1]).
-export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([validate_utf8/1, assert_utf8/1]).
%%----------------------------------------------------------------------------
@@ -30,6 +31,8 @@
(rabbit_types:content()) -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:undecoded_content()).
+-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error').
+-spec(assert_utf8/1 :: (binary()) -> 'ok').
-endif.
@@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) ->
Content;
clear_decoded_content(Content = #content{}) ->
Content#content{properties = none}.
+
+assert_utf8(B) ->
+ case validate_utf8(B) of
+ ok -> ok;
+ error -> rabbit_misc:protocol_error(
+ frame_error, "Malformed UTF-8 in shortstr", [])
+ end.
+
+validate_utf8(Bin) ->
+ try
+ xmerl_ucs:from_utf8(Bin),
+ ok
+ catch exit:{ucs, _} ->
+ error
+ end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a3a0c754..4d778f94 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -53,7 +53,8 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
- client_flow_blocked]).
+ client_flow_blocked,
+ state]).
-define(CREATION_EVENT_KEYS,
[pid,
@@ -550,6 +551,14 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
check_not_default_exchange(_) ->
ok.
+check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>,
+ kind = exchange}) ->
+ rabbit_misc:protocol_error(
+ access_refused, "deletion of system ~s not allowed",
+ [rabbit_misc:rs(XName)]);
+check_exchange_deletion(_) ->
+ ok.
+
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
@@ -592,7 +601,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ %% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
+ State1 = State#ch{state = running},
+ rabbit_event:if_enabled(State1, #ch.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {reply, #'channel.open_ok'{}, State1};
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -933,6 +946,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_not_default_exchange(ExchangeName),
+ check_exchange_deletion(ExchangeName),
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
@@ -1615,6 +1629,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
+i(state, #ch{state = running}) -> credit_flow:state();
+i(state, #ch{state = State}) -> State;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 6f36f99d..f3463286 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) ->
end.
call(Node, {Mod, Fun, Args}) ->
- rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)).
+ rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
+
+list_to_binary_utf8(L) ->
+ B = list_to_binary(L),
+ case rabbit_binary_parser:validate_utf8(B) of
+ ok -> B;
+ error -> throw({error, {not_utf_8, L}})
+ end.
rpc_call(Node, Mod, Fun, Args) ->
rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 17ed8563..ab8c62fe 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -51,7 +51,7 @@ stop() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, false, []),
+ topic, true, false, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 10e68198..488f1df5 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -201,7 +201,7 @@ init([]) ->
%% writing out the cluster status files - bad things can then
%% happen.
process_flag(trap_exit, true),
- net_kernel:monitor_nodes(true),
+ net_kernel:monitor_nodes(true, [nodedown_reason]),
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
subscribers = pmon:new(),
@@ -267,7 +267,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
-handle_info({nodedown, Node}, State) ->
+handle_info({nodedown, Node, Info}, State) ->
+ rabbit_log:info("node ~p down: ~p~n",
+ [Node, proplists:get_value(nodedown_reason, Info)]),
ok = handle_dead_node(Node),
{noreply, State};
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 32b52e6e..f07ce419 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -32,15 +32,17 @@
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+-define(CHANNEL_MIN, 1).
%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, helper_sup, queue_collector, heartbeater,
- stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
+ stats_timer, channel_sup_sup_pid, buf, buf_len, channel_count,
+ throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
- protocol, user, timeout_sec, frame_max, vhost,
+ protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
auth_mechanism, auth_state}).
@@ -48,15 +50,14 @@
blocked_sent}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, last_blocked_by, last_blocked_age,
- channels]).
+ send_pend, state, channels]).
-define(CREATION_EVENT_KEYS,
[pid, name, port, peer_port, host,
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -240,6 +241,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
channel_sup_sup_pid = none,
buf = [],
buf_len = 0,
+ channel_count = 0,
throttle = #throttle{
alarmed_by = [],
last_blocked_by = none,
@@ -328,8 +330,8 @@ handle_other({conserve_resources, Source, Conserve},
control_throttle(State#v1{throttle = Throttle1});
handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
- channel_cleanup(ChPid),
- maybe_close(control_throttle(State));
+ {_, State1} = channel_cleanup(ChPid, State),
+ maybe_close(control_throttle(State1));
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -486,63 +488,59 @@ close_connection(State = #v1{queue_collector = Collector,
State#v1{connection_state = closed}.
handle_dependent_exit(ChPid, Reason, State) ->
- case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, controlled} -> State;
+ {Channel, State1} = channel_cleanup(ChPid, State),
+ case {Channel, termination_kind(Reason)} of
+ {undefined, controlled} -> State1;
{undefined, uncontrolled} -> exit({abnormal_dependent_exit,
ChPid, Reason});
- {_Channel, controlled} -> maybe_close(control_throttle(State));
- {Channel, uncontrolled} -> State1 = handle_exception(
- State, Channel, Reason),
- maybe_close(control_throttle(State1))
+ {_, controlled} -> maybe_close(control_throttle(State1));
+ {_, uncontrolled} -> State2 = handle_exception(
+ State1, Channel, Reason),
+ maybe_close(control_throttle(State2))
end.
-terminate_channels() ->
- NChannels =
- length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
- if NChannels > 0 ->
- Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
- TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
- wait_for_channel_termination(NChannels, TimerRef);
- true -> ok
- end.
+terminate_channels(#v1{channel_count = 0} = State) ->
+ State;
+terminate_channels(#v1{channel_count = ChannelCount} = State) ->
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
+ Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount,
+ TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
+ wait_for_channel_termination(ChannelCount, TimerRef, State).
-wait_for_channel_termination(0, TimerRef) ->
+wait_for_channel_termination(0, TimerRef, State) ->
case erlang:cancel_timer(TimerRef) of
false -> receive
- cancel_wait -> ok
+ cancel_wait -> State
end;
- _ -> ok
+ _ -> State
end;
-
-wait_for_channel_termination(N, TimerRef) ->
+wait_for_channel_termination(N, TimerRef, State) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
- case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, _} ->
- exit({abnormal_dependent_exit, ChPid, Reason});
- {_Channel, controlled} ->
- wait_for_channel_termination(N-1, TimerRef);
- {Channel, uncontrolled} ->
- log(error,
- "AMQP connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
- wait_for_channel_termination(N-1, TimerRef)
+ {Channel, State1} = channel_cleanup(ChPid, State),
+ case {Channel, termination_kind(Reason)} of
+ {undefined, _} -> exit({abnormal_dependent_exit,
+ ChPid, Reason});
+ {_, controlled} -> wait_for_channel_termination(
+ N-1, TimerRef, State1);
+ {_, uncontrolled} -> log(error,
+ "AMQP connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
+ wait_for_channel_termination(
+ N-1, TimerRef, State1)
end;
cancel_wait ->
exit(channel_termination_timeout)
end.
maybe_close(State = #v1{connection_state = closing,
- connection = #connection{protocol = Protocol},
- sock = Sock}) ->
- case all_channels() of
- [] ->
- NewState = close_connection(State),
- ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
- NewState;
- _ -> State
- end;
+ channel_count = 0,
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
+ NewState = close_connection(State),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ NewState;
maybe_close(State) ->
State.
@@ -561,8 +559,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol},
[self(), CS, Channel, Reason]),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
- State1 = close_connection(State),
+ State1 = close_connection(terminate_channels(State)),
ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
State1;
handle_exception(State, Channel, Reason) ->
@@ -600,15 +597,26 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
-create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- connection = #connection{name = Name,
- protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
+create_channel(_Channel,
+ #v1{channel_count = ChannelCount,
+ connection = #connection{channel_max = ChannelMax}})
+ when ChannelMax /= 0 andalso ChannelCount >= ChannelMax ->
+ {error, rabbit_misc:amqp_error(
+ not_allowed, "number of channels opened (~w) has reached the "
+ "negotiated channel_max (~w)",
+ [ChannelCount, ChannelMax], 'none')};
+create_channel(Channel,
+ #v1{sock = Sock,
+ queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ channel_count = ChannelCount,
+ connection =
+ #connection{name = Name,
+ protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State) ->
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
@@ -616,16 +624,16 @@ create_channel(Channel, State) ->
MRef = erlang:monitor(process, ChPid),
put({ch_pid, ChPid}, {Channel, MRef}),
put({channel, Channel}, {ChPid, AState}),
- {ChPid, AState}.
+ {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}.
-channel_cleanup(ChPid) ->
+channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
case get({ch_pid, ChPid}) of
- undefined -> undefined;
+ undefined -> {undefined, State};
{Channel, MRef} -> credit_flow:peer_down(ChPid),
erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
- Channel
+ {Channel, State#v1{channel_count = ChannelCount - 1}}
end.
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
@@ -664,32 +672,36 @@ handle_frame(Type, Channel, Payload, State) ->
process_frame(Frame, Channel, State) ->
ChKey = {channel, Channel},
- {ChPid, AState} = case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> Other
- end,
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, NewAState} ->
- rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, Content, NewAState} ->
- rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State));
- {error, Reason} ->
- handle_exception(State, Channel, Reason)
+ case (case get(ChKey) of
+ undefined -> create_channel(Channel, State);
+ Other -> {ok, Other, State}
+ end) of
+ {error, Error} ->
+ handle_exception(State, Channel, Error);
+ {ok, {ChPid, AState}, State1} ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State1);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State1));
+ {error, Reason} ->
+ handle_exception(State1, Channel, Reason)
+ end
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
- channel_cleanup(ChPid),
+ {_, State1} = channel_cleanup(ChPid, State),
%% This is not strictly necessary, but more obviously
%% correct. Also note that we do not need to call maybe_close/1
%% since we cannot possibly be in the 'closing' state.
- control_throttle(State);
+ control_throttle(State1);
post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
maybe_block(State);
post_process_frame({content_body, _}, _ChPid, State) ->
@@ -808,7 +820,7 @@ handle_method0(MethodName, FieldsBin,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
- catch throw:{writer_inet_error, closed} ->
+ catch throw:{inet_error, closed} ->
maybe_emit_stats(State),
throw(connection_closed_abruptly);
exit:#amqp_error{method = none} = Reason ->
@@ -843,38 +855,33 @@ handle_method0(#'connection.secure_ok'{response = Response},
State = #v1{connection_state = securing}) ->
auth_phase(Response, State);
-handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
- heartbeat = ClientHeartbeat},
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
+ channel_max = ChannelMax,
+ heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
helper_sup = SupPid,
sock = Sock}) ->
- ServerFrameMax = server_frame_max(),
- if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w < ~w min size",
- [FrameMax, ?FRAME_MIN_SIZE]);
- ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w > ~w max size",
- [FrameMax, ServerFrameMax]);
- true ->
- {ok, Collector} =
- rabbit_connection_helper_sup:start_queue_collector(SupPid),
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
- Parent = self(),
- ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater =
- rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
- SendFun, ClientHeartbeat, ReceiveFun),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax},
- queue_collector = Collector,
- heartbeater = Heartbeater}
- end;
+ ok = validate_negotiated_integer_value(
+ frame_max, ?FRAME_MIN_SIZE, FrameMax),
+ ok = validate_negotiated_integer_value(
+ channel_max, ?CHANNEL_MIN, ChannelMax),
+ {ok, Collector} =
+ rabbit_connection_helper_sup:start_queue_collector(SupPid),
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
+ Parent = self(),
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
+ Heartbeater =
+ rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
+ SendFun, ClientHeartbeat, ReceiveFun),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ timeout_sec = ClientHeartbeat},
+ queue_collector = Collector,
+ heartbeater = Heartbeater};
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
@@ -922,13 +929,28 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-server_frame_max() ->
- {ok, FrameMax} = application:get_env(rabbit, frame_max),
- FrameMax.
+validate_negotiated_integer_value(Field, Min, ClientValue) ->
+ ServerValue = get_env(Field),
+ if ClientValue /= 0 andalso ClientValue < Min ->
+ fail_negotiation(Field, min, ServerValue, ClientValue);
+ ServerValue /= 0 andalso ClientValue > ServerValue ->
+ fail_negotiation(Field, max, ServerValue, ClientValue);
+ true ->
+ ok
+ end.
+
+fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
+ {S1, S2} = case MinOrMax of
+ min -> {lower, minimum};
+ max -> {higher, maximum}
+ end,
+ rabbit_misc:protocol_error(
+ not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)",
+ [Field, ClientValue, S1, S2, ServerValue], 'connection.tune').
-server_heartbeat() ->
- {ok, Heartbeat} = application:get_env(rabbit, heartbeat),
- Heartbeat.
+get_env(Key) ->
+ {ok, Value} = application:get_env(rabbit, Key),
+ Value.
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -994,9 +1016,9 @@ auth_phase(Response,
State#v1{connection = Connection#connection{
auth_state = AuthState1}};
{ok, User} ->
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = server_frame_max(),
- heartbeat = server_heartbeat()},
+ Tune = #'connection.tune'{frame_max = get_env(frame_max),
+ channel_max = get_env(channel_max),
+ heartbeat = get_env(heartbeat)},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{user = User,
@@ -1023,13 +1045,15 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
-i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
-i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
- infinity;
-i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
- timer:now_diff(erlang:now(), T) / 1000000;
-i(channels, #v1{}) -> length(all_channels());
+i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount;
+i(state, #v1{connection_state = ConnectionState,
+ throttle = #throttle{last_blocked_by = BlockedBy,
+ last_blocked_at = T}}) ->
+ Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000,
+ case {BlockedBy, Recent} of
+ {flow, true} -> flow;
+ {_, _} -> ConnectionState
+ end;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
ic(name, #connection{name = Name}) -> Name;
@@ -1044,6 +1068,7 @@ ic(user, #connection{user = U}) -> U#user.username;
ic(vhost, #connection{vhost = VHost}) -> VHost;
ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
@@ -1084,10 +1109,8 @@ emit_stats(State) ->
%% If we emit an event which looks like we are in flow control, it's not a
%% good idea for it to be our last even if we go idle. Keep emitting
%% events, either we stay busy or we drop out of flow control.
- %% The 5 is to match the test in formatters.js:fmt_connection_state().
- %% This magic number will go away when bug 24829 is merged.
- case proplists:get_value(last_blocked_age, Infos) < 5 of
- true -> ensure_stats_timer(State1);
+ case proplists:get_value(state, Infos) of
+ flow -> ensure_stats_timer(State1);
_ -> State1
end.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 6f95ef60..90372461 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -46,6 +46,7 @@
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
+-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
%% -------------------------------------------------------------------
@@ -74,6 +75,7 @@
-spec(exchange_decorators/0 :: () -> 'ok').
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
+-spec(internal_system_x/0 :: () -> 'ok').
-endif.
@@ -340,6 +342,19 @@ queue_decorators(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, policy, gm_pids, decorators]).
+internal_system_x() ->
+ transform(
+ rabbit_durable_exchange,
+ fun ({exchange, Name = {resource, _, _, <<"amq.rabbitmq.", _/binary>>},
+ Type, Dur, AutoDel, _Int, Args, Scratches, Policy, Decorators}) ->
+ {exchange, Name, Type, Dur, AutoDel, true, Args, Scratches,
+ Policy, Decorators};
+ (X) ->
+ X
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 8d013d43..047bce77 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -60,15 +60,17 @@ add(VHostPath) ->
(ok, false) ->
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout},
- {<<"amq.rabbitmq.trace">>, topic}]],
+ Type, true, false, Internal, []) ||
+ {Name, Type, Internal} <-
+ [{<<"">>, direct, false},
+ {<<"amq.direct">>, direct, false},
+ {<<"amq.topic">>, topic, false},
+ %% per 0-9-1 pdf
+ {<<"amq.match">>, headers, false},
+ %% per 0-9-1 xml
+ {<<"amq.headers">>, headers, false},
+ {<<"amq.fanout">>, fanout, false},
+ {<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
rabbit_event:notify(vhost_created, info(VHostPath)),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 92d48e63..34dd3d3b 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -272,7 +272,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
[MethodFrame | ContentFrames].
tcp_send(Sock, Data) ->
- rabbit_misc:throw_on_error(writer_inet_error,
+ rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
internal_send_command(Sock, Channel, MethodRecord, Protocol) ->