summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-12-03 17:14:55 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-12-03 17:14:55 +0000
commit077be866a983102148b508cf548f5da997fd32d1 (patch)
tree36bcd9082daf0e6e441d37ea4200ecff60466830
parentee9e3cb4ff6bc802f03f56e40234e11044f66c7a (diff)
parent5b163c9d705db10a847dfde870ef169b6291e40d (diff)
downloadrabbitmq-server-077be866a983102148b508cf548f5da997fd32d1.tar.gz
stable to default
-rw-r--r--codegen.py8
-rw-r--r--docs/rabbitmqctl.1.xml7
-rw-r--r--src/rabbit_amqqueue_process.erl59
-rw-r--r--src/rabbit_binary_parser.erl18
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_control_main.erl9
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_upgrade_functions.erl15
-rw-r--r--src/rabbit_vhost.erl20
9 files changed, 125 insertions, 22 deletions
diff --git a/codegen.py b/codegen.py
index 842549cf..91fa1154 100644
--- a/codegen.py
+++ b/codegen.py
@@ -187,7 +187,7 @@ def genErl(spec):
elif type == 'table':
return p+'Len:32/unsigned, '+p+'Tab:'+p+'Len/binary'
- def genFieldPostprocessing(packed):
+ def genFieldPostprocessing(packed, hasContent):
for f in packed:
type = erlType(f.domain)
if type == 'bit':
@@ -199,6 +199,10 @@ def genErl(spec):
elif type == 'table':
print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \
(f.index, f.index)
+ # We skip the check on content-bearing methods for
+ # speed. This is a sanity check, not a security thing.
+ elif type == 'shortstr' and not hasContent:
+ print " rabbit_binary_parser:assert_utf8(F%d)," % (f.index)
else:
pass
@@ -214,7 +218,7 @@ def genErl(spec):
restSeparator = ''
recordConstructorExpr = '#%s{%s}' % (m.erlangName(), fieldMapList(m.arguments))
print "decode_method_fields(%s, <<%s>>) ->" % (m.erlangName(), binaryPattern)
- genFieldPostprocessing(packedFields)
+ genFieldPostprocessing(packedFields, m.hasContent)
print " %s;" % (recordConstructorExpr,)
def genDecodeProperties(c):
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6ec7ee07..19d29577 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1135,6 +1135,13 @@
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>consumer_utilisation</term>
+ <listitem><para>Fraction of the time (between 0.0 and 1.0)
+ that the queue is able to immediately deliver messages to
+ consumers. This can be less than 1.0 if consumers are limited
+ by network congestion or prefetch count.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>memory</term>
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4ff30ce0..65ab15c0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,7 @@
backing_queue,
backing_queue_state,
active_consumers,
+ consumer_use,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -95,6 +96,7 @@
messages_unacknowledged,
messages,
consumers,
+ consumer_utilisation,
memory,
slave_pids,
synchronised_slave_pids,
@@ -149,6 +151,7 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
active_consumers = priority_queue:new(),
+ consumer_use = {inactive, now_micros(), 0, 0.0},
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
@@ -482,10 +485,12 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
- State = #q{active_consumers = ActiveConsumers}) ->
+ State = #q{active_consumers = ActiveConsumers,
+ consumer_use = CUInfo}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- {false, State};
+ {false,
+ State#q{consumer_use = update_consumer_use(CUInfo, inactive)}};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -536,6 +541,26 @@ deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
{Result, is_empty(State1), State1}.
+update_consumer_use({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_consumer_use({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_consumer_use({active, Since, Avg}, inactive) ->
+ Now = now_micros(),
+ {inactive, Now, Now - Since, Avg};
+update_consumer_use({inactive, Since, Active, Avg}, active) ->
+ Now = now_micros(),
+ {active, Now, consumer_use_avg(Active, Now - Since, Avg)}.
+
+consumer_use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ Ratio = Active / Time,
+ Weight = erlang:min(1, Time / 1000000),
+ case Avg of
+ undefined -> Ratio;
+ _ -> Ratio * Weight + Avg * (1 - Weight)
+ end.
+
confirm_messages([], State) ->
State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
@@ -713,7 +738,7 @@ possibly_unblock(State, ChPid, Update) ->
end
end.
-unblock(State, C = #cr{limiter = Limiter}) ->
+unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) ->
case lists:partition(
fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
@@ -725,12 +750,14 @@ unblock(State, C = #cr{limiter = Limiter}) ->
BlockedQ = priority_queue:from_list(Blocked),
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
- State1 = State#q{active_consumers = AC1},
+ State1 = State#q{consumer_use =
+ update_consumer_use(CUInfo, active)},
+ AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ),
+ State2 = State1#q{active_consumers = AC1},
[notify_decorators(
- consumer_unblocked, [{consumer_tag, CTag}], State1) ||
+ consumer_unblocked, [{consumer_tag, CTag}], State2) ||
{_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
- run_message_queue(State1)
+ run_message_queue(State2)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -1037,6 +1064,16 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
+i(consumer_utilisation, #q{consumer_use = ConsumerUse}) ->
+ case consumer_count() of
+ 0 -> '';
+ _ -> case ConsumerUse of
+ {active, Since, Avg} ->
+ consumer_use_avg(now_micros() - Since, 0, Avg);
+ {inactive, Since, Active, Avg} ->
+ consumer_use_avg(Active, now_micros() - Since, Avg)
+ end
+ end;
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1076,7 +1113,10 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+ ExtraKs = [K || {K, _} <- Extra],
+ Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State),
+ not lists:member(K, ExtraKs)],
+ rabbit_event:notify(queue_stats, Extra ++ Infos).
emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) ->
rabbit_event:notify(consumer_created,
@@ -1537,7 +1577,8 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(
State, #q.stats_timer,
- fun () -> emit_stats(State, [{idle_since, now()}]) end),
+ fun () -> emit_stats(State, [{idle_since, now()},
+ {consumer_utilisation, ''}]) end),
State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
#q.stats_timer),
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index dc6d090f..088ad0e5 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -20,6 +20,7 @@
-export([parse_table/1]).
-export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([validate_utf8/1, assert_utf8/1]).
%%----------------------------------------------------------------------------
@@ -30,6 +31,8 @@
(rabbit_types:content()) -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:undecoded_content()).
+-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error').
+-spec(assert_utf8/1 :: (binary()) -> 'ok').
-endif.
@@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) ->
Content;
clear_decoded_content(Content = #content{}) ->
Content#content{properties = none}.
+
+assert_utf8(B) ->
+ case validate_utf8(B) of
+ ok -> ok;
+ error -> rabbit_misc:protocol_error(
+ frame_error, "Malformed UTF-8 in shortstr", [])
+ end.
+
+validate_utf8(Bin) ->
+ try
+ xmerl_ucs:from_utf8(Bin),
+ ok
+ catch exit:{ucs, _} ->
+ error
+ end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a3a0c754..6aa88898 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -550,6 +550,14 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
check_not_default_exchange(_) ->
ok.
+check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>,
+ kind = exchange}) ->
+ rabbit_misc:protocol_error(
+ access_refused, "deletion of system ~s not allowed",
+ [rabbit_misc:rs(XName)]);
+check_exchange_deletion(_) ->
+ ok.
+
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
@@ -933,6 +941,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_not_default_exchange(ExchangeName),
+ check_exchange_deletion(ExchangeName),
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 6f36f99d..f3463286 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) ->
end.
call(Node, {Mod, Fun, Args}) ->
- rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)).
+ rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
+
+list_to_binary_utf8(L) ->
+ B = list_to_binary(L),
+ case rabbit_binary_parser:validate_utf8(B) of
+ ok -> B;
+ error -> throw({error, {not_utf_8, L}})
+ end.
rpc_call(Node, Mod, Fun, Args) ->
rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 17ed8563..ab8c62fe 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -51,7 +51,7 @@ stop() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, false, []),
+ topic, true, false, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 6f95ef60..90372461 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -46,6 +46,7 @@
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
+-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
%% -------------------------------------------------------------------
@@ -74,6 +75,7 @@
-spec(exchange_decorators/0 :: () -> 'ok').
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
+-spec(internal_system_x/0 :: () -> 'ok').
-endif.
@@ -340,6 +342,19 @@ queue_decorators(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, policy, gm_pids, decorators]).
+internal_system_x() ->
+ transform(
+ rabbit_durable_exchange,
+ fun ({exchange, Name = {resource, _, _, <<"amq.rabbitmq.", _/binary>>},
+ Type, Dur, AutoDel, _Int, Args, Scratches, Policy, Decorators}) ->
+ {exchange, Name, Type, Dur, AutoDel, true, Args, Scratches,
+ Policy, Decorators};
+ (X) ->
+ X
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 8d013d43..047bce77 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -60,15 +60,17 @@ add(VHostPath) ->
(ok, false) ->
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout},
- {<<"amq.rabbitmq.trace">>, topic}]],
+ Type, true, false, Internal, []) ||
+ {Name, Type, Internal} <-
+ [{<<"">>, direct, false},
+ {<<"amq.direct">>, direct, false},
+ {<<"amq.topic">>, topic, false},
+ %% per 0-9-1 pdf
+ {<<"amq.match">>, headers, false},
+ %% per 0-9-1 xml
+ {<<"amq.headers">>, headers, false},
+ {<<"amq.fanout">>, fanout, false},
+ {<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
rabbit_event:notify(vhost_created, info(VHostPath)),