diff options
author | Alvaro Videla <alvaro@rabbitmq.com> | 2013-12-11 15:38:57 +0000 |
---|---|---|
committer | Alvaro Videla <alvaro@rabbitmq.com> | 2013-12-11 15:38:57 +0000 |
commit | a76807d6df02b23b20148f4582b61af4f083462b (patch) | |
tree | f6af43bccc91333ca71f0663f94cd68e14261015 | |
parent | ddbf072e71babfda40d2a980d5eb460bf9863301 (diff) | |
parent | 6392ac1f070bf039b9eb37a49decff3230794ec6 (diff) | |
download | rabbitmq-server-a76807d6df02b23b20148f4582b61af4f083462b.tar.gz |
merge default into bug25817
42 files changed, 705 insertions, 384 deletions
@@ -4,6 +4,7 @@ syntax: glob *.swp *.patch *.orig +*.tmp erl_crash.dump deps.mk @@ -1 +1 @@ -Please see http://www.rabbitmq.com/build-server.html for build instructions. +Please see http://www.rabbitmq.com/build-server.html for build instructions.
\ No newline at end of file @@ -133,16 +133,20 @@ process_analysis(Query, Tag, Severity, Analysis) when is_list(Query) -> checks() -> [{"(XXL)(Lin) ((XC - UC) || (XU - X - B))", "has call to undefined function(s)", - error, filters()}, - {"(Lin) (L - LU)", "has unused local function(s)", - error, filters()}, + error, filters()}, + {"(Lin) (L - LU)", + "has unused local function(s)", + error, filters()}, + {"(E | \"(rabbit|amqp).*\":_/_ || \"gen_server2?\":call/2)", + "has 5 sec timeout in", + error, filters()}, {"(Lin) (LU * (X - XU))", - "has exported function(s) only used locally", - warning, filters()}, + "has exported function(s) only used locally", + warning, filters()}, {"(Lin) (DF * (XU + LU))", "used deprecated function(s)", - warning, filters()}]. -% {"(Lin) (X - XU)", "possibly unused export", -% warning, fun filter_unused/1}]. + warning, filters()}]. +%% {"(Lin) (X - XU)", "possibly unused export", +%% warning, fun filter_unused/1}]. %% %% noise filters (can be disabled with -X) - strip uninteresting analyses @@ -187,7 +187,7 @@ def genErl(spec): elif type == 'table': return p+'Len:32/unsigned, '+p+'Tab:'+p+'Len/binary' - def genFieldPostprocessing(packed): + def genFieldPostprocessing(packed, hasContent): for f in packed: type = erlType(f.domain) if type == 'bit': @@ -199,6 +199,10 @@ def genErl(spec): elif type == 'table': print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \ (f.index, f.index) + # We skip the check on content-bearing methods for + # speed. This is a sanity check, not a security thing. + elif type == 'shortstr' and not hasContent: + print " rabbit_binary_parser:assert_utf8(F%d)," % (f.index) else: pass @@ -214,7 +218,7 @@ def genErl(spec): restSeparator = '' recordConstructorExpr = '#%s{%s}' % (m.erlangName(), fieldMapList(m.arguments)) print "decode_method_fields(%s, <<%s>>) ->" % (m.erlangName(), binaryPattern) - genFieldPostprocessing(packedFields) + genFieldPostprocessing(packedFields, m.hasContent) print " %s;" % (recordConstructorExpr,) def genDecodeProperties(c): diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index 00aef8d9..c0d6cc70 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -138,6 +138,11 @@ %% %% {frame_max, 131072}, + %% Set the max permissible number of channels per connection. + %% 0 means "no limit". + %% + %% {channel_max, 128}, + %% Customising Socket Options. %% %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for @@ -169,7 +174,7 @@ %% lower bound, a disk alarm will be set - see the documentation %% listed above for more details. %% - %% {disk_free_limit, 1000000000}, + %% {disk_free_limit, 50000000}, %% Alternatively, we can set a limit relative to total available RAM. %% diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index d7c93924..d2a3f7c7 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1135,25 +1135,11 @@ <listitem><para>Number of consumers.</para></listitem> </varlistentry> <varlistentry> - <term>active_consumers</term> - <listitem> - <para> - Number of active consumers. An active consumer is - one which could immediately receive any messages - sent to the queue - i.e. it is not limited by its - prefetch count, TCP congestion, flow control, or - because it has issued channel.flow. At least one - of messages_ready and active_consumers must always - be zero. - </para> - <para> - Note that this value is an instantaneous snapshot - - when consumers are restricted by their prefetch - count they may only appear to be active for small - fractions of a second until more messages are sent - out. - </para> - </listitem> + <term>consumer_utilisation</term> + <listitem><para>Fraction of the time (between 0.0 and 1.0) + that the queue is able to immediately deliver messages to + consumers. This can be less than 1.0 if consumers are limited + by network congestion or prefetch count.</para></listitem> </varlistentry> <varlistentry> <term>memory</term> @@ -1411,24 +1397,9 @@ </varlistentry> <varlistentry> - <term>last_blocked_by</term> - <listitem><para>The reason for which this connection - was last blocked. One of 'resource' - due to a memory - or disk alarm, 'flow' - due to internal flow control, or - 'none' if the connection was never - blocked.</para></listitem> - </varlistentry> - <varlistentry> - <term>last_blocked_age</term> - <listitem><para>Time, in seconds, since this - connection was last blocked, or - 'infinity'.</para></listitem> - </varlistentry> - - <varlistentry> <term>state</term> <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>, - <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> + <command>opening</command>, <command>running</command>, <command>flow</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> </varlistentry> <varlistentry> <term>channels</term> @@ -1459,6 +1430,10 @@ <listitem><para>Maximum frame size (bytes).</para></listitem> </varlistentry> <varlistentry> + <term>channel_max</term> + <listitem><para>Maximum number of channels on this connection.</para></listitem> + </varlistentry> + <varlistentry> <term>client_properties</term> <listitem><para>Informational properties transmitted by the client during connection establishment.</para></listitem> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 6ee0115b..29f06e79 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -1,4 +1,4 @@ -{application, rabbit, %% -*- erlang -*- +{application, rabbit, %% -*- erlang -*- [{description, "RabbitMQ"}, {id, "RabbitMQ"}, {vsn, "%%VSN%%"}, @@ -25,7 +25,8 @@ %% 0 ("no limit") would make a better default, but that %% breaks the QPid Java client {frame_max, 131072}, - {heartbeat, 600}, + {channel_max, 0}, + {heartbeat, 580}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 65536}, {default_user, <<"guest">>}, @@ -52,6 +53,7 @@ {nodelay, true}, {linger, {true, 0}}, {exit_on_close, false}]}, + {halt_on_upgrade_failure, true}, {hipe_compile, false}, %% see bug 24513 for how this list was created {hipe_modules, diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index d1e3e380..05140e3c 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -10,10 +10,11 @@ Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate Source4: rabbitmq-server.ocf +Source5: README URL: http://www.rabbitmq.com/ BuildArch: noarch -BuildRequires: erlang >= R13B03, python-simplejson, xmlto, libxslt -Requires: erlang >= R13B03, logrotate +BuildRequires: erlang >= R13B-03, python-simplejson, xmlto, libxslt +Requires: erlang >= R13B-03, logrotate BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root Summary: The RabbitMQ server Requires(post): %%REQUIRES%% @@ -41,6 +42,7 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} cp %{S:4} %{_rabbit_server_ocf} +cp %{S:5} %{_builddir}/rabbitmq-server-%{version}/README make %{?_smp_mflags} %install @@ -121,12 +123,16 @@ done %{_initrddir}/rabbitmq-server %config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server %doc LICENSE* +%doc README %doc docs/rabbitmq.config.example %clean rm -rf %{buildroot} %changelog +* Wed Oct 23 2013 emile@rabbitmq.com 3.2.0-1 +- New Upstream Release + * Thu Aug 15 2013 simon@rabbitmq.com 3.1.5-1 - New Upstream Release diff --git a/packaging/common/README b/packaging/common/README new file mode 100644 index 00000000..0a29ee27 --- /dev/null +++ b/packaging/common/README @@ -0,0 +1,20 @@ +This is rabbitmq-server, a message broker implementing AMQP, STOMP and MQTT. + +Most of the documentation for RabbitMQ is provided on the RabbitMQ web +site. You can see documentation for the current version at: + +http://www.rabbitmq.com/documentation.html + +and for previous versions at: + +http://www.rabbitmq.com/previous.html + +Man pages are installed with this package. Of particular interest are +rabbitmqctl(1), to interact with a running RabbitMQ server, and +rabbitmq-plugins(1), to enable and disable plugins. These should be +run as the superuser. + +An example configuration file is provided in the same directory as +this README. Copy it to /etc/rabbitmq/rabbitmq.config to use it. The +RabbitMQ server must be restarted after changing the configuration +file or enabling or disabling plugins. diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 3212514e..6c63478f 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (3.2.0-1) unstable; urgency=low + + * New Upstream Release + + -- Emile Joubert <emile@rabbitmq.com> Wed, 23 Oct 2013 12:44:10 +0100 + rabbitmq-server (3.1.5-1) unstable; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index a1498979..b3c96069 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -9,6 +9,7 @@ RABBIT_BIN=$(DEB_DESTDIR)usr/lib/rabbitmq/bin/ DOCDIR=$(DEB_DESTDIR)usr/share/doc/rabbitmq-server/ DEB_MAKE_INSTALL_TARGET := install TARGET_DIR=$(RABBIT_LIB) SBIN_DIR=$(RABBIT_BIN) DOC_INSTALL_DIR=$(DOCDIR) MAN_DIR=$(DEB_DESTDIR)usr/share/man/ DEB_MAKE_CLEAN_TARGET:= distclean +DEB_INSTALL_DOCS_ALL=debian/README install/rabbitmq-server:: mkdir -p $(DOCDIR) diff --git a/src/credit_flow.erl b/src/credit_flow.erl index d48d649e..39a257ac 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -30,7 +30,7 @@ -define(DEFAULT_CREDIT, {200, 50}). --export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]). +-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]). -export([peer_down/1]). %%---------------------------------------------------------------------------- @@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of _ -> true end. +state() -> case blocked() of + true -> flow; + false -> case get(credit_blocked_at) of + undefined -> running; + B -> Diff = timer:now_diff(erlang:now(), B), + case Diff < 5000000 of + true -> flow; + false -> running + end + end + end. + peer_down(Peer) -> %% In theory we could also remove it from credit_deferred here, but it %% doesn't really matter; at some point later we will drain @@ -128,7 +140,12 @@ grant(To, Quantity) -> true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred]) end. -block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). +block(From) -> + case blocked() of + false -> put(credit_blocked_at, erlang:now()); + true -> ok + end, + ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). unblock(From) -> ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]), diff --git a/src/rabbit.erl b/src/rabbit.erl index 1b7fe6da..045c5d58 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -347,19 +347,25 @@ handle_app_error(App, Reason) -> start_it(StartFun) -> Marker = spawn_link(fun() -> receive stop -> ok end end), - register(rabbit_boot, Marker), - try - StartFun() - catch - throw:{could_not_start, _App, _Reason}=Err -> - boot_error(Err, not_available); - _:Reason -> - boot_error(Reason, erlang:get_stacktrace()) - after - unlink(Marker), - Marker ! stop, - %% give the error loggers some time to catch up - timer:sleep(100) + case catch register(rabbit_boot, Marker) of + true -> try + case is_running() of + true -> ok; + false -> StartFun() + end + catch + throw:{could_not_start, _App, _Reason}=Err -> + boot_error(Err, not_available); + _:Reason -> + boot_error(Reason, erlang:get_stacktrace()) + after + unlink(Marker), + Marker ! stop, + %% give the error loggers some time to catch up + timer:sleep(100) + end; + _ -> unlink(Marker), + Marker ! stop end. stop() -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8a84c9f4..8306f134 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -466,10 +466,16 @@ check_dlxrk_arg({Type, _}, _Args) -> list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). +%% Not dirty_match_object since that would not be transactional when used in a +%% tx context list(VHostPath) -> - mnesia:dirty_match_object( - rabbit_queue, - #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). + mnesia:async_dirty( + fun () -> + mnesia:match_object( + rabbit_queue, + #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}, + read) + end). info_keys() -> rabbit_amqqueue_process:info_keys(). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3166c4ab..7002fd36 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,7 @@ backing_queue, backing_queue_state, active_consumers, + consumer_use, expires, sync_timer_ref, rate_timer_ref, @@ -95,11 +96,12 @@ messages_unacknowledged, messages, consumers, + consumer_utilisation, memory, slave_pids, synchronised_slave_pids, backing_queue_status, - status + state ]). -define(CREATION_EVENT_KEYS, @@ -149,6 +151,7 @@ init_state(Q) -> exclusive_consumer = none, has_had_consumers = false, active_consumers = priority_queue:new(), + consumer_use = {inactive, now_micros(), 0, 0.0}, senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), status = running, @@ -267,15 +270,16 @@ recovery_barrier(BarrierPid) -> process_args_policy(State = #q{q = Q, args_policy_version = N}) -> - lists:foldl( - fun({Name, Resolve, Fun}, StateN) -> - Fun(args_policy_lookup(Name, Resolve, Q), StateN) - end, State#q{args_policy_version = N + 1}, - [{<<"expires">>, fun res_min/2, fun init_exp/2}, - {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2}, - {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, - {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, - {<<"max-length">>, fun res_min/2, fun init_max_length/2}]). + ArgsTable = + [{<<"expires">>, fun res_min/2, fun init_exp/2}, + {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2}, + {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, + {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, + {<<"max-length">>, fun res_min/2, fun init_max_length/2}], + drop_expired_msgs( + lists:foldl(fun({Name, Resolve, Fun}, StateN) -> + Fun(args_policy_lookup(Name, Resolve, Q), StateN) + end, State#q{args_policy_version = N + 1}, ArgsTable)). args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) -> AName = <<"x-", Name/binary>>, @@ -297,8 +301,7 @@ init_exp(Expires, State) -> State1 = init_exp(undefined, State), ensure_expiry_timer(State1#q{expires = Expires}). init_ttl(undefined, State) -> stop_ttl_timer(State#q{ttl = undefined}); -init_ttl(TTL, State) -> State1 = init_ttl(undefined, State), - drop_expired_msgs(State1#q{ttl = TTL}). +init_ttl(TTL, State) -> (init_ttl(undefined, State))#q{ttl = TTL}. init_dlx(undefined, State) -> State#q{dlx = undefined}; @@ -482,10 +485,12 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, - State = #q{active_consumers = ActiveConsumers}) -> + State = #q{active_consumers = ActiveConsumers, + consumer_use = CUInfo}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> - {false, State}; + {false, + State#q{consumer_use = update_consumer_use(CUInfo, inactive)}}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, Priority, @@ -536,6 +541,26 @@ deliver_from_queue_deliver(AckRequired, State) -> {Result, State1} = fetch(AckRequired, State), {Result, is_empty(State1), State1}. +update_consumer_use({inactive, _, _, _} = CUInfo, inactive) -> + CUInfo; +update_consumer_use({active, _, _} = CUInfo, active) -> + CUInfo; +update_consumer_use({active, Since, Avg}, inactive) -> + Now = now_micros(), + {inactive, Now, Now - Since, Avg}; +update_consumer_use({inactive, Since, Active, Avg}, active) -> + Now = now_micros(), + {active, Now, consumer_use_avg(Active, Now - Since, Avg)}. + +consumer_use_avg(Active, Inactive, Avg) -> + Time = Inactive + Active, + Ratio = Active / Time, + Weight = erlang:min(1, Time / 1000000), + case Avg of + undefined -> Ratio; + _ -> Ratio * Weight + Avg * (1 - Weight) + end. + confirm_messages([], State) -> State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> @@ -713,7 +738,7 @@ possibly_unblock(State, ChPid, Update) -> end end. -unblock(State, C = #cr{limiter = Limiter}) -> +unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) -> case lists:partition( fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) @@ -725,12 +750,14 @@ unblock(State, C = #cr{limiter = Limiter}) -> BlockedQ = priority_queue:from_list(Blocked), UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), - AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), - State1 = State#q{active_consumers = AC1}, + State1 = State#q{consumer_use = + update_consumer_use(CUInfo, active)}, + AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ), + State2 = State1#q{active_consumers = AC1}, [notify_decorators( - consumer_unblocked, [{consumer_tag, CTag}], State1) || + consumer_unblocked, [{consumer_tag, CTag}], State2) || {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], - run_message_queue(State1) + run_message_queue(State2) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -1037,6 +1064,16 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); +i(consumer_utilisation, #q{consumer_use = ConsumerUse}) -> + case consumer_count() of + 0 -> ''; + _ -> case ConsumerUse of + {active, Since, Avg} -> + consumer_use_avg(now_micros() - Since, 0, Avg); + {inactive, Since, Active, Avg} -> + consumer_use_avg(Active, now_micros() - Since, Avg) + end + end; i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -1054,8 +1091,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; -i(status, #q{status = Status}) -> - Status; +i(state, #q{status = running}) -> credit_flow:state(); +i(state, #q{status = State}) -> State; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> @@ -1076,7 +1113,10 @@ emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> - rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). + ExtraKs = [K || {K, _} <- Extra], + Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State), + not lists:member(K, ExtraKs)], + rabbit_event:notify(queue_stats, Extra ++ Infos). emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) -> rabbit_event:notify(consumer_created, @@ -1537,7 +1577,8 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, BQS3 = BQ:handle_pre_hibernate(BQS2), rabbit_event:if_enabled( State, #q.stats_timer, - fun () -> emit_stats(State, [{idle_since, now()}]) end), + fun () -> emit_stats(State, [{idle_since, now()}, + {consumer_utilisation, ''}]) end), State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3}, #q.stats_timer), {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index dc6d090f..088ad0e5 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -20,6 +20,7 @@ -export([parse_table/1]). -export([ensure_content_decoded/1, clear_decoded_content/1]). +-export([validate_utf8/1, assert_utf8/1]). %%---------------------------------------------------------------------------- @@ -30,6 +31,8 @@ (rabbit_types:content()) -> rabbit_types:decoded_content()). -spec(clear_decoded_content/1 :: (rabbit_types:content()) -> rabbit_types:undecoded_content()). +-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error'). +-spec(assert_utf8/1 :: (binary()) -> 'ok'). -endif. @@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) -> Content; clear_decoded_content(Content = #content{}) -> Content#content{properties = none}. + +assert_utf8(B) -> + case validate_utf8(B) of + ok -> ok; + error -> rabbit_misc:protocol_error( + frame_error, "Malformed UTF-8 in shortstr", []) + end. + +validate_utf8(Bin) -> + try + xmerl_ucs:from_utf8(Bin), + ok + catch exit:{ucs, _} -> + error + end. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 98b427c7..11e6bd38 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -281,9 +281,10 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). has_for_source(SrcName) -> Match = #route{binding = #binding{source = SrcName, _ = '_'}}, - %% we need to check for durable routes here too in case a bunch of - %% routes to durable queues have been removed temporarily as a - %% result of a node failure + %% we need to check for semi-durable routes (which subsumes + %% durable routes) here too in case a bunch of routes to durable + %% queues have been removed temporarily as a result of a node + %% failure contains(rabbit_route, Match) orelse contains(rabbit_semi_durable_route, Match). @@ -291,8 +292,9 @@ remove_for_source(SrcName) -> lock_route_tables(), Match = #route{binding = #binding{source = SrcName, _ = '_'}}, remove_routes( - lists:usort(mnesia:match_object(rabbit_route, Match, write) ++ - mnesia:match_object(rabbit_durable_route, Match, write))). + lists:usort( + mnesia:match_object(rabbit_route, Match, write) ++ + mnesia:match_object(rabbit_semi_durable_route, Match, write))). remove_for_destination(DstName) -> remove_for_destination(DstName, fun remove_routes/1). @@ -323,8 +325,6 @@ delete_object(Tab, Record, LockKind) -> [_] -> mnesia:delete_object(Tab, Record, LockKind) end. -sync_route(R, Fun) -> sync_route(R, true, true, Fun). - sync_route(Route, true, true, Fun) -> ok = Fun(rabbit_durable_route, Route, write), sync_route(Route, false, true, Fun); @@ -407,14 +407,17 @@ lock_route_tables() -> remove_routes(Routes) -> %% This partitioning allows us to suppress unnecessary delete %% operations on disk tables, which require an fsync. - {TransientRoutes, DurableRoutes} = + {RamRoutes, DiskRoutes} = lists:partition(fun (R) -> mnesia:match_object( rabbit_durable_route, R, write) == [] end, Routes), - [ok = sync_transient_route(R, fun mnesia:delete_object/3) || - R <- TransientRoutes], - [ok = sync_route(R, fun mnesia:delete_object/3) || - R <- DurableRoutes], + %% Of course the destination might not really be durable but it's + %% just as easy to try to delete it from the semi-durable table + %% than check first + [ok = sync_route(R, false, true, fun mnesia:delete_object/3) || + R <- RamRoutes], + [ok = sync_route(R, true, true, fun mnesia:delete_object/3) || + R <- DiskRoutes], [R#route.binding || R <- Routes]. remove_transient_routes(Routes) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ab977883..f71d85c4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -53,7 +53,8 @@ messages_uncommitted, acks_uncommitted, prefetch_count, - client_flow_blocked]). + client_flow_blocked, + state]). -define(CREATION_EVENT_KEYS, [pid, @@ -177,7 +178,8 @@ info_all(Items) -> refresh_config_local() -> rabbit_misc:upmap( - fun (C) -> gen_server2:call(C, refresh_config) end, list_local()), + fun (C) -> gen_server2:call(C, refresh_config, infinity) end, + list_local()), ok. ready_for_close(Pid) -> @@ -549,6 +551,14 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) -> check_not_default_exchange(_) -> ok. +check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>, + kind = exchange}) -> + rabbit_misc:protocol_error( + access_refused, "deletion of system ~s not allowed", + [rabbit_misc:rs(XName)]); +check_exchange_deletion(_) -> + ok. + %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% @@ -591,7 +601,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - {reply, #'channel.open_ok'{}, State#ch{state = running}}; + %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? + State1 = State#ch{state = running}, + rabbit_event:if_enabled(State1, #ch.stats_timer, + fun() -> emit_stats(State1) end), + {reply, #'channel.open_ok'{}, State1}; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -934,6 +948,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_not_default_exchange(ExchangeName), + check_exchange_deletion(ExchangeName), check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> @@ -1273,7 +1288,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid }) -> {OrigDName, ActualRoutingKey} = - expand_binding(DestinationType, DestinationNameBin, RoutingKey, State), + expand_binding(DestinationType, DestinationNameBin, RoutingKey, State), DestinationName = intercept_binding_method(OrigDName, DestinationType, ReturnMethod), check_write_permitted(DestinationName, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), @@ -1621,6 +1636,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; +i(state, #ch{state = running}) -> credit_flow:state(); +i(state, #ch{state = State}) -> State; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> @@ -1682,10 +1699,10 @@ intercept_binding_method(OrigDName, _DestinationType, _Method) -> intercept_method(M, Q) -> case rabbit_channel_interceptor:run_filter_chain(M, Q, rabbit_channel_interceptor:select(Q, M)) of - {ok, QN} -> + {ok, QN} -> QN; {error, Reason} -> rabbit_misc:protocol_error( internal_error, "~s", [Reason]) - end.
\ No newline at end of file + end. diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_connection_helper_sup.erl index a9381f20..e51615e8 100644 --- a/src/rabbit_intermediate_sup.erl +++ b/src/rabbit_connection_helper_sup.erl @@ -11,21 +11,27 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2013-2013 GoPivotal, Inc. All rights reserved. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. %% --module(rabbit_intermediate_sup). +-module(rabbit_connection_helper_sup). -behaviour(supervisor2). -export([start_link/0]). +-export([start_channel_sup_sup/1, + start_queue_collector/1]). -export([init/1]). +-include("rabbit.hrl"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- @@ -33,7 +39,20 @@ start_link() -> supervisor2:start_link(?MODULE, []). +start_channel_sup_sup(SupPid) -> + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}). + +start_queue_collector(SupPid) -> + supervisor2:start_child( + SupPid, + {collector, {rabbit_queue_collector, start_link, []}, + intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}). + %%---------------------------------------------------------------------------- init([]) -> {ok, {{one_for_one, 10, 10}, []}}. + diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index c1fa17aa..9ed5dc77 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -37,27 +37,25 @@ start_link() -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), - {ok, Collector} = - supervisor2:start_child( - SupPid, - {collector, {rabbit_queue_collector, start_link, []}, - intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), %% We need to get channels in the hierarchy here so they get shut %% down after the reader, so the reader gets a chance to terminate %% them cleanly. But for 1.0 readers we can't start the real %% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) - %% so we add another supervisor into the hierarchy. - {ok, ChannelSup3Pid} = + %% + %% This supervisor also acts as an intermediary for heartbeaters and + %% the queue collector process, since these must not be siblings of the + %% reader due to the potential for deadlock if they are added/restarted + %% whilst the supervision tree is shutting down. + {ok, HelperSup} = supervisor2:start_child( SupPid, - {channel_sup3, {rabbit_intermediate_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}), + {helper_sup, {rabbit_connection_helper_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_connection_helper_sup]}), {ok, ReaderPid} = supervisor2:start_child( SupPid, - {reader, {rabbit_reader, start_link, - [ChannelSup3Pid, Collector, - rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, + {reader, {rabbit_reader, start_link, [HelperSup]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 6f36f99d..f3463286 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) -> end. call(Node, {Mod, Fun, Args}) -> - rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)). + rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). + +list_to_binary_utf8(L) -> + B = list_to_binary(L), + case rabbit_binary_parser:validate_utf8(B) of + ok -> B; + error -> throw({error, {not_utf_8, L}}) + end. rpc_call(Node, Mod, Fun, Args) -> rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 17ed8563..ab8c62fe 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -51,7 +51,7 @@ stop() -> init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, false, []), + topic, true, false, true, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index fc131519..bb5b63e9 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -244,10 +244,16 @@ lookup_or_die(Name) -> {error, not_found} -> rabbit_misc:not_found(Name) end. +%% Not dirty_match_object since that would not be transactional when used in a +%% tx context list(VHostPath) -> - mnesia:dirty_match_object( - rabbit_exchange, - #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). + mnesia:async_dirty( + fun () -> + mnesia:match_object( + rabbit_exchange, + #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}, + read) + end). lookup_scratch(Name, App) -> case lookup(Name) of diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 4cf314ca..1a766b05 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -181,8 +181,8 @@ with_synced_copy(Path, Modes, Fun) -> {ok, Hdl} -> try Result = Fun(Hdl), - ok = prim_file:rename(Bak, Path), ok = prim_file:sync(Hdl), + ok = prim_file:rename(Bak, Path), Result after prim_file:close(Hdl) diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index fac74edb..ca67254b 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -16,8 +16,9 @@ -module(rabbit_heartbeat). +-export([start/6]). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). + pause_monitor/1, resume_monitor/1]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -28,16 +29,15 @@ -ifdef(use_specs). -export_type([heartbeaters/0]). --export_type([start_heartbeat_fun/0]). -type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). -type(heartbeat_callback() :: fun (() -> any())). --type(start_heartbeat_fun() :: - fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), - non_neg_integer(), heartbeat_callback()) -> - no_return())). +-spec(start/6 :: + (pid(), rabbit_net:socket(), + non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> heartbeaters()). -spec(start_heartbeat_sender/3 :: (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> @@ -46,10 +46,6 @@ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). --spec(start_heartbeat_fun/1 :: - (pid()) -> start_heartbeat_fun()). - - -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -61,6 +57,17 @@ %%---------------------------------------------------------------------------- +start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + {ok, Sender} = + start_heartbeater(SendTimeoutSec, SupPid, Sock, + SendFun, heartbeat_sender, + start_heartbeat_sender), + {ok, Receiver} = + start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + ReceiveFun, heartbeat_receiver, + start_heartbeat_receiver), + {Sender, Receiver}. + start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case @@ -75,19 +82,6 @@ start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> ReceiveFun(), stop end}). -start_heartbeat_fun(SupPid) -> - fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> - {ok, Sender} = - start_heartbeater(SendTimeoutSec, SupPid, Sock, - SendFun, heartbeat_sender, - start_heartbeat_sender), - {ok, Receiver} = - start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, - ReceiveFun, heartbeat_receiver, - start_heartbeat_receiver), - {Sender, Receiver} - end. - pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 12a13c00..22da465b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -197,24 +197,25 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). new(Pid) -> %% this a 'call' to ensure that it is invoked at most once. - ok = gen_server:call(Pid, {new, self()}), + ok = gen_server:call(Pid, {new, self()}, infinity), #lstate{pid = Pid, prefetch_limited = false, blocked = false}. limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> - ok = gen_server:call(L#lstate.pid, - {limit_prefetch, PrefetchCount, UnackedCount}), + ok = gen_server:call( + L#lstate.pid, + {limit_prefetch, PrefetchCount, UnackedCount}, infinity), L#lstate{prefetch_limited = true}. unlimit_prefetch(L) -> - ok = gen_server:call(L#lstate.pid, unlimit_prefetch), + ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity), L#lstate{prefetch_limited = false}. block(L) -> - ok = gen_server:call(L#lstate.pid, block), + ok = gen_server:call(L#lstate.pid, block, infinity), L#lstate{blocked = true}. unblock(L) -> - ok = gen_server:call(L#lstate.pid, unblock), + ok = gen_server:call(L#lstate.pid, unblock, infinity), L#lstate{blocked = false}. is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited. @@ -224,7 +225,8 @@ is_blocked(#lstate{blocked = Blocked}) -> Blocked. is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L). get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0; -get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit). +get_prefetch_limit(L) -> + gen_server:call(L#lstate.pid, get_prefetch_limit, infinity). ack(#lstate{prefetch_limited = false}, _AckCount) -> ok; ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 3abd81f5..d9cef642 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -110,7 +110,13 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]}) end), {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), - rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), + %% We need synchronous add here (i.e. do not return until the + %% slave is running) so that when queue declaration is finished + %% all slaves are up; we don't want to end up with unsynced slaves + %% just by declaring a new queue. But add can't be synchronous all + %% the time as it can be called by slaves and that's + %% deadlock-prone. + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), #state { name = QName, gm = GM, coordinator = CPid, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 8ad7c62f..ca495733 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -17,9 +17,10 @@ -module(rabbit_mirror_queue_misc). -behaviour(rabbit_policy_validator). --export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2, +-export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, validate_policy/1]). + is_mirrored/1, update_mirrors/2, validate_policy/1, + maybe_auto_sync/1]). %% for testing only -export([module/1]). @@ -45,10 +46,8 @@ (rabbit_amqqueue:name(), pid(), [pid()]) -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). --spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). --spec(add_mirror/2 :: - (rabbit_amqqueue:name(), node()) -> - {'ok', atom()} | rabbit_types:error(any())). +-spec(add_mirrors/3 :: (rabbit_amqqueue:name(), [node()], 'sync' | 'async') + -> 'ok'). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). -spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> @@ -56,6 +55,7 @@ -spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). -spec(update_mirrors/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -94,10 +94,15 @@ remove_from_queue(QueueName, Self, LiveGMPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - store_updated_slaves( - Q #amqqueue { pid = QPid1, + Q1 = Q#amqqueue{pid = QPid1, slave_pids = SPids1, - gm_pids = GMPids1 }), + gm_pids = GMPids1}, + store_updated_slaves(Q1), + %% If we add and remove nodes at the same time we + %% might tell the old master we need to sync and + %% then shut it down. So let's check if the new + %% master needs to sync. + maybe_auto_sync(Q1), {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, @@ -135,7 +140,7 @@ on_node_up() -> end end, [], rabbit_queue) end), - [add_mirror(QName, node()) || QName <- QNames], + [add_mirror(QName, node(), async) || QName <- QNames], ok. drop_mirrors(QName, Nodes) -> @@ -160,45 +165,35 @@ drop_mirror(QName, MirrorNode) -> end end). -add_mirrors(QName, Nodes) -> - [add_mirror(QName, Node) || Node <- Nodes], +add_mirrors(QName, Nodes, SyncMode) -> + [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. -add_mirror(QName, MirrorNode) -> +add_mirror(QName, MirrorNode, SyncMode) -> rabbit_amqqueue:with( QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> - start_child(Name, MirrorNode, Q); + start_child(Name, MirrorNode, Q, SyncMode); [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> {ok, already_mirrored}; - false -> start_child(Name, MirrorNode, Q) + false -> start_child(Name, MirrorNode, Q, SyncMode) end end end). -start_child(Name, MirrorNode, Q) -> +start_child(Name, MirrorNode, Q, SyncMode) -> case rabbit_misc:with_exit_handler( - rabbit_misc:const({ok, down}), + rabbit_misc:const(down), fun () -> rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) end) of - {ok, SPid} when is_pid(SPid) -> - maybe_auto_sync(Q), - rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - {ok, started}; - {error, {{stale_master_pid, StalePid}, _}} -> - rabbit_log:warning("Detected stale HA master while adding " - "mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, StalePid]), - {ok, stale_master}; - {error, {{duplicate_live_master, _}=Err, _}} -> - Err; - Other -> - Other + {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid, SyncMode); + _ -> ok end. report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> @@ -312,8 +307,10 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], - add_mirrors (QName, NewNodes -- OldNodes), + add_mirrors (QName, NewNodes -- OldNodes, async), drop_mirrors(QName, OldNodes -- NewNodes), + %% This is for the case where no extra nodes were added but we changed to + %% a policy requiring auto-sync. maybe_auto_sync(NewQ), ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b1a86493..96f89ecc 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2, info/1]). +-export([start_link/1, set_maximum_since_use/2, info/1, go/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -78,7 +78,15 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init(Q = #amqqueue { name = QName }) -> +init(Q) -> + {ok, {not_started, Q}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}. + +go(SPid, sync) -> gen_server2:call(SPid, go, infinity); +go(SPid, async) -> gen_server2:cast(SPid, go). + +handle_go(Q = #amqqueue{name = QName}) -> %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -124,22 +132,27 @@ init(Q = #amqqueue { name = QName }) -> }, ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), - {ok, State, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}}; + rabbit_mirror_queue_misc:maybe_auto_sync(Q1), + {ok, State}; {stale, StalePid} -> - {stop, {stale_master_pid, StalePid}}; + rabbit_log:warning("Detected stale HA master while adding " + "mirror of ~s: ~p~n", + [rabbit_misc:rs(QName), StalePid]), + gm:leave(GM), + {error, {stale_master_pid, StalePid}}; duplicate_live_master -> - {stop, {duplicate_live_master, Node}}; + gm:leave(GM), + {error, {duplicate_live_master, Node}}; existing -> gm:leave(GM), - ignore; + {error, normal}; master_in_recovery -> + gm:leave(GM), %% The queue record vanished - we must have a master starting %% concurrently with us. In that case we can safely decide to do %% nothing here, and the master will start us in %% master:init_with_existing_bq/3 - ignore + {error, normal} end. init_it(Self, GM, Node, QName) -> @@ -173,6 +186,12 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> rabbit_mirror_queue_misc:store_updated_slaves( Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). +handle_call(go, _From, {not_started, Q} = NotStarted) -> + case handle_go(Q) of + {ok, State} -> {reply, ok, State}; + {error, Error} -> {stop, Error, NotStarted} + end; + handle_call({deliver, Delivery, true}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), @@ -208,6 +227,12 @@ handle_call({gm_deaths, LiveGMPids}, From, handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State). +handle_cast(go, {not_started, Q} = NotStarted) -> + case handle_go(Q) of + {ok, State} -> {noreply, State}; + {error, Error} -> {stop, Error, NotStarted} + end; + handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -293,32 +318,43 @@ handle_info({bump_credit, Msg}, State) -> handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. -%% If the Reason is shutdown, or {shutdown, _}, it is not the queue -%% being deleted: it's just the node going down. Even though we're a -%% slave, we have no idea whether or not we'll be the only copy coming -%% back up. Thus we must assume we will be, and preserve anything we -%% have on disk. +terminate(normal, {not_started, _Q}) -> + ok; terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. ok; -terminate({shutdown, dropped} = R, #state { backing_queue = BQ, - backing_queue_state = BQS }) -> +terminate({shutdown, dropped} = R, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> %% See rabbit_mirror_queue_master:terminate/2 + terminate_common(State), BQ:delete_and_terminate(R, BQS); -terminate(Reason, #state { q = Q, - gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = RateTRef }) -> - ok = gm:leave(GM), - QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()), - rabbit_amqqueue_process:terminate(Reason, QueueState); +terminate(shutdown, State) -> + terminate_shutdown(shutdown, State); +terminate({shutdown, _} = R, State) -> + terminate_shutdown(R, State); +terminate(Reason, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + terminate_common(State), + BQ:delete_and_terminate(Reason, BQS); terminate([_SPid], _Reason) -> %% gm case ok. +%% If the Reason is shutdown, or {shutdown, _}, it is not the queue +%% being deleted: it's just the node going down. Even though we're a +%% slave, we have no idea whether or not we'll be the only copy coming +%% back up. Thus we must assume we will be, and preserve anything we +%% have on disk. +terminate_shutdown(Reason, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + terminate_common(State), + BQ:terminate(Reason, BQS). + +terminate_common(State) -> + ok = rabbit_memory_monitor:deregister(self()), + stop_rate_timer(stop_sync_timer(State)). + code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -394,7 +430,9 @@ handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). inform_deaths(SPid, Live) -> - case gen_server2:call(SPid, {gm_deaths, Live}, infinity) of + case rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of ok -> ok; {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} end. @@ -663,8 +701,13 @@ confirm_sender_death(Pid) -> ok. forget_sender(_, running) -> false; +forget_sender(down_from_gm, down_from_gm) -> false; %% [1] forget_sender(Down1, Down2) when Down1 =/= Down2 -> true. +%% [1] If another slave goes through confirm_sender_death/1 before we +%% do we can get two GM sender_death messages in a row for the same +%% channel - don't treat that as anything special. + %% Record and process lifetime events from channels. Forget all about a channel %% only when down notifications are received from both the channel and from gm. maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ, diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 3a8fae7f..f27f77c6 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -434,10 +434,8 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> %% First disc node up maybe_force_load(), ok; - {[AnotherNode | _], _, _} -> + {[_ | _], _, _} -> %% Subsequent node in cluster, catch up - ensure_version_ok( - rpc:call(AnotherNode, rabbit_version, recorded, [])), maybe_force_load(), ok = rabbit_table:wait_for_replicated(), ok = rabbit_table:create_local_copy(NodeType) @@ -639,15 +637,6 @@ schema_ok_or_move() -> ok = create_schema() end. -ensure_version_ok({ok, DiscVersion}) -> - DesiredVersion = rabbit_version:desired(), - case rabbit_version:matches(DesiredVersion, DiscVersion) of - true -> ok; - false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}}) - end; -ensure_version_ok({error, _}) -> - ok = rabbit_version:record_desired(). - %% We only care about disc nodes since ram nodes are supposed to catch %% up only create_schema() -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 46cfabe3..91be4dcb 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -149,14 +149,22 @@ ensure_ssl() -> ok = app_utils:start_applications(SslAppsConfig), {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options), - % unknown_ca errors are silently ignored prior to R14B unless we - % supply this verify_fun - remove when at least R14B is required - case proplists:get_value(verify, SslOptsConfig, verify_none) of - verify_none -> SslOptsConfig; - verify_peer -> [{verify_fun, fun([]) -> true; - ([_|_]) -> false - end} - | SslOptsConfig] + case rabbit_misc:pget(verify_fun, SslOptsConfig) of + {Module, Function} -> + rabbit_misc:pset(verify_fun, + fun (ErrorList) -> + Module:Function(ErrorList) + end, SslOptsConfig); + undefined -> + % unknown_ca errors are silently ignored prior to R14B unless we + % supply this verify_fun - remove when at least R14B is required + case proplists:get_value(verify, SslOptsConfig, verify_none) of + verify_none -> SslOptsConfig; + verify_peer -> [{verify_fun, fun([]) -> true; + ([_|_]) -> false + end} + | SslOptsConfig] + end end. ssl_transform_fun(SslOpts) -> diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index de10b30f..cd55381a 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -208,10 +208,16 @@ notify_clear(VHost, <<"policy">>, _Name) -> %%---------------------------------------------------------------------------- +%% [1] We need to prevent this from becoming O(n^2) in a similar +%% manner to rabbit_binding:remove_for_{source,destination}. So see +%% the comment in rabbit_binding:lock_route_tables/0 for more rationale. update_policies(VHost) -> - Policies = list(VHost), + Tabs = [rabbit_queue, rabbit_durable_queue, + rabbit_exchange, rabbit_durable_exchange], {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( fun() -> + [mnesia:lock({table, T}, write) || T <- Tabs], %% [1] + Policies = list(VHost), {[update_exchange(X, Policies) || X <- rabbit_exchange:list(VHost)], [update_queue(Q, Policies) || diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 157b8270..67effab0 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,12 +18,12 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/1, +-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/4, mainloop/2, recvloop/2]). +-export([init/2, mainloop/2, recvloop/2]). -export([conserve_resources/3, server_properties/1]). @@ -32,16 +32,16 @@ -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). +-define(CHANNEL_MIN, 1). %%-------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, - connection_state, queue_collector, heartbeater, stats_timer, - ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun, - buf, buf_len, throttle}). + connection_state, helper_sup, queue_collector, heartbeater, + stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, - protocol, user, timeout_sec, frame_max, vhost, + protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, auth_mechanism, auth_state}). @@ -49,15 +49,14 @@ blocked_sent}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, - send_pend, state, last_blocked_by, last_blocked_age, - channels]). + send_pend, state, channels]). -define(CREATION_EVENT_KEYS, [pid, name, port, peer_port, host, peer_host, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, - timeout, frame_max, client_properties]). + timeout, frame_max, channel_max, client_properties]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -74,8 +73,7 @@ -ifdef(use_specs). --spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> - rabbit_types:ok(pid())). +-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). @@ -86,11 +84,9 @@ rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) - -> no_return()). --spec(start_connection/7 :: - (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), - rabbit_net:socket(), +-spec(init/2 :: (pid(), pid()) -> no_return()). +-spec(start_connection/5 :: + (pid(), pid(), any(), rabbit_net:socket(), fun ((rabbit_net:socket()) -> rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). @@ -104,20 +100,17 @@ %%-------------------------------------------------------------------------- -start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) -> - {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid, - Collector, StartHeartbeatFun])}. +start_link(HelperSup) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) -> +init(Parent, HelperSup) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection( - Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock, - SockTransform) + start_connection(Parent, HelperSup, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> @@ -205,8 +198,7 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, - Sock, SockTransform) -> +start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = case rabbit_net:connection_string(Sock, inbound) of {ok, Str} -> Str; @@ -242,11 +234,10 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, recv_len = 0, pending_recv = false, connection_state = pre_init, - queue_collector = Collector, + queue_collector = undefined, %% started on tune-ok + helper_sup = HelperSup, heartbeater = none, - ch_sup3_pid = ChSup3Pid, channel_sup_sup_pid = none, - start_heartbeat_fun = StartHeartbeatFun, buf = [], buf_len = 0, throttle = #throttle{ @@ -615,17 +606,26 @@ create_channel(Channel, State) -> connection = #connection{name = Name, protocol = Protocol, frame_max = FrameMax, + channel_max = ChannelMax, user = User, vhost = VHost, capabilities = Capabilities}} = State, - {ok, _ChSupPid, {ChPid, AState}} = - rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, - Protocol, User, VHost, Capabilities, Collector}), - MRef = erlang:monitor(process, ChPid), - put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - {ChPid, AState}. + N = length(all_channels()), + case ChannelMax == 0 orelse N < ChannelMax of + true -> {ok, _ChSupPid, {ChPid, AState}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, + Protocol, User, VHost, Capabilities, + Collector}), + MRef = erlang:monitor(process, ChPid), + put({ch_pid, ChPid}, {Channel, MRef}), + put({channel, Channel}, {ChPid, AState}), + {ok, {ChPid, AState}}; + false -> {error, rabbit_misc:amqp_error( + not_allowed, "number of channels opened (~w) has " + "reached the negotiated channel_max (~w)", + [N, ChannelMax], 'none')} + end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of @@ -673,24 +673,28 @@ handle_frame(Type, Channel, Payload, State) -> process_frame(Frame, Channel, State) -> ChKey = {channel, Channel}, - {ChPid, AState} = case get(ChKey) of - undefined -> create_channel(Channel, State); - Other -> Other - end, - case rabbit_command_assembler:process(Frame, AState) of - {ok, NewAState} -> - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, NewAState} -> - rabbit_channel:do(ChPid, Method), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, State); - {ok, Method, Content, NewAState} -> - rabbit_channel:do_flow(ChPid, Method, Content), - put(ChKey, {ChPid, NewAState}), - post_process_frame(Frame, ChPid, control_throttle(State)); - {error, Reason} -> - handle_exception(State, Channel, Reason) + case (case get(ChKey) of + undefined -> create_channel(Channel, State); + Other -> {ok, Other} + end) of + {error, Error} -> + handle_exception(State, Channel, Error); + {ok, {ChPid, AState}} -> + case rabbit_command_assembler:process(Frame, AState) of + {ok, NewAState} -> + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, NewAState} -> + rabbit_channel:do(ChPid, Method), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + {ok, Method, Content, NewAState} -> + rabbit_channel:do_flow(ChPid, Method, Content), + put(ChKey, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, control_throttle(State)); + {error, Reason} -> + handle_exception(State, Channel, Reason) + end end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> @@ -847,41 +851,40 @@ handle_method0(#'connection.secure_ok'{response = Response}, State = #v1{connection_state = securing}) -> auth_phase(Response, State); -handle_method0(#'connection.tune_ok'{frame_max = FrameMax, - heartbeat = ClientHeartbeat}, +handle_method0(#'connection.tune_ok'{frame_max = FrameMax, + channel_max = ChannelMax, + heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, - sock = Sock, - start_heartbeat_fun = SHF}) -> - ServerFrameMax = server_frame_max(), - if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE -> - rabbit_misc:protocol_error( - not_allowed, "frame_max=~w < ~w min size", - [FrameMax, ?FRAME_MIN_SIZE]); - ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax -> - rabbit_misc:protocol_error( - not_allowed, "frame_max=~w > ~w max size", - [FrameMax, ServerFrameMax]); - true -> - Frame = rabbit_binary_generator:build_heartbeat_frame(), - SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, - Parent = self(), - ReceiveFun = fun() -> Parent ! heartbeat_timeout end, - Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, - ClientHeartbeat, ReceiveFun), - State#v1{connection_state = opening, - connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax}, - heartbeater = Heartbeater} - end; + helper_sup = SupPid, + sock = Sock}) -> + ok = validate_negotiated_integer_value( + frame_max, ?FRAME_MIN_SIZE, FrameMax), + ok = validate_negotiated_integer_value( + channel_max, ?CHANNEL_MIN, ChannelMax), + {ok, Collector} = + rabbit_connection_helper_sup:start_queue_collector(SupPid), + Frame = rabbit_binary_generator:build_heartbeat_frame(), + SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, + Parent = self(), + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, + Heartbeater = + rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat, + SendFun, ClientHeartbeat, ReceiveFun), + State#v1{connection_state = opening, + connection = Connection#connection{ + frame_max = FrameMax, + channel_max = ChannelMax, + timeout_sec = ClientHeartbeat}, + queue_collector = Collector, + heartbeater = Heartbeater}; handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, connection = Connection = #connection{ user = User, protocol = Protocol}, - ch_sup3_pid = ChSup3Pid, + helper_sup = SupPid, sock = Sock, throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), @@ -890,10 +893,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), Throttle1 = Throttle#throttle{alarmed_by = Conserve}, {ok, ChannelSupSupPid} = - supervisor2:start_child( - ChSup3Pid, - {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), + rabbit_connection_helper_sup:start_channel_sup_sup(SupPid), State1 = control_throttle( State#v1{connection_state = running, connection = NewConnection, @@ -925,13 +925,28 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -server_frame_max() -> - {ok, FrameMax} = application:get_env(rabbit, frame_max), - FrameMax. +validate_negotiated_integer_value(Field, Min, ClientValue) -> + ServerValue = get_env(Field), + if ClientValue /= 0 andalso ClientValue < Min -> + fail_negotiation(Field, min, ServerValue, ClientValue); + ServerValue /= 0 andalso ClientValue > ServerValue -> + fail_negotiation(Field, max, ServerValue, ClientValue); + true -> + ok + end. + +fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) -> + {S1, S2} = case MinOrMax of + min -> {lower, minimum}; + max -> {higher, maximum} + end, + rabbit_misc:protocol_error( + not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)", + [Field, ClientValue, S1, S2, ServerValue], 'connection.tune'). -server_heartbeat() -> - {ok, Heartbeat} = application:get_env(rabbit, heartbeat), - Heartbeat. +get_env(Key) -> + {ok, Value} = application:get_env(rabbit, Key), + Value. send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). @@ -997,9 +1012,9 @@ auth_phase(Response, State#v1{connection = Connection#connection{ auth_state = AuthState1}}; {ok, User} -> - Tune = #'connection.tune'{channel_max = 0, - frame_max = server_frame_max(), - heartbeat = server_heartbeat()}, + Tune = #'connection.tune'{frame_max = get_env(frame_max), + channel_max = get_env(channel_max), + heartbeat = get_env(heartbeat)}, ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{user = User, @@ -1026,13 +1041,17 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S); i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S); i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); -i(state, #v1{connection_state = CS}) -> CS; -i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By; -i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) -> - infinity; -i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) -> - timer:now_diff(erlang:now(), T) / 1000000; i(channels, #v1{}) -> length(all_channels()); +i(state, #v1{connection_state = ConnectionState, + throttle = #throttle{last_blocked_by = BlockedBy, + last_blocked_at = T}}) -> + Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000, + case {BlockedBy, ConnectionState, Recent} of + {resourse, blocked, _} -> blocked; + {_, blocking, _} -> blocking; + {flow, _, true} -> flow; + {_, _, _} -> ConnectionState + end; i(Item, #v1{connection = Conn}) -> ic(Item, Conn). ic(name, #connection{name = Name}) -> Name; @@ -1047,6 +1066,7 @@ ic(user, #connection{user = U}) -> U#user.username; ic(vhost, #connection{vhost = VHost}) -> VHost; ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; +ic(channel_max, #connection{channel_max = ChMax}) -> ChMax; ic(client_properties, #connection{client_properties = CP}) -> CP; ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; @@ -1081,8 +1101,16 @@ maybe_emit_stats(State) -> fun() -> emit_stats(State) end). emit_stats(State) -> - rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), - rabbit_event:reset_stats_timer(State, #v1.stats_timer). + Infos = infos(?STATISTICS_KEYS, State), + rabbit_event:notify(connection_stats, Infos), + State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer), + %% If we emit an event which looks like we are in flow control, it's not a + %% good idea for it to be our last even if we go idle. Keep emitting + %% events, either we stay busy or we drop out of flow control. + case proplists:get_value(state, Infos) of + flow -> ensure_stats_timer(State1); + _ -> State1 + end. %% 1.0 stub -ifdef(use_specs). @@ -1106,10 +1134,7 @@ pack_for_1_0(#v1{parent = Parent, sock = Sock, recv_len = RecvLen, pending_recv = PendingRecv, - queue_collector = QueueCollector, - ch_sup3_pid = ChSup3Pid, - start_heartbeat_fun = SHF, + helper_sup = SupPid, buf = Buf, buf_len = BufLen}) -> - {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF, - Buf, BufLen}. + {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}. diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index c13c333e..bcde0078 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -139,15 +139,21 @@ list() -> list(VHost) -> list(VHost, '_'). list_component(Component) -> list('_', Component). +%% Not dirty_match_object since that would not be transactional when used in a +%% tx context list(VHost, Component) -> - case VHost of - '_' -> ok; - _ -> rabbit_vhost:assert(VHost) - end, - Match = #runtime_parameters{key = {VHost, Component, '_'}, _ = '_'}, - [p(P) || #runtime_parameters{key = {_VHost, Comp, _Name}} = P <- - mnesia:dirty_match_object(?TABLE, Match), - Comp =/= <<"policy">> orelse Component =:= <<"policy">>]. + mnesia:async_dirty( + fun () -> + case VHost of + '_' -> ok; + _ -> rabbit_vhost:assert(VHost) + end, + Match = #runtime_parameters{key = {VHost, Component, '_'}, + _ = '_'}, + [p(P) || #runtime_parameters{key = {_VHost, Comp, _Name}} = P <- + mnesia:match_object(?TABLE, Match, read), + Comp =/= <<"policy">> orelse Component =:= <<"policy">>] + end). list_formatted(VHost) -> [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 76421d1a..5fe319d3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -808,6 +808,7 @@ test_log_management_during_startup() -> %% start application with logging to non-existing directory TmpLog = "/tmp/rabbit-tests/test.log", delete_file(TmpLog), + ok = control_action(stop_app, []), ok = application:set_env(rabbit, error_logger, {file, TmpLog}), ok = delete_log_handlers([rabbit_error_logger_file_h]), @@ -816,6 +817,7 @@ test_log_management_during_startup() -> %% start application with logging to directory with no %% write permissions + ok = control_action(stop_app, []), TmpDir = "/tmp/rabbit-tests", ok = set_permissions(TmpDir, 8#00400), ok = delete_log_handlers([rabbit_error_logger_file_h]), @@ -830,6 +832,7 @@ test_log_management_during_startup() -> %% start application with logging to a subdirectory which %% parent directory has no write permissions + ok = control_action(stop_app, []), TmpTestDir = "/tmp/rabbit-tests/no-permission/test/log", ok = application:set_env(rabbit, error_logger, {file, TmpTestDir}), ok = add_log_handlers([{error_logger_file_h, MainLog}]), @@ -849,12 +852,13 @@ test_log_management_during_startup() -> %% start application with standard error_logger_file_h %% handler not installed + ok = control_action(stop_app, []), ok = application:set_env(rabbit, error_logger, {file, MainLog}), ok = control_action(start_app, []), - ok = control_action(stop_app, []), %% start application with standard sasl handler not installed %% and rabbit main log handler installed correctly + ok = control_action(stop_app, []), ok = delete_log_handlers([rabbit_sasl_report_file_h]), ok = control_action(start_app, []), passed. diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 1047b823..c1f142d7 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -191,9 +191,14 @@ die(Msg, Args) -> %% straight out into do_boot, generating an erl_crash.dump %% and displaying any error message in a confusing way. error_logger:error_msg(Msg, Args), - io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), + Str = rabbit_misc:format( + "~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), + io:format(Str), error_logger:logfile(close), - halt(1). + case application:get_env(rabbit, halt_on_upgrade_failure) of + {ok, false} -> throw({upgrade_error, Str}); + _ -> halt(1) %% i.e. true or undefined + end. primary_upgrade(Upgrades, Nodes) -> Others = Nodes -- [node()], diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 6f95ef60..90372461 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -46,6 +46,7 @@ -rabbit_upgrade({exchange_decorators, mnesia, [policy]}). -rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). +-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). %% ------------------------------------------------------------------- @@ -74,6 +75,7 @@ -spec(exchange_decorators/0 :: () -> 'ok'). -spec(policy_apply_to/0 :: () -> 'ok'). -spec(queue_decorators/0 :: () -> 'ok'). +-spec(internal_system_x/0 :: () -> 'ok'). -endif. @@ -340,6 +342,19 @@ queue_decorators(Table) -> [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, sync_slave_pids, policy, gm_pids, decorators]). +internal_system_x() -> + transform( + rabbit_durable_exchange, + fun ({exchange, Name = {resource, _, _, <<"amq.rabbitmq.", _/binary>>}, + Type, Dur, AutoDel, _Int, Args, Scratches, Policy, Decorators}) -> + {exchange, Name, Type, Dur, AutoDel, true, Args, Scratches, + Policy, Decorators}; + (X) -> + X + end, + [name, type, durable, auto_delete, internal, arguments, scratches, policy, + decorators]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 8d013d43..047bce77 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -60,15 +60,17 @@ add(VHostPath) -> (ok, false) -> [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}, - {<<"amq.rabbitmq.trace">>, topic}]], + Type, true, false, Internal, []) || + {Name, Type, Internal} <- + [{<<"">>, direct, false}, + {<<"amq.direct">>, direct, false}, + {<<"amq.topic">>, topic, false}, + %% per 0-9-1 pdf + {<<"amq.match">>, headers, false}, + %% per 0-9-1 xml + {<<"amq.headers">>, headers, false}, + {<<"amq.fanout">>, fanout, false}, + {<<"amq.rabbitmq.trace">>, topic, true}]], ok end), rabbit_event:notify(vhost_created, info(VHostPath)), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index bf6964d8..34dd3d3b 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -25,6 +25,7 @@ -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5, + send_command_flow/2, send_command_flow/3, flush/1]). -export([internal_send_command/4, internal_send_command/6]). @@ -78,6 +79,11 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(send_command_flow/2 :: + (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). +-spec(send_command_flow/3 :: + (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) + -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), @@ -165,6 +171,12 @@ handle_message({send_command, MethodRecord}, State) -> internal_send_command_async(MethodRecord, State); handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); +handle_message({send_command_flow, MethodRecord, Sender}, State) -> + credit_flow:ack(Sender), + internal_send_command_async(MethodRecord, State); +handle_message({send_command_flow, MethodRecord, Content, Sender}, State) -> + credit_flow:ack(Sender), + internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> State1 = internal_flush( internal_send_command_async(MethodRecord, State)), @@ -212,6 +224,16 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. +send_command_flow(W, MethodRecord) -> + credit_flow:send(W), + W ! {send_command_flow, MethodRecord, self()}, + ok. + +send_command_flow(W, MethodRecord, Content) -> + credit_flow:send(W), + W ! {send_command_flow, MethodRecord, Content, self()}, + ok. + send_command_sync(W, MethodRecord) -> call(W, {send_command_sync, MethodRecord}). diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index a07f6c65..369ec655 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -221,11 +221,11 @@ get_vm_limit({win32,_OSname}) -> 8 -> 8*1024*1024*1024*1024 %% 8 TB for 64 bits 2^42 end; -%% On a 32-bit machine, if you're using more than 4 gigs of RAM you're +%% On a 32-bit machine, if you're using more than 2 gigs of RAM you're %% in big trouble anyway. get_vm_limit(_OsType) -> case erlang:system_info(wordsize) of - 4 -> 4*1024*1024*1024; %% 4 GB for 32 bits 2^32 + 4 -> 2*1024*1024*1024; %% 2 GB for 32 bits 2^31 8 -> 256*1024*1024*1024*1024 %% 256 TB for 64 bits 2^48 %%http://en.wikipedia.org/wiki/X86-64#Virtual_address_space_details end. diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 488db5ec..e14c471c 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -63,7 +63,7 @@ start_link() -> submit(Fun) -> case get(worker_pool_worker) of true -> worker_pool_worker:run(Fun); - _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity), worker_pool_worker:submit(Pid, Fun) end. @@ -79,15 +79,17 @@ init([]) -> {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call(next_free, From, State = #state { available = Avail, - pending = Pending }) -> +handle_call({next_free, CPid}, From, State = #state { available = Avail, + pending = Pending }) -> case queue:out(Avail) of {empty, _Avail} -> {noreply, - State #state { pending = queue:in({next_free, From}, Pending) }, + State#state{pending = queue:in({next_free, From, CPid}, Pending)}, hibernate}; {{value, WId}, Avail1} -> - {reply, get_worker_pid(WId), State #state { available = Avail1 }, + WPid = get_worker_pid(WId), + worker_pool_worker:next_job_from(WPid, CPid), + {reply, WPid, State #state { available = Avail1 }, hibernate} end; @@ -99,8 +101,10 @@ handle_cast({idle, WId}, State = #state { available = Avail, {noreply, case queue:out(Pending) of {empty, _Pending} -> State #state { available = queue:in(WId, Avail) }; - {{value, {next_free, From}}, Pending1} -> - gen_server2:reply(From, get_worker_pid(WId)), + {{value, {next_free, From, CPid}}, Pending1} -> + WPid = get_worker_pid(WId), + worker_pool_worker:next_job_from(WPid, CPid), + gen_server2:reply(From, WPid), State #state { pending = Pending1 }; {{value, {run_async, Fun}}, Pending1} -> worker_pool_worker:submit_async(get_worker_pid(WId), Fun), diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index a976503f..724235bf 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, submit/2, submit_async/2, run/1]). +-export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]). -export([set_maximum_since_use/2]). @@ -32,6 +32,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). -spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). +-spec(next_job_from/2 :: (pid(), pid()) -> 'ok'). -spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). -spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()). @@ -44,13 +45,18 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-record(state, {id, next}). + %%---------------------------------------------------------------------------- start_link(WId) -> gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). +next_job_from(Pid, CPid) -> + gen_server2:cast(Pid, {next_job_from, CPid}). + submit(Pid, Fun) -> - gen_server2:call(Pid, {submit, Fun}, infinity). + gen_server2:call(Pid, {submit, Fun, self()}, infinity). submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). @@ -70,32 +76,57 @@ init([WId]) -> [self()]), ok = worker_pool:idle(WId), put(worker_pool_worker, true), - {ok, WId, hibernate, + {ok, #state{id = WId}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. -handle_call({submit, Fun}, From, WId) -> +handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) -> + {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate}; + +handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef}, + id = WId}) -> + erlang:demonitor(MRef), gen_server2:reply(From, run(Fun)), ok = worker_pool:idle(WId), - {noreply, WId, hibernate}; + {noreply, State#state{next = undefined}, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({submit_async, Fun}, WId) -> +handle_cast({next_job_from, CPid}, State = #state{next = undefined}) -> + MRef = erlang:monitor(process, CPid), + {noreply, State#state{next = {from, CPid, MRef}}, hibernate}; + +handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun}, + id = WId}) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, State#state{next = undefined}, hibernate}; + +handle_cast({submit_async, Fun}, State = #state{id = WId}) -> run(Fun), ok = worker_pool:idle(WId), - {noreply, WId, hibernate}; + {noreply, State, hibernate}; -handle_cast({set_maximum_since_use, Age}, WId) -> +handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - {noreply, WId, hibernate}; + {noreply, State, hibernate}; handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. +handle_info({'DOWN', MRef, process, CPid, _Reason}, + State = #state{id = WId, + next = {from, CPid, MRef}}) -> + ok = worker_pool:idle(WId), + {noreply, State#state{next = undefined}}; + +handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> + {noreply, State}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. |