diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-01 19:42:35 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-01 19:42:35 +0000 |
commit | 81c13921632b9055fb02d95a43fb5a30cbdbc769 (patch) | |
tree | 83c1f048180a17414c98731d96ddad9706506c96 | |
parent | 58bba5b4d071ca56e896754180f5f18957c58ad0 (diff) | |
parent | 999bc555309df76482c617def580df152472a98f (diff) | |
download | rabbitmq-server-81c13921632b9055fb02d95a43fb5a30cbdbc769.tar.gz |
merge stable into default
-rw-r--r-- | codegen.py | 8 | ||||
-rw-r--r-- | docs/rabbitmq.config.example | 5 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 28 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | src/credit_flow.erl | 21 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 55 | ||||
-rw-r--r-- | src/rabbit_binary_parser.erl | 18 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 20 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 9 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 6 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 283 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 15 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 20 |
15 files changed, 321 insertions, 173 deletions
@@ -187,7 +187,7 @@ def genErl(spec): elif type == 'table': return p+'Len:32/unsigned, '+p+'Tab:'+p+'Len/binary' - def genFieldPostprocessing(packed): + def genFieldPostprocessing(packed, hasContent): for f in packed: type = erlType(f.domain) if type == 'bit': @@ -199,6 +199,10 @@ def genErl(spec): elif type == 'table': print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \ (f.index, f.index) + # We skip the check on content-bearing methods for + # speed. This is a sanity check, not a security thing. + elif type == 'shortstr' and not hasContent: + print " rabbit_binary_parser:assert_utf8(F%d)," % (f.index) else: pass @@ -214,7 +218,7 @@ def genErl(spec): restSeparator = '' recordConstructorExpr = '#%s{%s}' % (m.erlangName(), fieldMapList(m.arguments)) print "decode_method_fields(%s, <<%s>>) ->" % (m.erlangName(), binaryPattern) - genFieldPostprocessing(packedFields) + genFieldPostprocessing(packedFields, m.hasContent) print " %s;" % (recordConstructorExpr,) def genDecodeProperties(c): diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index 91402649..c0d6cc70 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -138,6 +138,11 @@ %% %% {frame_max, 131072}, + %% Set the max permissible number of channels per connection. + %% 0 means "no limit". + %% + %% {channel_max, 128}, + %% Customising Socket Options. %% %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 6ec7ee07..d2a3f7c7 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1135,6 +1135,13 @@ <listitem><para>Number of consumers.</para></listitem> </varlistentry> <varlistentry> + <term>consumer_utilisation</term> + <listitem><para>Fraction of the time (between 0.0 and 1.0) + that the queue is able to immediately deliver messages to + consumers. This can be less than 1.0 if consumers are limited + by network congestion or prefetch count.</para></listitem> + </varlistentry> + <varlistentry> <term>memory</term> <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> @@ -1390,24 +1397,9 @@ </varlistentry> <varlistentry> - <term>last_blocked_by</term> - <listitem><para>The reason for which this connection - was last blocked. One of 'resource' - due to a memory - or disk alarm, 'flow' - due to internal flow control, or - 'none' if the connection was never - blocked.</para></listitem> - </varlistentry> - <varlistentry> - <term>last_blocked_age</term> - <listitem><para>Time, in seconds, since this - connection was last blocked, or - 'infinity'.</para></listitem> - </varlistentry> - - <varlistentry> <term>state</term> <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>, - <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> + <command>opening</command>, <command>running</command>, <command>flow</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> </varlistentry> <varlistentry> <term>channels</term> @@ -1438,6 +1430,10 @@ <listitem><para>Maximum frame size (bytes).</para></listitem> </varlistentry> <varlistentry> + <term>channel_max</term> + <listitem><para>Maximum number of channels on this connection.</para></listitem> + </varlistentry> + <varlistentry> <term>client_properties</term> <listitem><para>Informational properties transmitted by the client during connection establishment.</para></listitem> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index f0fee96a..29f06e79 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -25,6 +25,7 @@ %% 0 ("no limit") would make a better default, but that %% breaks the QPid Java client {frame_max, 131072}, + {channel_max, 0}, {heartbeat, 580}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 65536}, diff --git a/src/credit_flow.erl b/src/credit_flow.erl index d48d649e..39a257ac 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -30,7 +30,7 @@ -define(DEFAULT_CREDIT, {200, 50}). --export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]). +-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]). -export([peer_down/1]). %%---------------------------------------------------------------------------- @@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of _ -> true end. +state() -> case blocked() of + true -> flow; + false -> case get(credit_blocked_at) of + undefined -> running; + B -> Diff = timer:now_diff(erlang:now(), B), + case Diff < 5000000 of + true -> flow; + false -> running + end + end + end. + peer_down(Peer) -> %% In theory we could also remove it from credit_deferred here, but it %% doesn't really matter; at some point later we will drain @@ -128,7 +140,12 @@ grant(To, Quantity) -> true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred]) end. -block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). +block(From) -> + case blocked() of + false -> put(credit_blocked_at, erlang:now()); + true -> ok + end, + ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). unblock(From) -> ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8306f134..6b1e00b7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -115,7 +115,8 @@ -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) - -> [{pid(), rabbit_types:ctag(), boolean()}]). + -> [{pid(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table()}]). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 886db8d1..cb59edd9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,7 @@ backing_queue, backing_queue_state, active_consumers, + consumer_use, expires, sync_timer_ref, rate_timer_ref, @@ -95,11 +96,12 @@ messages_unacknowledged, messages, consumers, + consumer_utilisation, memory, slave_pids, synchronised_slave_pids, backing_queue_status, - status + state ]). -define(CREATION_EVENT_KEYS, @@ -149,6 +151,7 @@ init_state(Q) -> exclusive_consumer = none, has_had_consumers = false, active_consumers = priority_queue:new(), + consumer_use = {inactive, now_micros(), 0, 0.0}, senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), status = running, @@ -486,7 +489,7 @@ deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> - {false, State}; + {false, update_consumer_use(State, inactive)}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, Priority, @@ -533,6 +536,29 @@ deliver_msg_to_consumer0(DeliverFun, unsent_message_count = Count + 1}), {Stop, State1}. +update_consumer_use(State = #q{consumer_use = CUInfo}, Use) -> + State#q{consumer_use = update_consumer_use1(CUInfo, Use)}. + +update_consumer_use1({inactive, _, _, _} = CUInfo, inactive) -> + CUInfo; +update_consumer_use1({active, _, _} = CUInfo, active) -> + CUInfo; +update_consumer_use1({active, Since, Avg}, inactive) -> + Now = now_micros(), + {inactive, Now, Now - Since, Avg}; +update_consumer_use1({inactive, Since, Active, Avg}, active) -> + Now = now_micros(), + {active, Now, consumer_use_avg(Active, Now - Since, Avg)}. + +consumer_use_avg(Active, Inactive, Avg) -> + Time = Inactive + Active, + Ratio = Active / Time, + Weight = erlang:min(1, Time / 1000000), + case Avg of + undefined -> Ratio; + _ -> Ratio * Weight + Avg * (1 - Weight) + end. + confirm_messages([], State) -> State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> @@ -725,7 +751,8 @@ unblock(State, C = #cr{limiter = Limiter}) -> UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), - State1 = State#q{active_consumers = AC1}, + State1 = update_consumer_use(State#q{active_consumers = AC1}, + active), [notify_decorators( consumer_unblocked, [{consumer_tag, CTag}], State1) || {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], @@ -1035,6 +1062,16 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); +i(consumer_utilisation, #q{consumer_use = ConsumerUse}) -> + case consumer_count() of + 0 -> ''; + _ -> case ConsumerUse of + {active, Since, Avg} -> + consumer_use_avg(now_micros() - Since, 0, Avg); + {inactive, Since, Active, Avg} -> + consumer_use_avg(Active, now_micros() - Since, Avg) + end + end; i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -1052,8 +1089,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; -i(status, #q{status = Status}) -> - Status; +i(state, #q{status = running}) -> credit_flow:state(); +i(state, #q{status = State}) -> State; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> @@ -1074,7 +1111,10 @@ emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> - rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). + ExtraKs = [K || {K, _} <- Extra], + Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State), + not lists:member(K, ExtraKs)], + rabbit_event:notify(queue_stats, Extra ++ Infos). emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) -> rabbit_event:notify(consumer_created, @@ -1537,7 +1577,8 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, BQS3 = BQ:handle_pre_hibernate(BQS2), rabbit_event:if_enabled( State, #q.stats_timer, - fun () -> emit_stats(State, [{idle_since, now()}]) end), + fun () -> emit_stats(State, [{idle_since, now()}, + {consumer_utilisation, ''}]) end), State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3}, #q.stats_timer), {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index dc6d090f..088ad0e5 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -20,6 +20,7 @@ -export([parse_table/1]). -export([ensure_content_decoded/1, clear_decoded_content/1]). +-export([validate_utf8/1, assert_utf8/1]). %%---------------------------------------------------------------------------- @@ -30,6 +31,8 @@ (rabbit_types:content()) -> rabbit_types:decoded_content()). -spec(clear_decoded_content/1 :: (rabbit_types:content()) -> rabbit_types:undecoded_content()). +-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error'). +-spec(assert_utf8/1 :: (binary()) -> 'ok'). -endif. @@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) -> Content; clear_decoded_content(Content = #content{}) -> Content#content{properties = none}. + +assert_utf8(B) -> + case validate_utf8(B) of + ok -> ok; + error -> rabbit_misc:protocol_error( + frame_error, "Malformed UTF-8 in shortstr", []) + end. + +validate_utf8(Bin) -> + try + xmerl_ucs:from_utf8(Bin), + ok + catch exit:{ucs, _} -> + error + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a3a0c754..4d778f94 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -53,7 +53,8 @@ messages_uncommitted, acks_uncommitted, prefetch_count, - client_flow_blocked]). + client_flow_blocked, + state]). -define(CREATION_EVENT_KEYS, [pid, @@ -550,6 +551,14 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) -> check_not_default_exchange(_) -> ok. +check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>, + kind = exchange}) -> + rabbit_misc:protocol_error( + access_refused, "deletion of system ~s not allowed", + [rabbit_misc:rs(XName)]); +check_exchange_deletion(_) -> + ok. + %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% @@ -592,7 +601,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - {reply, #'channel.open_ok'{}, State#ch{state = running}}; + %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? + State1 = State#ch{state = running}, + rabbit_event:if_enabled(State1, #ch.stats_timer, + fun() -> emit_stats(State1) end), + {reply, #'channel.open_ok'{}, State1}; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -933,6 +946,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_not_default_exchange(ExchangeName), + check_exchange_deletion(ExchangeName), check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> @@ -1615,6 +1629,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; +i(state, #ch{state = running}) -> credit_flow:state(); +i(state, #ch{state = State}) -> State; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 6f36f99d..f3463286 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) -> end. call(Node, {Mod, Fun, Args}) -> - rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)). + rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). + +list_to_binary_utf8(L) -> + B = list_to_binary(L), + case rabbit_binary_parser:validate_utf8(B) of + ok -> B; + error -> throw({error, {not_utf_8, L}}) + end. rpc_call(Node, Mod, Fun, Args) -> rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 17ed8563..ab8c62fe 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -51,7 +51,7 @@ stop() -> init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, false, []), + topic, true, false, true, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 10e68198..488f1df5 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -201,7 +201,7 @@ init([]) -> %% writing out the cluster status files - bad things can then %% happen. process_flag(trap_exit, true), - net_kernel:monitor_nodes(true), + net_kernel:monitor_nodes(true, [nodedown_reason]), {ok, _} = mnesia:subscribe(system), {ok, #state{monitors = pmon:new(), subscribers = pmon:new(), @@ -267,7 +267,9 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state{subscribers = Subscribers}) -> {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}}; -handle_info({nodedown, Node}, State) -> +handle_info({nodedown, Node, Info}, State) -> + rabbit_log:info("node ~p down: ~p~n", + [Node, proplists:get_value(nodedown_reason, Info)]), ok = handle_dead_node(Node), {noreply, State}; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 32b52e6e..0aa44d9f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -32,15 +32,17 @@ -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). +-define(CHANNEL_MIN, 1). %%-------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, helper_sup, queue_collector, heartbeater, - stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}). + stats_timer, channel_sup_sup_pid, buf, buf_len, channel_count, + throttle}). -record(connection, {name, host, peer_host, port, peer_port, - protocol, user, timeout_sec, frame_max, vhost, + protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, auth_mechanism, auth_state}). @@ -48,15 +50,14 @@ blocked_sent}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, - send_pend, state, last_blocked_by, last_blocked_age, - channels]). + send_pend, state, channels]). -define(CREATION_EVENT_KEYS, [pid, name, port, peer_port, host, peer_host, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, - timeout, frame_max, client_properties]). + timeout, frame_max, channel_max, client_properties]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -240,6 +241,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> channel_sup_sup_pid = none, buf = [], buf_len = 0, + channel_count = 0, throttle = #throttle{ alarmed_by = [], last_blocked_by = none, @@ -328,8 +330,8 @@ handle_other({conserve_resources, Source, Conserve}, control_throttle(State#v1{throttle = Throttle1}); handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), - channel_cleanup(ChPid), - maybe_close(control_throttle(State)); + {_, State1} = channel_cleanup(ChPid, State), + maybe_close(control_throttle(State1)); handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -486,63 +488,59 @@ close_connection(State = #v1{queue_collector = Collector, State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> - case {channel_cleanup(ChPid), termination_kind(Reason)} of - {undefined, controlled} -> State; + {Channel, State1} = channel_cleanup(ChPid, State), + case {Channel, termination_kind(Reason)} of + {undefined, controlled} -> State1; {undefined, uncontrolled} -> exit({abnormal_dependent_exit, ChPid, Reason}); - {_Channel, controlled} -> maybe_close(control_throttle(State)); - {Channel, uncontrolled} -> State1 = handle_exception( - State, Channel, Reason), - maybe_close(control_throttle(State1)) + {_, controlled} -> maybe_close(control_throttle(State1)); + {_, uncontrolled} -> State2 = handle_exception( + State1, Channel, Reason), + maybe_close(control_throttle(State2)) end. -terminate_channels() -> - NChannels = - length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]), - if NChannels > 0 -> - Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, - TimerRef = erlang:send_after(Timeout, self(), cancel_wait), - wait_for_channel_termination(NChannels, TimerRef); - true -> ok - end. +terminate_channels(#v1{channel_count = 0} = State) -> + State; +terminate_channels(#v1{channel_count = ChannelCount} = State) -> + lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), + Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * ChannelCount, + TimerRef = erlang:send_after(Timeout, self(), cancel_wait), + wait_for_channel_termination(ChannelCount, TimerRef, State). -wait_for_channel_termination(0, TimerRef) -> +wait_for_channel_termination(0, TimerRef, State) -> case erlang:cancel_timer(TimerRef) of false -> receive - cancel_wait -> ok + cancel_wait -> State end; - _ -> ok + _ -> State end; - -wait_for_channel_termination(N, TimerRef) -> +wait_for_channel_termination(N, TimerRef, State) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> - case {channel_cleanup(ChPid), termination_kind(Reason)} of - {undefined, _} -> - exit({abnormal_dependent_exit, ChPid, Reason}); - {_Channel, controlled} -> - wait_for_channel_termination(N-1, TimerRef); - {Channel, uncontrolled} -> - log(error, - "AMQP connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]), - wait_for_channel_termination(N-1, TimerRef) + {Channel, State1} = channel_cleanup(ChPid, State), + case {Channel, termination_kind(Reason)} of + {undefined, _} -> exit({abnormal_dependent_exit, + ChPid, Reason}); + {_, controlled} -> wait_for_channel_termination( + N-1, TimerRef, State1); + {_, uncontrolled} -> log(error, + "AMQP connection ~p, channel ~p - " + "error while terminating:~n~p~n", + [self(), Channel, Reason]), + wait_for_channel_termination( + N-1, TimerRef, State1) end; cancel_wait -> exit(channel_termination_timeout) end. maybe_close(State = #v1{connection_state = closing, - connection = #connection{protocol = Protocol}, - sock = Sock}) -> - case all_channels() of - [] -> - NewState = close_connection(State), - ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), - NewState; - _ -> State - end; + channel_count = 0, + connection = #connection{protocol = Protocol}, + sock = Sock}) -> + NewState = close_connection(State), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), + NewState; maybe_close(State) -> State. @@ -561,8 +559,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol}, [self(), CS, Channel, Reason]), {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - terminate_channels(), - State1 = close_connection(State), + State1 = close_connection(terminate_channels(State)), ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol), State1; handle_exception(State, Channel, Reason) -> @@ -600,15 +597,26 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) -> %%-------------------------------------------------------------------------- -create_channel(Channel, State) -> - #v1{sock = Sock, queue_collector = Collector, - channel_sup_sup_pid = ChanSupSup, - connection = #connection{name = Name, - protocol = Protocol, - frame_max = FrameMax, - user = User, - vhost = VHost, - capabilities = Capabilities}} = State, +create_channel(_Channel, + #v1{channel_count = ChannelCount, + connection = #connection{channel_max = ChannelMax}}) + when ChannelMax /= 0 andalso ChannelCount >= ChannelMax -> + {error, rabbit_misc:amqp_error( + not_allowed, "number of channels opened (~w) has reached the " + "negotiated channel_max (~w)", + [ChannelCount, ChannelMax], 'none')}; +create_channel(Channel, + #v1{sock = Sock, + queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + channel_count = ChannelCount, + connection = + #connection{name = Name, + protocol = Protocol, + frame_max = FrameMax, + user = User, + vhost = VHost, + capabilities = Capabilities}} = State) -> {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, @@ -616,16 +624,16 @@ create_channel(Channel, State) -> MRef = erlang:monitor(process, ChPid), put({ch_pid, ChPid}, {Channel, MRef}), put({channel, Channel}, {ChPid, AState}), - {ChPid, AState}. + {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}. -channel_cleanup(ChPid) -> +channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> case get({ch_pid, ChPid}) of - undefined -> undefined; + undefined -> {undefined, State}; {Channel, MRef} -> credit_flow:peer_down(ChPid), erase({channel, Channel}), erase({ch_pid, ChPid}), erlang:demonitor(MRef, [flush]), - Channel + {Channel, State#v1{channel_count = ChannelCount - 1}} end. all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. @@ -664,32 +672,36 @@ handle_frame(Type, Channel, Payload, State) -> process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, - {ChPid, AState} = case get(ChKey) of - undefined -> create_channel(Channel, State); - Other -> Other - end, - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, NewAState} -> - rabbit_channel:do(ChPid, Method), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, Content, NewAState} -> - rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, control_throttle(State)); - {error, Reason} -> - handle_exception(State, Channel, Reason) + case (case get(ChKey) of + undefined -> create_channel(Channel, State); + Other -> {ok, Other, State} + end) of + {error, Error} -> + handle_exception(State, Channel, Error); + {ok, {ChPid, AState}, State1} -> + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State1); + {ok, Method, NewAState} -> + rabbit_channel:do(ChPid, Method), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State1); + {ok, Method, Content, NewAState} -> + rabbit_channel:do_flow(ChPid, Method, Content), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, control_throttle(State1)); + {error, Reason} -> + handle_exception(State1, Channel, Reason) + end end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> - channel_cleanup(ChPid), + {_, State1} = channel_cleanup(ChPid, State), %% This is not strictly necessary, but more obviously %% correct. Also note that we do not need to call maybe_close/1 %% since we cannot possibly be in the 'closing' state. - control_throttle(State); + control_throttle(State1); post_process_frame({content_header, _, _, _, _}, _ChPid, State) -> maybe_block(State); post_process_frame({content_body, _}, _ChPid, State) -> @@ -843,38 +855,33 @@ handle_method0(#'connection.secure_ok'{response = Response}, State = #v1{connection_state = securing}) -> auth_phase(Response, State); -handle_method0(#'connection.tune_ok'{frame_max = FrameMax, - heartbeat = ClientHeartbeat}, +handle_method0(#'connection.tune_ok'{frame_max = FrameMax, + channel_max = ChannelMax, + heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, helper_sup = SupPid, sock = Sock}) -> - ServerFrameMax = server_frame_max(), - if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE -> - rabbit_misc:protocol_error( - not_allowed, "frame_max=~w < ~w min size", - [FrameMax, ?FRAME_MIN_SIZE]); - ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax -> - rabbit_misc:protocol_error( - not_allowed, "frame_max=~w > ~w max size", - [FrameMax, ServerFrameMax]); - true -> - {ok, Collector} = - rabbit_connection_helper_sup:start_queue_collector(SupPid), - Frame = rabbit_binary_generator:build_heartbeat_frame(), - SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, - Parent = self(), - ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = - rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, - SendFun, ClientHeartbeat, ReceiveFun), - State#v1{connection_state = opening, - connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax}, - queue_collector = Collector, - heartbeater = Heartbeater} - end; + ok = validate_negotiated_integer_value( + frame_max, ?FRAME_MIN_SIZE, FrameMax), + ok = validate_negotiated_integer_value( + channel_max, ?CHANNEL_MIN, ChannelMax), + {ok, Collector} = + rabbit_connection_helper_sup:start_queue_collector(SupPid), + Frame = rabbit_binary_generator:build_heartbeat_frame(), + SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, + Parent = self(), + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, + Heartbeater = + rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, + SendFun, ClientHeartbeat, ReceiveFun), + State#v1{connection_state = opening, + connection = Connection#connection{ + frame_max = FrameMax, + channel_max = ChannelMax, + timeout_sec = ClientHeartbeat}, + queue_collector = Collector, + heartbeater = Heartbeater}; handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, @@ -922,13 +929,28 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -server_frame_max() -> - {ok, FrameMax} = application:get_env(rabbit, frame_max), - FrameMax. +validate_negotiated_integer_value(Field, Min, ClientValue) -> + ServerValue = get_env(Field), + if ClientValue /= 0 andalso ClientValue < Min -> + fail_negotiation(Field, min, ServerValue, ClientValue); + ServerValue /= 0 andalso ClientValue > ServerValue -> + fail_negotiation(Field, max, ServerValue, ClientValue); + true -> + ok + end. + +fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> + {S1, S2} = case MinOrMax of + min -> {lower, minimum}; + max -> {higher, maximum} + end, + rabbit_misc:protocol_error( + not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)", + [Field, ClientValue, S1, S2, ServerValue], 'connection.tune'). -server_heartbeat() -> - {ok, Heartbeat} = application:get_env(rabbit, heartbeat), - Heartbeat. +get_env(Key) -> + {ok, Value} = application:get_env(rabbit, Key), + Value. send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). @@ -994,9 +1016,9 @@ auth_phase(Response, State#v1{connection = Connection#connection{ auth_state = AuthState1}}; {ok, User} -> - Tune = #'connection.tune'{channel_max = 0, - frame_max = server_frame_max(), - heartbeat = server_heartbeat()}, + Tune = #'connection.tune'{frame_max = get_env(frame_max), + channel_max = get_env(channel_max), + heartbeat = get_env(heartbeat)}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{user = User, @@ -1023,13 +1045,15 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S); i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S); i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); -i(state, #v1{connection_state = CS}) -> CS; -i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By; -i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) -> - infinity; -i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) -> - timer:now_diff(erlang:now(), T) / 1000000; -i(channels, #v1{}) -> length(all_channels()); +i(channels, #v1{channel_count = ChannelCount}) -> ChannelCount; +i(state, #v1{connection_state = ConnectionState, + throttle = #throttle{last_blocked_by = BlockedBy, + last_blocked_at = T}}) -> + Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000, + case {BlockedBy, Recent} of + {flow, true} -> flow; + {_, _} -> ConnectionState + end; i(Item, #v1{connection = Conn}) -> ic(Item, Conn). ic(name, #connection{name = Name}) -> Name; @@ -1044,6 +1068,7 @@ ic(user, #connection{user = U}) -> U#user.username; ic(vhost, #connection{vhost = VHost}) -> VHost; ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; +ic(channel_max, #connection{channel_max = ChMax}) -> ChMax; ic(client_properties, #connection{client_properties = CP}) -> CP; ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; @@ -1084,10 +1109,8 @@ emit_stats(State) -> %% If we emit an event which looks like we are in flow control, it's not a %% good idea for it to be our last even if we go idle. Keep emitting %% events, either we stay busy or we drop out of flow control. - %% The 5 is to match the test in formatters.js:fmt_connection_state(). - %% This magic number will go away when bug 24829 is merged. - case proplists:get_value(last_blocked_age, Infos) < 5 of - true -> ensure_stats_timer(State1); + case proplists:get_value(state, Infos) of + flow -> ensure_stats_timer(State1); _ -> State1 end. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 6f95ef60..90372461 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -46,6 +46,7 @@ -rabbit_upgrade({exchange_decorators, mnesia, [policy]}). -rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). +-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). %% ------------------------------------------------------------------- @@ -74,6 +75,7 @@ -spec(exchange_decorators/0 :: () -> 'ok'). -spec(policy_apply_to/0 :: () -> 'ok'). -spec(queue_decorators/0 :: () -> 'ok'). +-spec(internal_system_x/0 :: () -> 'ok'). -endif. @@ -340,6 +342,19 @@ queue_decorators(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, policy, gm_pids, decorators]). +internal_system_x() -> + transform( + rabbit_durable_exchange, + fun ({exchange, Name = {resource, _, _, <<"amq.rabbitmq.", _/binary>>}, + Type, Dur, AutoDel, _Int, Args, Scratches, Policy, Decorators}) -> + {exchange, Name, Type, Dur, AutoDel, true, Args, Scratches, + Policy, Decorators}; + (X) -> + X + end, + [name, type, durable, auto_delete, internal, arguments, scratches, policy, + decorators]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 8d013d43..047bce77 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -60,15 +60,17 @@ add(VHostPath) -> (ok, false) -> [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}, - {<<"amq.rabbitmq.trace">>, topic}]], + Type, true, false, Internal, []) || + {Name, Type, Internal} <- + [{<<"">>, direct, false}, + {<<"amq.direct">>, direct, false}, + {<<"amq.topic">>, topic, false}, + %% per 0-9-1 pdf + {<<"amq.match">>, headers, false}, + %% per 0-9-1 xml + {<<"amq.headers">>, headers, false}, + {<<"amq.fanout">>, fanout, false}, + {<<"amq.rabbitmq.trace">>, topic, true}]], ok end), rabbit_event:notify(vhost_created, info(VHostPath)), |