diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-12-03 17:14:55 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-12-03 17:14:55 +0000 |
commit | 077be866a983102148b508cf548f5da997fd32d1 (patch) | |
tree | 36bcd9082daf0e6e441d37ea4200ecff60466830 | |
parent | ee9e3cb4ff6bc802f03f56e40234e11044f66c7a (diff) | |
parent | 5b163c9d705db10a847dfde870ef169b6291e40d (diff) | |
download | rabbitmq-server-077be866a983102148b508cf548f5da997fd32d1.tar.gz |
stable to default
-rw-r--r-- | codegen.py | 8 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 59 | ||||
-rw-r--r-- | src/rabbit_binary_parser.erl | 18 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 9 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 15 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 20 |
9 files changed, 125 insertions, 22 deletions
@@ -187,7 +187,7 @@ def genErl(spec): elif type == 'table': return p+'Len:32/unsigned, '+p+'Tab:'+p+'Len/binary' - def genFieldPostprocessing(packed): + def genFieldPostprocessing(packed, hasContent): for f in packed: type = erlType(f.domain) if type == 'bit': @@ -199,6 +199,10 @@ def genErl(spec): elif type == 'table': print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \ (f.index, f.index) + # We skip the check on content-bearing methods for + # speed. This is a sanity check, not a security thing. + elif type == 'shortstr' and not hasContent: + print " rabbit_binary_parser:assert_utf8(F%d)," % (f.index) else: pass @@ -214,7 +218,7 @@ def genErl(spec): restSeparator = '' recordConstructorExpr = '#%s{%s}' % (m.erlangName(), fieldMapList(m.arguments)) print "decode_method_fields(%s, <<%s>>) ->" % (m.erlangName(), binaryPattern) - genFieldPostprocessing(packedFields) + genFieldPostprocessing(packedFields, m.hasContent) print " %s;" % (recordConstructorExpr,) def genDecodeProperties(c): diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 6ec7ee07..19d29577 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1135,6 +1135,13 @@ <listitem><para>Number of consumers.</para></listitem> </varlistentry> <varlistentry> + <term>consumer_utilisation</term> + <listitem><para>Fraction of the time (between 0.0 and 1.0) + that the queue is able to immediately deliver messages to + consumers. This can be less than 1.0 if consumers are limited + by network congestion or prefetch count.</para></listitem> + </varlistentry> + <varlistentry> <term>memory</term> <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4ff30ce0..65ab15c0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,7 @@ backing_queue, backing_queue_state, active_consumers, + consumer_use, expires, sync_timer_ref, rate_timer_ref, @@ -95,6 +96,7 @@ messages_unacknowledged, messages, consumers, + consumer_utilisation, memory, slave_pids, synchronised_slave_pids, @@ -149,6 +151,7 @@ init_state(Q) -> exclusive_consumer = none, has_had_consumers = false, active_consumers = priority_queue:new(), + consumer_use = {inactive, now_micros(), 0, 0.0}, senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), status = running, @@ -482,10 +485,12 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, - State = #q{active_consumers = ActiveConsumers}) -> + State = #q{active_consumers = ActiveConsumers, + consumer_use = CUInfo}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> - {false, State}; + {false, + State#q{consumer_use = update_consumer_use(CUInfo, inactive)}}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, Priority, @@ -536,6 +541,26 @@ deliver_from_queue_deliver(AckRequired, State) -> {Result, State1} = fetch(AckRequired, State), {Result, is_empty(State1), State1}. +update_consumer_use({inactive, _, _, _} = CUInfo, inactive) -> + CUInfo; +update_consumer_use({active, _, _} = CUInfo, active) -> + CUInfo; +update_consumer_use({active, Since, Avg}, inactive) -> + Now = now_micros(), + {inactive, Now, Now - Since, Avg}; +update_consumer_use({inactive, Since, Active, Avg}, active) -> + Now = now_micros(), + {active, Now, consumer_use_avg(Active, Now - Since, Avg)}. + +consumer_use_avg(Active, Inactive, Avg) -> + Time = Inactive + Active, + Ratio = Active / Time, + Weight = erlang:min(1, Time / 1000000), + case Avg of + undefined -> Ratio; + _ -> Ratio * Weight + Avg * (1 - Weight) + end. + confirm_messages([], State) -> State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> @@ -713,7 +738,7 @@ possibly_unblock(State, ChPid, Update) -> end end. -unblock(State, C = #cr{limiter = Limiter}) -> +unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) -> case lists:partition( fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) @@ -725,12 +750,14 @@ unblock(State, C = #cr{limiter = Limiter}) -> BlockedQ = priority_queue:from_list(Blocked), UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), - AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), - State1 = State#q{active_consumers = AC1}, + State1 = State#q{consumer_use = + update_consumer_use(CUInfo, active)}, + AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ), + State2 = State1#q{active_consumers = AC1}, [notify_decorators( - consumer_unblocked, [{consumer_tag, CTag}], State1) || + consumer_unblocked, [{consumer_tag, CTag}], State2) || {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], - run_message_queue(State1) + run_message_queue(State2) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -1037,6 +1064,16 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); +i(consumer_utilisation, #q{consumer_use = ConsumerUse}) -> + case consumer_count() of + 0 -> ''; + _ -> case ConsumerUse of + {active, Since, Avg} -> + consumer_use_avg(now_micros() - Since, 0, Avg); + {inactive, Since, Active, Avg} -> + consumer_use_avg(Active, now_micros() - Since, Avg) + end + end; i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -1076,7 +1113,10 @@ emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> - rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). + ExtraKs = [K || {K, _} <- Extra], + Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State), + not lists:member(K, ExtraKs)], + rabbit_event:notify(queue_stats, Extra ++ Infos). emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) -> rabbit_event:notify(consumer_created, @@ -1537,7 +1577,8 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, BQS3 = BQ:handle_pre_hibernate(BQS2), rabbit_event:if_enabled( State, #q.stats_timer, - fun () -> emit_stats(State, [{idle_since, now()}]) end), + fun () -> emit_stats(State, [{idle_since, now()}, + {consumer_utilisation, ''}]) end), State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3}, #q.stats_timer), {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index dc6d090f..088ad0e5 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -20,6 +20,7 @@ -export([parse_table/1]). -export([ensure_content_decoded/1, clear_decoded_content/1]). +-export([validate_utf8/1, assert_utf8/1]). %%---------------------------------------------------------------------------- @@ -30,6 +31,8 @@ (rabbit_types:content()) -> rabbit_types:decoded_content()). -spec(clear_decoded_content/1 :: (rabbit_types:content()) -> rabbit_types:undecoded_content()). +-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error'). +-spec(assert_utf8/1 :: (binary()) -> 'ok'). -endif. @@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) -> Content; clear_decoded_content(Content = #content{}) -> Content#content{properties = none}. + +assert_utf8(B) -> + case validate_utf8(B) of + ok -> ok; + error -> rabbit_misc:protocol_error( + frame_error, "Malformed UTF-8 in shortstr", []) + end. + +validate_utf8(Bin) -> + try + xmerl_ucs:from_utf8(Bin), + ok + catch exit:{ucs, _} -> + error + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a3a0c754..6aa88898 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -550,6 +550,14 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) -> check_not_default_exchange(_) -> ok. +check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>, + kind = exchange}) -> + rabbit_misc:protocol_error( + access_refused, "deletion of system ~s not allowed", + [rabbit_misc:rs(XName)]); +check_exchange_deletion(_) -> + ok. + %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% @@ -933,6 +941,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_not_default_exchange(ExchangeName), + check_exchange_deletion(ExchangeName), check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 6f36f99d..f3463286 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) -> end. call(Node, {Mod, Fun, Args}) -> - rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)). + rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). + +list_to_binary_utf8(L) -> + B = list_to_binary(L), + case rabbit_binary_parser:validate_utf8(B) of + ok -> B; + error -> throw({error, {not_utf_8, L}}) + end. rpc_call(Node, Mod, Fun, Args) -> rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 17ed8563..ab8c62fe 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -51,7 +51,7 @@ stop() -> init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, false, []), + topic, true, false, true, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 6f95ef60..90372461 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -46,6 +46,7 @@ -rabbit_upgrade({exchange_decorators, mnesia, [policy]}). -rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). +-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). %% ------------------------------------------------------------------- @@ -74,6 +75,7 @@ -spec(exchange_decorators/0 :: () -> 'ok'). -spec(policy_apply_to/0 :: () -> 'ok'). -spec(queue_decorators/0 :: () -> 'ok'). +-spec(internal_system_x/0 :: () -> 'ok'). -endif. @@ -340,6 +342,19 @@ queue_decorators(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, policy, gm_pids, decorators]). +internal_system_x() -> + transform( + rabbit_durable_exchange, + fun ({exchange, Name = {resource, _, _, <<"amq.rabbitmq.", _/binary>>}, + Type, Dur, AutoDel, _Int, Args, Scratches, Policy, Decorators}) -> + {exchange, Name, Type, Dur, AutoDel, true, Args, Scratches, + Policy, Decorators}; + (X) -> + X + end, + [name, type, durable, auto_delete, internal, arguments, scratches, policy, + decorators]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 8d013d43..047bce77 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -60,15 +60,17 @@ add(VHostPath) -> (ok, false) -> [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}, - {<<"amq.rabbitmq.trace">>, topic}]], + Type, true, false, Internal, []) || + {Name, Type, Internal} <- + [{<<"">>, direct, false}, + {<<"amq.direct">>, direct, false}, + {<<"amq.topic">>, topic, false}, + %% per 0-9-1 pdf + {<<"amq.match">>, headers, false}, + %% per 0-9-1 xml + {<<"amq.headers">>, headers, false}, + {<<"amq.fanout">>, fanout, false}, + {<<"amq.rabbitmq.trace">>, topic, true}]], ok end), rabbit_event:notify(vhost_created, info(VHostPath)), |