diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-06 17:19:59 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-06 17:19:59 +0100 |
commit | e30291815984e99874447b14b12c2a13aacb5eaa (patch) | |
tree | d115d03b586a3b4962a3f9afa2f12abc8ab55838 | |
parent | 7440218a2c41dc86f4ed9c270d2049b1634156e9 (diff) | |
parent | d1964b8f45f1692b91fea24f110d5da0eb091588 (diff) | |
download | rabbitmq-server-e30291815984e99874447b14b12c2a13aacb5eaa.tar.gz |
stable to default
49 files changed, 1681 insertions, 532 deletions
@@ -20,8 +20,6 @@ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml $(DOCS_DIR)/rabbitmq-echopid.xml) USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-plugins.1.xml USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) -QC_MODULES := rabbit_backing_queue_qc -QC_TRIALS ?= 100 ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python @@ -56,6 +54,12 @@ endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) +ifdef INSTRUMENT_FOR_QC +ERLC_OPTS += -DINSTR_MOD=gm_qc +else +ERLC_OPTS += -DINSTR_MOD=gm +endif + include version.mk PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo ) @@ -217,7 +221,8 @@ run-tests: all echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null run-qc: all - $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS)) + ./quickcheck $(RABBITMQ_NODENAME) rabbit_backing_queue_qc 100 40 + ./quickcheck $(RABBITMQ_NODENAME) gm_qc 1000 200 start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid @@ -381,3 +386,4 @@ include $(DEPS_FILE) endif .PHONY: run-qc + diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml index 8ecb4fc8..f7be2d29 100644 --- a/docs/rabbitmq-plugins.1.xml +++ b/docs/rabbitmq-plugins.1.xml @@ -40,6 +40,7 @@ <refsynopsisdiv> <cmdsynopsis> <command>rabbitmq-plugins</command> + <arg choice="opt">-n <replaceable>node</replaceable></arg> <arg choice="req"><replaceable>command</replaceable></arg> <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg> </cmdsynopsis> @@ -62,6 +63,16 @@ enabled. Implicitly enabled plugins are automatically disabled again when they are no longer required. </para> + + <para> + The <command>enable</command>, <command>disable</command> and + <command>set</command> commands will update the plugins file and + then attempt to connect to the broker and ensure it is running + all enabled plugins. By default if it is not possible to connect + to the running broker (for example if it is stopped) then a + warning is displayed. Specify <command>--online</command> or + <command>--offline</command> to change this behaviour. + </para> </refsect1> <refsect1> @@ -97,12 +108,14 @@ </variablelist> <para> Lists all plugins, their versions, dependencies and - descriptions. Each plugin is prefixed with a status - indicator - [ ] to indicate that the plugin is not - enabled, [E] to indicate that it is explicitly enabled, - [e] to indicate that it is implicitly enabled, and [!] to - indicate that it is enabled but missing and thus not - operational. + descriptions. Each plugin is prefixed with two status + indicator characters inside [ ]. The first indicator can + be " " to indicate that the plugin is not enabled, "E" to + indicate that it is explicitly enabled, "e" to indicate + that it is implicitly enabled, or "!" to indicate that it + is enabled but missing and thus not operational. The + second indicator can be " " to show that the plugin is not + running, or "*" to show that it is. </para> <para> If the optional pattern is given, only plugins whose @@ -130,17 +143,24 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>enable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <term><cmdsynopsis><command>enable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> + <term>--offline</term> + <listitem><para>Just modify the enabled plugins file.</para></listitem> + </varlistentry> + <varlistentry> + <term>--online</term> + <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem> + </varlistentry> + <varlistentry> <term>plugin</term> <listitem><para>One or more plugins to enable.</para></listitem> </varlistentry> </variablelist> <para> - Enables the specified plugins and all their - dependencies. + Enables the specified plugins and all their dependencies. </para> <para role="example-prefix">For example:</para> @@ -154,17 +174,24 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>disable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <term><cmdsynopsis><command>disable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> + <term>--offline</term> + <listitem><para>Just modify the enabled plugins file.</para></listitem> + </varlistentry> + <varlistentry> + <term>--online</term> + <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem> + </varlistentry> + <varlistentry> <term>plugin</term> <listitem><para>One or more plugins to disable.</para></listitem> </varlistentry> </variablelist> <para> - Disables the specified plugins and all plugins that - depend on them. + Disables the specified plugins and all their dependencies. </para> <para role="example-prefix">For example:</para> @@ -175,6 +202,42 @@ </para> </listitem> </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>set</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>--offline</term> + <listitem><para>Just modify the enabled plugins file.</para></listitem> + </varlistentry> + <varlistentry> + <term>--online</term> + <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem> + </varlistentry> + <varlistentry> + <term>plugin</term> + <listitem><para>Zero or more plugins to enable.</para></listitem> + </varlistentry> + </variablelist> + <para> + Enables the specified plugins and all their + dependencies. Unlike <command>rabbitmq-plugins + enable</command> this command ignores and overwrites any + existing enabled plugins. <command>rabbitmq-plugins + set</command> with no plugin arguments is a legal command + meaning "disable all plugins". + </para> + + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmq-plugins set rabbitmq_management</screen> + <para role="example"> + This command enables the <command>management</command> + plugin and its dependencies and disables everything else. + </para> + </listitem> + </varlistentry> + </variablelist> </refsect1> diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index a128bfbc..63540568 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -27,6 +27,11 @@ %% %% {ssl_listeners, [5671]}, + %% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection + %% and SSL handshake), in milliseconds. + %% + %% {handshake_timeout, 10000}, + %% Log levels (currently just used for connection logging). %% One of 'info', 'warning', 'error' or 'none', in decreasing order %% of verbosity. Defaults to 'info'. @@ -111,6 +116,10 @@ %% %% {ssl_cert_login_from, common_name}, + %% SSL handshake timeout, in milliseconds. + %% + %% {ssl_handshake_timeout, 5000}, + %% %% Default User / VHost %% ==================== @@ -221,7 +230,12 @@ %% Explicitly enable/disable hipe compilation. %% - %% {hipe_compile, true} + %% {hipe_compile, true}, + + %% Timeout used when waiting for Mnesia tables in a cluster to + %% become available. + %% + %% {mnesia_table_loading_timeout, 30000} ]}, @@ -265,9 +279,13 @@ %% {certfile, "/path/to/cert.pem"}, %% {keyfile, "/path/to/key.pem"}]}]}, + %% One of 'basic', 'detailed' or 'none'. See + %% http://www.rabbitmq.com/management.html#fine-stats for more details. + %% {rates_mode, basic}, + %% Configure how long aggregated data (such as message rates and queue %% lengths) is retained. Please read the plugin's documentation in - %% https://www.rabbitmq.com/management.html#configuration for more + %% http://www.rabbitmq.com/management.html#configuration for more %% details. %% %% {sample_retention_policies, @@ -276,14 +294,6 @@ %% {detailed, [{10, 5}]}]} ]}, - {rabbitmq_management_agent, - [%% Misc/Advanced Options - %% - %% NB: Change these only if you understand what you are doing! - %% - %% {force_fine_statistics, true} - ]}, - %% ---------------------------------------------------------------------------- %% RabbitMQ Shovel Plugin %% diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 01b024a2..908dad03 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -406,11 +406,15 @@ online, except when using the <command>--offline</command> flag. </para> <para> - When using the <command>--offline</command> flag the node you - connect to will become the canonical source for cluster metadata - (e.g. which queues exist), even if it was not before. Therefore - you should use this command on the latest node to shut down if - at all possible. + When using the <command>--offline</command> flag + rabbitmqctl will not attempt to connect to a node as + normal; instead it will temporarily become the node in + order to make the change. This is useful if the node + cannot be started normally. In this case the node will + become the canonical source for cluster metadata + (e.g. which queues exist), even if it was not + before. Therefore you should use this command on the + latest node to shut down if at all possible. </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen> @@ -454,6 +458,44 @@ </listitem> </varlistentry> <varlistentry> + <term><cmdsynopsis><command>force_boot</command></cmdsynopsis></term> + <listitem> + <para> + Ensure that the node will start next time, even if it + was not the last to shut down. + </para> + <para> + Normally when you shut down a RabbitMQ cluster + altogether, the first node you restart should be the + last one to go down, since it may have seen things + happen that other nodes did not. But sometimes + that's not possible: for instance if the entire cluster + loses power then all nodes may think they were not the + last to shut down. + </para> + <para> + In such a case you can invoke <command>rabbitmqctl + force_boot</command> while the node is down. This will + tell the node to unconditionally start next time you ask + it to. If any changes happened to the cluster after this + node shut down, they will be lost. + </para> + <para> + If the last node to go down is permanently lost then you + should use <command>rabbitmqctl forget_cluster_node + --offline</command> in preference to this command, as it + will ensure that mirrored queues which were mastered on + the lost node get promoted. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl force_boot</screen> + <para role="example"> + This will force the node not to wait for other nodes + next time it is started. + </para> + </listitem> + </varlistentry> + <varlistentry> <term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis> </term> <listitem> @@ -1148,6 +1190,22 @@ (queue depth).</para></listitem> </varlistentry> <varlistentry> + <term>messages_ready_ram</term> + <listitem><para>Number of messages from messages_ready which are resident in ram.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_unacknowledged_ram</term> + <listitem><para>Number of messages from messages_unacknowledged which are resident in ram.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_ram</term> + <listitem><para>Total number of messages which are resident in ram.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_persistent</term> + <listitem><para>Total number of persistent messages in the queue (will always be 0 for transient queues).</para></listitem> + </varlistentry> + <varlistentry> <term>consumers</term> <listitem><para>Number of consumers.</para></listitem> </varlistentry> @@ -1475,6 +1533,10 @@ <term>send_pend</term> <listitem><para>Send queue size.</para></listitem> </varlistentry> + <varlistentry> + <term>connected_at</term> + <listitem><para>Date and time this connection was established, as timestamp.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>connectioninfoitem</command>s are diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 7360208a..f26e0f77 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -39,12 +39,15 @@ {server_properties, []}, {collect_statistics, none}, {collect_statistics_interval, 5000}, + {mnesia_table_loading_timeout, 30000}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, {trace_vhosts, []}, {log_levels, [{connection, info}]}, {ssl_cert_login_from, distinguished_name}, + {ssl_handshake_timeout, 5000}, + {handshake_timeout, 10000}, {reverse_dns_lookups, false}, {cluster_partition_handling, ignore}, {tcp_listen_options, [binary, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5ac3197e..7a40f9eb 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -39,13 +39,25 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, internal, arguments, - scratches, policy, decorators}). --record(exchange_serial, {name, next}). +%% fields described as 'transient' here are cleared when writing to +%% rabbit_durable_<thing> +-record(exchange, { + name, type, durable, auto_delete, internal, arguments, %% immutable + scratches, %% durable, explicitly updated via update_scratch/3 + policy, %% durable, implicitly updated when policy changes + decorators}). %% transient, recalculated in store/1 (i.e. recovery) + +-record(amqqueue, { + name, durable, auto_delete, exclusive_owner = none, %% immutable + arguments, %% immutable + pid, %% durable (just so we know home node) + slave_pids, sync_slave_pids, %% transient + down_slave_nodes, %% durable + policy, %% durable, implicit update as above + gm_pids, %% transient + decorators}). %% transient, recalculated as above --record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, sync_slave_pids, policy, - gm_pids, decorators}). +-record(exchange_serial, {name, next}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). @@ -105,9 +117,6 @@ -define(DESIRED_HIBERNATE, 10000). -define(CREDIT_DISC_BOUND, {2000, 500}). -%% This is dictated by `erlang:send_after' on which we depend to implement TTL. --define(MAX_EXPIRY_TIMER, 4294967295). - -define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). diff --git a/packaging/common/README b/packaging/common/README index 0a29ee27..35a1523a 100644 --- a/packaging/common/README +++ b/packaging/common/README @@ -17,4 +17,4 @@ run as the superuser. An example configuration file is provided in the same directory as this README. Copy it to /etc/rabbitmq/rabbitmq.config to use it. The RabbitMQ server must be restarted after changing the configuration -file or enabling or disabling plugins. +file. @@ -7,17 +7,21 @@ %% NodeStr is a local broker node name %% ModStr is the module containing quickcheck properties %% TrialsStr is the number of trials -main([NodeStr, ModStr, TrialsStr]) -> +main([NodeStr, ModStr, NumTestsStr, MaxSizeStr]) -> {ok, Hostname} = inet:gethostname(), Node = list_to_atom(NodeStr ++ "@" ++ Hostname), Mod = list_to_atom(ModStr), - Trials = erlang:list_to_integer(TrialsStr), + NumTests = erlang:list_to_integer(NumTestsStr), + MaxSize = erlang:list_to_integer(MaxSizeStr), case rpc:call(Node, code, ensure_loaded, [proper]) of {module, proper} -> case rpc:call(Node, proper, module, - [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of + [Mod] ++ [[{numtests, NumTests}, + {max_size, MaxSize}, + {constraint_tries, 200}]]) of [] -> ok; - _ -> quit(1) + R -> io:format("~p.~n", [R]), + quit(1) end; {badrpc, Reason} -> io:format("Could not contact node ~p: ~p.~n", [Node, Reason]), diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index 69d5a9c9..74298440 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -58,3 +58,47 @@ fi ## Get configuration variables from the configure environment file [ -f ${CONF_ENV_FILE} ] && . ${CONF_ENV_FILE} || true + +##--- Set environment vars RABBITMQ_<var_name> to defaults if not set + +DEFAULT_NODE_IP_ADDRESS=auto +DEFAULT_NODE_PORT=5672 +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} + +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} +[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} + +[ "x" = "x$RABBITMQ_DIST_PORT" ] && RABBITMQ_DIST_PORT=${DIST_PORT} +[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${DEFAULT_NODE_PORT} + 20000)) +[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${RABBITMQ_NODE_PORT} + 20000)) + +[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} +[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} +[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} +[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} +[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} +[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} +[ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS} +[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} +[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} + +[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE} +[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid + +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand + +[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} + +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} + +## Log rotation +[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} +[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log" +[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS} +[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log" + +[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} + +##--- End of overridden <var_name> variables diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index bd7d0b6a..8b5b30b7 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -19,13 +19,6 @@ # Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env -##--- Set environment vars RABBITMQ_<var_name> to defaults if not set - -[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} - -##--- End of overridden <var_name> variables - exec ${ERL_DIR}erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ @@ -35,4 +28,5 @@ exec ${ERL_DIR}erl \ -s rabbit_plugins_main \ -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \ -plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \ + -nodename $RABBITMQ_NODENAME \ -extra "$@" diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index a535ebad..61e39e38 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -31,6 +31,10 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!RABBITMQ_NODENAME!"=="" (
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -51,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index bd397441..686c90cf 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -19,48 +19,6 @@ # Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env -##--- Set environment vars RABBITMQ_<var_name> to defaults if not set - -DEFAULT_NODE_IP_ADDRESS=auto -DEFAULT_NODE_PORT=5672 -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} - -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} -[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} - -[ "x" = "x$RABBITMQ_DIST_PORT" ] && RABBITMQ_DIST_PORT=${DIST_PORT} -[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${DEFAULT_NODE_PORT} + 20000)) -[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${RABBITMQ_NODE_PORT} + 20000)) - -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} -[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} -[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} -[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} -[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} - -[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} -[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} - -[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE} -[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid - -[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} -[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand - -[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} - -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} - -## Log rotation -[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} -[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log" -[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS} -[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log" - -##--- End of overridden <var_name> variables - RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput" [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="$RABBITMQ_START_RABBIT -s rabbit boot " @@ -131,6 +89,7 @@ exec ${ERL_DIR}erl \ ${RABBITMQ_CONFIG_ARG} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ + ${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \ ${RABBITMQ_LISTEN_ARG} \ -sasl errlog_type error \ -sasl sasl_error_logger false \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 043204fa..e2312406 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -147,6 +147,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 895561d4..fb2703f2 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -235,6 +235,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
!RABBITMQ_DIST_ARG! ^
!STARVAR!
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 309abf2a..31fe0afc 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -19,20 +19,21 @@ # Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env -##--- Set environment vars RABBITMQ_<var_name> to defaults if not set - -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} - -##--- End of overridden <var_name> variables +# rabbitmqctl starts distribution itself, so we need to make sure epmd +# is running. +${ERL_DIR}erl -sname rabbitmqctl-prelaunch-$$ -noinput -eval 'erlang:halt().' +# We specify Mnesia dir and sasl error logger since some actions +# (e.g. forget_cluster_node --offline) require us to impersonate the +# real node. exec ${ERL_DIR}erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_CTL_ERL_ARGS} \ - -sname rabbitmqctl$$ \ -boot "${CLEAN_BOOT_FILE}" \ + -sasl errlog_type error \ + -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \ -s rabbit_control_main \ -nodename $RABBITMQ_NODENAME \ -extra "$@" diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 8e8ba1bd..7a2c454c 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -23,6 +23,10 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+)
+
if "!COMPUTERNAME!"=="" (
set COMPUTERNAME=localhost
)
@@ -31,6 +35,14 @@ if "!RABBITMQ_NODENAME!"=="" ( set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
+if "!RABBITMQ_MNESIA_BASE!"=="" (
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
+)
+
+if "!RABBITMQ_MNESIA_DIR!"=="" (
+ set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -43,7 +55,20 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM!!TIME:~9! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
+rem rabbitmqctl starts distribution itself, so we need to make sure epmd
+rem is running.
+"!ERLANG_HOME!\bin\erl.exe" -sname rabbitmqctl-prelaunch-!RANDOM!!TIME:~9! -noinput -eval "erlang:halt()."
+
+"!ERLANG_HOME!\bin\erl.exe" ^
+-pa "!TDP0!..\ebin" ^
+-noinput ^
+-hidden ^
+!RABBITMQ_CTL_ERL_ARGS! ^
+-sasl errlog_type error ^
+-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
+-s rabbit_control_main ^
+-nodename !RABBITMQ_NODENAME! ^
+-extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl index 0479ce66..87e6fa0b 100644 --- a/src/app_utils.erl +++ b/src/app_utils.erl @@ -17,7 +17,7 @@ -export([load_applications/1, start_applications/1, start_applications/2, stop_applications/1, stop_applications/2, app_dependency_order/2, - wait_for_applications/1]). + app_dependencies/1]). -ifdef(use_specs). @@ -28,8 +28,8 @@ -spec stop_applications([atom()]) -> 'ok'. -spec start_applications([atom()], error_handler()) -> 'ok'. -spec stop_applications([atom()], error_handler()) -> 'ok'. --spec wait_for_applications([atom()]) -> 'ok'. -spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()]. +-spec app_dependencies(atom()) -> [atom()]. -endif. @@ -68,14 +68,10 @@ stop_applications(Apps, ErrorHandler) -> ErrorHandler, Apps). - -wait_for_applications(Apps) -> - [wait_for_application(App) || App <- Apps], ok. - app_dependency_order(RootApps, StripUnreachable) -> {ok, G} = rabbit_misc:build_acyclic_graph( - fun (App, _Deps) -> [{App, App}] end, - fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end, + fun ({App, _Deps}) -> [{App, App}] end, + fun ({App, Deps}) -> [{Dep, App} || Dep <- Deps] end, [{App, app_dependencies(App)} || {App, _Desc, _Vsn} <- application:loaded_applications()]), try @@ -92,13 +88,6 @@ app_dependency_order(RootApps, StripUnreachable) -> %%--------------------------------------------------------------------------- %% Private API -wait_for_application(Application) -> - case lists:keymember(Application, 1, rabbit_misc:which_applications()) of - true -> ok; - false -> timer:sleep(1000), - wait_for_application(Application) - end. - load_applications(Worklist, Loaded) -> case queue:out(Worklist) of {empty, _WorkList} -> @@ -388,6 +388,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_info/3]). +%% For INSTR_MOD callbacks +-export([call/3, cast/2, monitor/1, demonitor/1]). + -ifndef(use_specs). -export([behaviour_info/1]). -endif. @@ -616,6 +619,16 @@ handle_call({add_on_right, NewMember}, _From, members_state = MembersState1 }), handle_callback_result({Result, {ok, Group}, State1}). +%% add_on_right causes a catchup to be sent immediately from the left, +%% so we can never see this from the left neighbour. However, it's +%% possible for the right neighbour to send us a check_neighbours +%% immediately before that. We can't possibly handle it, but if we're +%% in this state we know a catchup is coming imminently anyway. So +%% just ignore it. +handle_cast({?TAG, _ReqVer, check_neighbours}, + State = #state { members_state = undefined }) -> + noreply(State); + handle_cast({?TAG, ReqVer, Msg}, State = #state { view = View, members_state = MembersState, @@ -1177,8 +1190,8 @@ can_erase_view_member(Self, Self, _LA, _LP) -> false; can_erase_view_member(_Self, _Id, N, N) -> true; can_erase_view_member(_Self, _Id, _LA, _LP) -> false. -neighbour_cast(N, Msg) -> gen_server2:cast(get_pid(N), Msg). -neighbour_call(N, Msg) -> gen_server2:call(get_pid(N), Msg, infinity). +neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg). +neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity). %% --------------------------------------------------------------------------- %% View monitoring and maintanence @@ -1192,7 +1205,7 @@ ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> {RealNeighbour, MRef}; ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> - true = erlang:demonitor(MRef), + true = ?INSTR_MOD:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, ok = neighbour_cast(RealNeighbour, Msg), ok = case Neighbour of @@ -1202,7 +1215,7 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> {Neighbour, maybe_monitor(Neighbour, Self)}. maybe_monitor( Self, Self) -> undefined; -maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)). +maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1461,3 +1474,12 @@ last_pub( [], LP) -> LP; last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), true = PubNum > LP, %% ASSERTION PubNum. + +%% --------------------------------------------------------------------------- + +%% Uninstrumented versions + +call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout). +cast(Pid, Msg) -> gen_server2:cast(Pid, Msg). +monitor(Pid) -> erlang:monitor(process, Pid). +demonitor(MRef) -> erlang:demonitor(MRef). diff --git a/src/gm_qc.erl b/src/gm_qc.erl new file mode 100644 index 00000000..394cbcbd --- /dev/null +++ b/src/gm_qc.erl @@ -0,0 +1,384 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(gm_qc). +-ifdef(use_proper_qc). + +-include_lib("proper/include/proper.hrl"). + +-define(GROUP, test_group). +-define(MAX_SIZE, 5). +-define(MSG_TIMEOUT, 1000000). %% micros + +-export([prop_gm_test/0]). + +-behaviour(proper_statem). +-export([initial_state/0, command/1, precondition/2, postcondition/3, + next_state/3]). + +-behaviour(gm). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). + +%% Helpers +-export([do_join/0, do_leave/1, do_send/1, do_proceed1/1, do_proceed2/2]). + +%% For insertion into gm +-export([call/3, cast/2, monitor/1, demonitor/1, execute_mnesia_transaction/1]). + +-record(state, {seq, %% symbolic and dynamic + instrumented, %% dynamic only + outstanding, %% dynamic only + monitors, %% dynamic only + all_join, %% for symbolic + to_join, %% dynamic only + to_leave %% for symbolic + }). + +prop_gm_test() -> + case ?INSTR_MOD of + ?MODULE -> ok; + _ -> exit(compile_with_INSTRUMENT_FOR_QC) + end, + process_flag(trap_exit, true), + erlang:register(?MODULE, self()), + ?FORALL(Cmds, commands(?MODULE), gm_test(Cmds)). + +gm_test(Cmds) -> + {_H, State, Res} = run_commands(?MODULE, Cmds), + cleanup(State), + ?WHENFAIL( + io:format("Result: ~p~n", [Res]), + aggregate(command_names(Cmds), Res =:= ok)). + +cleanup(S) -> + S2 = ensure_joiners_joined_and_msgs_received(S), + All = gms_joined(S2), + All = gms(S2), %% assertion - none to join + check_stale_members(All), + [gm:leave(GM) || GM <- All], + drain_and_proceed_gms(S2), + [await_death(GM) || GM <- All], + gm:forget_group(?GROUP), + ok. + +check_stale_members(All) -> + GMs = [P || P <- processes(), is_gm_process(?GROUP, P)], + case GMs -- All of + [] -> ok; + Rest -> exit({forgot, Rest}) + end. + +is_gm_process(Group, P) -> + case process_info(P, dictionary) of + undefined -> false; + {dictionary, D} -> {gm, Group} =:= proplists:get_value(process_name, D) + end. + +await_death(P) -> + MRef = erlang:monitor(process, P), + await_death(MRef, P). + +await_death(MRef, P) -> + receive + {'DOWN', MRef, process, P, _} -> ok; + {'DOWN', _, _, _, _} -> await_death(MRef, P); + {'EXIT', _, normal} -> await_death(MRef, P); + {'EXIT', _, Reason} -> exit(Reason); + {joined, _GM} -> await_death(MRef, P); + {left, _GM} -> await_death(MRef, P); + Anything -> exit({stray_msg, Anything}) + end. + +%% --------------------------------------------------------------------------- +%% proper_statem +%% --------------------------------------------------------------------------- + +initial_state() -> #state{seq = 1, + outstanding = dict:new(), + instrumented = dict:new(), + monitors = dict:new(), + all_join = sets:new(), + to_join = sets:new(), + to_leave = sets:new()}. + +command(S) -> + case {length(gms_symb_not_left(S)), length(gms_symb(S))} of + {0, 0} -> qc_join(S); + {0, _} -> frequency([{1, qc_join(S)}, + {3, qc_proceed1(S)}, + {5, qc_proceed2(S)}]); + _ -> frequency([{1, qc_join(S)}, + {1, qc_leave(S)}, + {10, qc_send(S)}, + {5, qc_proceed1(S)}, + {15, qc_proceed2(S)}]) + end. + +qc_join(_S) -> {call,?MODULE,do_join, []}. +qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}. +qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}. +qc_proceed1(S) -> {call,?MODULE,do_proceed1, [oneof(gms_symb(S))]}. +qc_proceed2(S) -> {call,?MODULE,do_proceed2, [oneof(gms_symb(S)), + oneof(gms_symb(S))]}. + +precondition(S, {call, ?MODULE, do_join, []}) -> + length(gms_symb(S)) < ?MAX_SIZE; + +precondition(_S, {call, ?MODULE, do_leave, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_send, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_proceed1, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_proceed2, [GM1, GM2]}) -> + GM1 =/= GM2. + +postcondition(_S, {call, _M, _F, _A}, _Res) -> + true. + +next_state(S = #state{to_join = ToSet, + all_join = AllSet}, GM, {call, ?MODULE, do_join, []}) -> + S#state{to_join = sets:add_element(GM, ToSet), + all_join = sets:add_element(GM, AllSet)}; + +next_state(S = #state{to_leave = Set}, _Res, {call, ?MODULE, do_leave, [GM]}) -> + S#state{to_leave = sets:add_element(GM, Set)}; + +next_state(S = #state{seq = Seq, + outstanding = Outstanding}, _Res, + {call, ?MODULE, do_send, [GM]}) -> + case is_pid(GM) andalso lists:member(GM, gms_joined(S)) of + true -> + %% Dynamic state, i.e. runtime + Msg = [{sequence, Seq}, + {sent_to, GM}, + {dests, gms_joined(S)}], + gm:broadcast(GM, Msg), + Outstanding1 = dict:map( + fun (_GM, Set) -> + gb_sets:add_element(Msg, Set) + end, Outstanding), + drain(S#state{seq = Seq + 1, + outstanding = Outstanding1}); + false -> + S + end; + +next_state(S, _Res, {call, ?MODULE, do_proceed1, [Pid]}) -> + proceed(Pid, S); + +next_state(S, _Res, {call, ?MODULE, do_proceed2, [From, To]}) -> + proceed({From, To}, S). + +proceed(K, S = #state{instrumented = Msgs}) -> + case dict:find(K, Msgs) of + {ok, Q} -> case queue:out(Q) of + {{value, Thing}, Q2} -> + S2 = proceed(K, Thing, S), + S2#state{instrumented = dict:store(K, Q2, Msgs)}; + {empty, _} -> + S + end; + error -> S + end. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined(Pid, _Members) -> Pid ! {joined, self()}, + ok. +members_changed(_Pid, _Bs, _Ds) -> ok. +handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok. +terminate(Pid, _Reason) -> Pid ! {left, self()}. + +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +do_join() -> + {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(), + fun execute_mnesia_transaction/1), + GM. + +do_leave(GM) -> + gm:leave(GM), + GM. + +%% We need to update the state, so do the work in next_state +do_send( _GM) -> ok. +do_proceed1(_Pid) -> ok. +do_proceed2(_From, _To) -> ok. + +%% All GMs, joined and to join +gms(#state{outstanding = Outstanding, + to_join = ToJoin}) -> + dict:fetch_keys(Outstanding) ++ sets:to_list(ToJoin). + +%% All GMs, joined and to join +gms_joined(#state{outstanding = Outstanding}) -> + dict:fetch_keys(Outstanding). + +%% All GMs including those that have left (symbolic) +gms_symb(#state{all_join = AllJoin}) -> + sets:to_list(AllJoin). + +%% All GMs not including those that have left (symbolic) +gms_symb_not_left(#state{all_join = AllJoin, + to_leave = ToLeave}) -> + sets:to_list(sets:subtract(AllJoin, ToLeave)). + +drain(S) -> + receive + Msg -> drain(handle_msg(Msg, S)) + after 10 -> S + end. + +drain_and_proceed_gms(S0) -> + S = #state{instrumented = Msgs} = drain(S0), + case dict:size(Msgs) of + 0 -> S; + _ -> S1 = dict:fold( + fun (Key, Q, Si) -> + lists:foldl( + fun (Msg, Sij) -> + proceed(Key, Msg, Sij) + end, Si, queue:to_list(Q)) + end, S, Msgs), + drain_and_proceed_gms(S1#state{instrumented = dict:new()}) + end. + +handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) -> + case dict:find(GM, Outstanding) of + {ok, Set} -> + Set2 = gb_sets:del_element(Msg, Set), + S#state{outstanding = dict:store(GM, Set2, Outstanding)}; + error -> + %% Message from GM that has already died. OK. + S + end; +handle_msg({instrumented, Key, Thing}, S = #state{instrumented = Msgs}) -> + Q1 = case dict:find(Key, Msgs) of + {ok, Q} -> queue:in(Thing, Q); + error -> queue:from_list([Thing]) + end, + S#state{instrumented = dict:store(Key, Q1, Msgs)}; +handle_msg({joined, GM}, S = #state{outstanding = Outstanding, + to_join = ToJoin}) -> + S#state{outstanding = dict:store(GM, gb_sets:empty(), Outstanding), + to_join = sets:del_element(GM, ToJoin)}; +handle_msg({left, GM}, S = #state{outstanding = Outstanding, + to_join = ToJoin}) -> + true = dict:is_key(GM, Outstanding) orelse sets:is_element(GM, ToJoin), + S#state{outstanding = dict:erase(GM, Outstanding), + to_join = sets:del_element(GM, ToJoin)}; +handle_msg({'DOWN', MRef, _, From, _} = Msg, S = #state{monitors = Mons}) -> + To = dict:fetch(MRef, Mons), + handle_msg({instrumented, {From, To}, {info, Msg}}, + S#state{monitors = dict:erase(MRef, Mons)}); +handle_msg({'EXIT', _From, normal}, S) -> + S; +handle_msg({'EXIT', _From, Reason}, _S) -> + %% We just trapped exits to get nicer SASL logging. + exit(Reason). + +proceed({_From, To}, {cast, Msg}, S) -> gen_server2:cast(To, Msg), S; +proceed({_From, To}, {info, Msg}, S) -> To ! Msg, S; +proceed({From, _To}, {wait, Ref}, S) -> From ! {proceed, Ref}, S; +proceed({From, To}, {mon, Ref}, S) -> add_monitor(From, To, Ref, S); +proceed(_Pid, {demon, MRef}, S) -> erlang:demonitor(MRef), S; +proceed(Pid, {wait, Ref}, S) -> Pid ! {proceed, Ref}, S. + +%% NB From here is To in handle_msg/DOWN above, since the msg is going +%% the other way +add_monitor(From, To, Ref, S = #state{monitors = Mons}) -> + MRef = erlang:monitor(process, To), + From ! {mref, Ref, MRef}, + S#state{monitors = dict:store(MRef, From, Mons)}. + +%% ---------------------------------------------------------------------------- +%% Assertions +%% ---------------------------------------------------------------------------- + +ensure_joiners_joined_and_msgs_received(S0) -> + S = drain_and_proceed_gms(S0), + case outstanding_joiners(S) of + true -> ensure_joiners_joined_and_msgs_received(S); + false -> case outstanding_msgs(S) of + [] -> S; + Out -> exit({outstanding_msgs, Out}) + end + end. + +outstanding_joiners(#state{to_join = ToJoin}) -> + sets:size(ToJoin) > 0. + +outstanding_msgs(#state{outstanding = Outstanding}) -> + dict:fold(fun (GM, Set, OS) -> + case gb_sets:is_empty(Set) of + true -> OS; + false -> [{GM, gb_sets:to_list(Set)} | OS] + end + end, [], Outstanding). + +%% --------------------------------------------------------------------------- +%% For insertion into GM +%% --------------------------------------------------------------------------- + +call(Pid, Msg, infinity) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, {self(), Pid}, {wait, Ref}}, + receive + {proceed, Ref} -> ok + end, + gen_server2:call(Pid, Msg, infinity). + +cast(Pid, Msg) -> + whereis(?MODULE) ! {instrumented, {self(), Pid}, {cast, Msg}}, + ok. + +monitor(Pid) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, {self(), Pid}, {mon, Ref}}, + receive + {mref, Ref, MRef} -> MRef + end. + +demonitor(MRef) -> + whereis(?MODULE) ! {instrumented, self(), {demon, MRef}}, + true. + +execute_mnesia_transaction(Fun) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, self(), {wait, Ref}}, + receive + {proceed, Ref} -> ok + end, + rabbit_misc:execute_mnesia_transaction(Fun). + +-else. + +-export([prop_disabled/0]). + +prop_disabled() -> + exit({compiled_without_proper, + "PropEr was not present during compilation of the test module. " + "Hence all tests are disabled."}). + +-endif. diff --git a/src/rabbit.erl b/src/rabbit.erl index 29e38c1f..191f04a4 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -22,10 +22,9 @@ stop_and_halt/0, await_startup/0, status/0, is_running/0, is_running/1, environment/0, rotate_logs/1, force_event_refresh/1, start_fhc/0]). - -export([start/2, stop/1]). - --export([log_location/1]). %% for testing +-export([start_apps/1, stop_apps/1]). +-export([log_location/1, config_files/0]). %% for testing and mgmt-agent %%--------------------------------------------------------------------------- %% Boot steps. @@ -202,6 +201,7 @@ %% practice 2 processes seems just as fast as any other number > 1, %% and keeps the progress bar realistic-ish. -define(HIPE_PROCESSES, 2). +-define(ASYNC_THREADS_WARNING_THRESHOLD, 8). %%---------------------------------------------------------------------------- @@ -211,6 +211,7 @@ %% this really should be an abstract type -type(log_location() :: 'tty' | 'undefined' | file:filename()). -type(param() :: atom()). +-type(app_name() :: atom()). -spec(start/0 :: () -> 'ok'). -spec(boot/0 :: () -> 'ok'). @@ -242,6 +243,8 @@ -spec(maybe_insert_default_data/0 :: () -> 'ok'). -spec(boot_delegate/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). +-spec(start_apps/1 :: ([app_name()]) -> 'ok'). +-spec(stop_apps/1 :: ([app_name()]) -> 'ok'). -endif. @@ -312,9 +315,7 @@ start() -> ok = ensure_working_log_handlers(), rabbit_node_monitor:prepare_cluster_status_files(), rabbit_mnesia:check_cluster_consistency(), - ok = app_utils:start_applications( - app_startup_order(), fun handle_app_error/2), - ok = log_broker_started(rabbit_plugins:active()) + broker_start() end). boot() -> @@ -329,21 +330,14 @@ boot() -> %% the upgrade, since if we are a secondary node the %% primary node will have forgotten us rabbit_mnesia:check_cluster_consistency(), - Plugins = rabbit_plugins:setup(), - ToBeLoaded = Plugins ++ ?APPS, - ok = app_utils:load_applications(ToBeLoaded), - StartupApps = app_utils:app_dependency_order(ToBeLoaded, - false), - ok = app_utils:start_applications( - StartupApps, fun handle_app_error/2), - ok = log_broker_started(Plugins) + broker_start() end). -handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) -> - throw({could_not_start, App, Reason}); - -handle_app_error(App, Reason) -> - throw({could_not_start, App, Reason}). +broker_start() -> + Plugins = rabbit_plugins:setup(), + ToBeLoaded = Plugins ++ ?APPS, + start_apps(ToBeLoaded), + ok = log_broker_started(rabbit_plugins:active()). start_it(StartFun) -> Marker = spawn_link(fun() -> receive stop -> ok end end), @@ -371,10 +365,11 @@ start_it(StartFun) -> stop() -> case whereis(rabbit_boot) of undefined -> ok; - _ -> await_startup() + _ -> await_startup(true) end, rabbit_log:info("Stopping RabbitMQ~n"), - ok = app_utils:stop_applications(app_shutdown_order()). + Apps = ?APPS ++ rabbit_plugins:active(), + stop_apps(app_utils:app_dependency_order(Apps, true)). stop_and_halt() -> try @@ -385,8 +380,51 @@ stop_and_halt() -> end, ok. +start_apps(Apps) -> + app_utils:load_applications(Apps), + OrderedApps = app_utils:app_dependency_order(Apps, false), + case lists:member(rabbit, Apps) of + false -> run_boot_steps(Apps); %% plugin activation + true -> ok %% will run during start of rabbit app + end, + ok = app_utils:start_applications(OrderedApps, + handle_app_error(could_not_start)). + +stop_apps(Apps) -> + ok = app_utils:stop_applications( + Apps, handle_app_error(error_during_shutdown)), + case lists:member(rabbit, Apps) of + false -> run_cleanup_steps(Apps); %% plugin deactivation + true -> ok %% it's all going anyway + end, + ok. + +handle_app_error(Term) -> + fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) -> + throw({Term, App, ExitReason}); + (App, Reason) -> + throw({Term, App, Reason}) + end. + +run_cleanup_steps(Apps) -> + [run_step(Name, Attrs, cleanup) || {_, Name, Attrs} <- find_steps(Apps)], + ok. + await_startup() -> - app_utils:wait_for_applications(app_startup_order()). + await_startup(false). + +await_startup(HaveSeenRabbitBoot) -> + %% We don't take absence of rabbit_boot as evidence we've started, + %% since there's a small window before it is registered. + case whereis(rabbit_boot) of + undefined -> case HaveSeenRabbitBoot orelse is_running() of + true -> ok; + false -> timer:sleep(100), + await_startup(false) + end; + _ -> timer:sleep(100), + await_startup(true) + end. status() -> S1 = [{pid, list_to_integer(os:getpid())}, @@ -437,6 +475,9 @@ listeners() -> ip_address = IP, port = Port} <- Listeners, Node =:= node()]. +%% TODO this only determines if the rabbit application has started, +%% not if it is running, never mind plugins. It would be nice to have +%% more nuance here. is_running() -> is_running(node()). is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit). @@ -468,7 +509,8 @@ start(normal, []) -> true = register(rabbit, self()), print_banner(), log_banner(), - [ok = run_boot_step(Step) || Step <- boot_steps()], + warn_if_kernel_config_dubious(), + run_boot_steps(), {ok, SupPid}; Error -> Error @@ -483,42 +525,42 @@ stop(_State) -> ok. %%--------------------------------------------------------------------------- -%% application life cycle +%% boot step logic -app_startup_order() -> - ok = app_utils:load_applications(?APPS), - app_utils:app_dependency_order(?APPS, false). +run_boot_steps() -> + run_boot_steps([App || {App, _, _} <- application:loaded_applications()]). -app_shutdown_order() -> - Apps = ?APPS ++ rabbit_plugins:active(), - app_utils:app_dependency_order(Apps, true). +run_boot_steps(Apps) -> + [ok = run_step(Step, Attrs, mfa) || {_, Step, Attrs} <- find_steps(Apps)], + ok. -%%--------------------------------------------------------------------------- -%% boot step logic +find_steps(Apps) -> + All = sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)), + [Step || {App, _, _} = Step <- All, lists:member(App, Apps)]. -run_boot_step({_StepName, Attributes}) -> - case [MFA || {mfa, MFA} <- Attributes] of +run_step(StepName, Attributes, AttributeName) -> + case [MFA || {Key, MFA} <- Attributes, + Key =:= AttributeName] of [] -> ok; MFAs -> [try apply(M,F,A) of - ok -> ok; - {error, Reason} -> boot_error(Reason, not_available) + ok -> ok; + {error, Reason} -> boot_error({boot_step, StepName, Reason}, + not_available) catch - _:Reason -> boot_error(Reason, erlang:get_stacktrace()) + _:Reason -> boot_error({boot_step, StepName, Reason}, + erlang:get_stacktrace()) end || {M,F,A} <- MFAs], ok end. -boot_steps() -> - sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)). +vertices({AppName, _Module, Steps}) -> + [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps]. -vertices(_Module, Steps) -> - [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps]. - -edges(_Module, Steps) -> +edges({_AppName, _Module, Steps}) -> [case Key of requires -> {StepName, OtherStep}; enables -> {OtherStep, StepName} @@ -527,7 +569,7 @@ edges(_Module, Steps) -> Key =:= requires orelse Key =:= enables]. sort_boot_steps(UnsortedSteps) -> - case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2, + case rabbit_misc:build_acyclic_graph(fun vertices/1, fun edges/1, UnsortedSteps) of {ok, G} -> %% Use topological sort to find a consistent ordering (if @@ -541,8 +583,8 @@ sort_boot_steps(UnsortedSteps) -> digraph:delete(G), %% Check that all mentioned {M,F,A} triples are exported. case [{StepName, {M,F,A}} || - {StepName, Attributes} <- SortedSteps, - {mfa, {M,F,A}} <- Attributes, + {_App, StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, not erlang:function_exported(M, F, length(A))] of [] -> SortedSteps; MissingFunctions -> basic_boot_error( @@ -782,6 +824,31 @@ log_banner() -> end || S <- Settings]), error_logger:info_msg("~s", [Banner]). +warn_if_kernel_config_dubious() -> + case erlang:system_info(kernel_poll) of + true -> ok; + false -> error_logger:warning_msg( + "Kernel poll (epoll, kqueue, etc) is disabled. Throughput " + "and CPU utilization may worsen.~n") + end, + AsyncThreads = erlang:system_info(thread_pool_size), + case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of + true -> error_logger:warning_msg( + "Erlang VM is running with ~b I/O threads, " + "file I/O performance may worsen~n", [AsyncThreads]); + false -> ok + end, + IDCOpts = case application:get_env(kernel, inet_default_connect_options) of + undefined -> []; + {ok, Val} -> Val + end, + case proplists:get_value(nodelay, IDCOpts, false) of + false -> error_logger:warning_msg( + "Nagle's algorithm is enabled for sockets, " + "network I/O latency will be higher~n"); + true -> ok + end. + home_dir() -> case init:get_argument(home) of {ok, [[Home]]} -> Home; diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1aba7ecb..4e23dbd2 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -18,7 +18,7 @@ -export([recover/0, stop/0, start/1, declare/5, declare/6, delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). --export([pseudo_queue/2]). +-export([pseudo_queue/2, immutable/1]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -29,8 +29,8 @@ -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). --export([on_node_down/1]). --export([update/2, store_queue/1, policy_changed/2]). +-export([on_node_up/1, on_node_down/1]). +-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). @@ -174,9 +174,12 @@ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). +-spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(update_decorators/1 :: (name()) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). @@ -254,15 +257,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> %% effect) this might not be possible to satisfy. declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), - Q = rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - gm_pids = []}), + Q = rabbit_queue_decorator:set( + rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + down_slave_nodes = [], + gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). @@ -308,12 +313,24 @@ store_queue(Q = #amqqueue{durable = true}) -> ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = [], sync_slave_pids = [], - gm_pids = []}, write), - ok = mnesia:write(rabbit_queue, Q, write), - ok; + gm_pids = [], + decorators = undefined}, write), + store_queue_ram(Q); store_queue(Q = #amqqueue{durable = false}) -> - ok = mnesia:write(rabbit_queue, Q, write), - ok. + store_queue_ram(Q). + +store_queue_ram(Q) -> + ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write). + +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_queue, Name}) of + [Q] -> store_queue_ram(Q), + ok; + [] -> ok + end + end). policy_changed(Q1 = #amqqueue{decorators = Decorators1}, Q2 = #amqqueue{decorators = Decorators2}) -> @@ -650,15 +667,23 @@ forget_all_durable(Node) -> fun () -> Qs = mnesia:match_object(rabbit_durable_queue, #amqqueue{_ = '_'}, write), - [rabbit_binding:process_deletions( - internal_delete1(Name)) || - #amqqueue{name = Name, pid = Pid} = Q <- Qs, - node(Pid) =:= Node, - rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined], + [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs, + node(Pid) =:= Node], ok end), ok. +forget_node_for_queue(#amqqueue{name = Name, + down_slave_nodes = []}) -> + %% No slaves to recover from, queue is gone + rabbit_binding:process_deletions(internal_delete1(Name)); + +forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) -> + %% Promote a slave while down - it'll happily recover as a master + Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H), + down_slave_nodes = T}, + ok = mnesia:write(rabbit_durable_queue, Q1, write). + run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). @@ -674,6 +699,20 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). +on_node_up(Node) -> + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_queue, + #amqqueue{_ = '_'}, write), + [case lists:member(Node, DSNs) of + true -> DSNs1 = DSNs -- [Node], + store_queue( + Q#amqqueue{down_slave_nodes = DSNs1}); + false -> ok + end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs], + ok + end). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = @@ -709,6 +748,14 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. +immutable(Q) -> Q#amqqueue{pid = none, + slave_pids = none, + sync_slave_pids = none, + down_slave_nodes = none, + gm_pids = none, + policy = none, + decorators = none}. + deliver([], _Delivery, _Flow) -> %% /dev/null optimisation []; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9b785303..63b18655 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -84,6 +84,7 @@ memory, slave_pids, synchronised_slave_pids, + down_slave_nodes, backing_queue_status, state ]). @@ -102,7 +103,8 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). -info_keys() -> ?INFO_KEYS. +info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys(). +statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- @@ -385,12 +387,12 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined, V when V > 0 -> V + 999; %% always fire later _ -> 0 end) div 1000, - TRef = erlang:send_after(After, self(), {drop_expired, Version}), + TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}), State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ttl_timer_expiry = TExpiry}) when Expiry + 1000 < TExpiry -> - case erlang:cancel_timer(TRef) of + case rabbit_misc:cancel_timer(TRef) of false -> State; _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) end; @@ -810,19 +812,25 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; +i(down_slave_nodes, #q{q = #amqqueue{name = Name, + durable = Durable}}) -> + {ok, Q = #amqqueue{down_slave_nodes = Nodes}} = + rabbit_amqqueue:lookup(Name), + case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> Nodes + end; i(state, #q{status = running}) -> credit_flow:state(); i(state, #q{status = State}) -> State; -i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> - BQ:status(BQS); -i(Item, _) -> - throw({bad_argument, Item}). +i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:info(Item, BQS). emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> ExtraKs = [K || {K, _} <- Extra], - Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State), + Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State), not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). @@ -922,7 +930,7 @@ handle_call({init, Recover}, From, end; handle_call(info, _From, State) -> - reply(infos(?INFO_KEYS, State), State); + reply(infos(info_keys(), State), State); handle_call({info, Items}, _From, State) -> try @@ -1165,7 +1173,7 @@ handle_cast({force_event_refresh, Ref}, emit_consumer_created( Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) end, - noreply(State); + noreply(rabbit_event:init_stats_timer(State, #q.stats_timer)); handle_cast(notify_decorators, State) -> notify_decorators(State), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 8f37bf60..9e5f0813 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -16,6 +16,12 @@ -module(rabbit_backing_queue). +-export([info_keys/0]). + +-define(INFO_KEYS, [messages_ram, messages_ready_ram, + messages_unacknowledged_ram, messages_persistent, + backing_queue_status]). + -ifdef(use_specs). %% We can't specify a per-queue ack/state with callback signatures @@ -37,6 +43,8 @@ -type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)). -type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). + %% Called on startup with a list of durable queue names. The queues %% aren't being started at this point, but this call allows the %% backing queue to perform any checking necessary for the consistency @@ -216,9 +224,7 @@ %% inbound messages and outbound messages at the moment. -callback msg_rates(state()) -> {float(), float()}. -%% Exists for debugging purposes, to be able to expose state via -%% rabbitmqctl list_queues backing_queue_status --callback status(state()) -> [{atom(), any()}]. +-callback info(atom(), state()) -> any(). %% Passed a function to be invoked with the relevant backing queue's %% state. Useful for when the backing queue or other components need @@ -243,9 +249,11 @@ behaviour_info(callbacks) -> {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {status, 1}, - {invoke, 3}, {is_duplicate, 2}] ; + {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {info_keys, 0}, + {infos, 2}, {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. -endif. + +info_keys() -> ?INFO_KEYS. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 043ec7e3..d2f6719c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -341,7 +341,7 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> handle_cast({force_event_refresh, Ref}, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State), Ref), - noreply(State); + noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer)); handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> %% NB: don't call noreply/1 since we don't want to send confirms. @@ -433,17 +433,22 @@ send(_Command, #ch{state = closing}) -> send(Command, #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Command). -handle_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid, - conn_pid = ConnPid}) -> +handle_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid, + conn_name = ConnName, + virtual_host = VHost, + user = User}) -> %% something bad's happened: notify_queues may not be 'ok' {_Result, State1} = notify_queues(State), case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of {Channel, CloseMethod} -> - rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n", - [ConnPid, Channel, Reason]), + rabbit_log:error("Channel error on connection ~p (~s, vhost: '~s'," + " user: '~s'), channel ~p:~n~p~n", + [ConnPid, ConnName, VHost, User#user.username, + Channel, Reason]), ok = rabbit_writer:send_command(WriterPid, CloseMethod), {noreply, State1}; {0, _} -> @@ -668,8 +673,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, mandatory = Mandatory}, Content, State = #ch{virtual_host = VHostPath, tx = Tx, + channel = ChannelNum, confirm_enabled = ConfirmEnabled, - trace_state = TraceState}) -> + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName}) -> check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), @@ -690,7 +698,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> - rabbit_trace:tap_in(Message, TraceState), + rabbit_trace:tap_in(Message, ConnName, ChannelNum, + Username, TraceState), Delivery = rabbit_basic:delivery( Mandatory, DoConfirm, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), @@ -992,7 +1001,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, QueueName, fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( Q, Durable, AutoDelete, Args, Owner), - rabbit_amqqueue:stat(Q) + maybe_stat(NoWait, Q) end) of {ok, MessageCount, ConsumerCount} -> return_queue_declare_ok(QueueName, NoWait, MessageCount, @@ -1048,7 +1057,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( - QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); @@ -1204,6 +1213,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, E end. +maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q); +maybe_stat(true, _Q) -> {ok, 0, 0}. + consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, @@ -1365,7 +1377,10 @@ record_sent(ConsumerTag, AckRequired, Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, - trace_state = TraceState}) -> + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName, + channel = ChannelNum}) -> ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of {none, true} -> get; {none, false} -> get_no_ack; @@ -1376,7 +1391,7 @@ record_sent(ConsumerTag, AckRequired, true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State); false -> ok end, - rabbit_trace:tap_out(Msg, TraceState), + rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, UAMQ); diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 81c17fbf..db9349ac 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -33,7 +33,7 @@ -callback description() -> [proplists:property()]. -callback intercept(original_method(), rabbit_types:vhost()) -> - rabbit_types:ok_or_error2(processed_method(), any()). + processed_method() | rabbit_misc:channel_or_connection_exit(). %% Whether the interceptor wishes to intercept the amqp method -callback applies_to(intercept_method()) -> boolean(). @@ -62,20 +62,15 @@ intercept_method(M, VHost) -> intercept_method(M, _VHost, []) -> M; intercept_method(M, VHost, [I]) -> - case I:intercept(M, VHost) of - {ok, M2} -> - case validate_method(M, M2) of - true -> - M2; - _ -> - internal_error("Interceptor: ~p expected " - "to return method: ~p but returned: ~p", - [I, rabbit_misc:method_record_type(M), - rabbit_misc:method_record_type(M2)]) - end; - {error, Reason} -> - internal_error("Interceptor: ~p failed with reason: ~p", - [I, Reason]) + M2 = I:intercept(M, VHost), + case validate_method(M, M2) of + true -> + M2; + _ -> + internal_error("Interceptor: ~p expected " + "to return method: ~p but returned: ~p", + [I, rabbit_misc:method_record_type(M), + rabbit_misc:method_record_type(M2)]) end; intercept_method(M, _VHost, Is) -> internal_error("More than one interceptor for method: ~p -- ~p", diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 451f4d70..70fe10e6 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -54,6 +54,7 @@ change_cluster_node_type, update_cluster_nodes, {forget_cluster_node, [?OFFLINE_DEF]}, + force_boot, cluster_status, {sync_queue, [?VHOST_DEF]}, {cancel_sync_queue, [?VHOST_DEF]}, @@ -114,6 +115,12 @@ {"Policies", rabbit_policy, list_formatted, info_keys}, {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]). +-define(COMMANDS_NOT_REQUIRING_APP, + [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs, + join_cluster, change_cluster_node_type, update_cluster_nodes, + forget_cluster_node, cluster_status, status, environment, eval, + force_boot]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -131,6 +138,7 @@ %%---------------------------------------------------------------------------- start() -> + start_distribution(), {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {Command, Opts, Args} = case parse_arguments(init:get_plain_arguments(), NodeStr) of @@ -154,7 +162,7 @@ start() -> %% The reason we don't use a try/catch here is that rpc:call turns %% thrown errors into normal return values - case catch action(Command, Node, Args, Opts, Inform) of + case catch do_action(Command, Node, Args, Opts, Inform) of ok -> case Quiet of true -> ok; @@ -249,6 +257,15 @@ parse_arguments(CmdLine, NodeStr) -> %%---------------------------------------------------------------------------- +do_action(Command, Node, Args, Opts, Inform) -> + case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of + false -> case ensure_app_running(Node) of + ok -> action(Command, Node, Args, Opts, Inform); + E -> E + end; + true -> action(Command, Node, Args, Opts, Inform) + end. + action(stop, Node, Args, _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), Res = call(Node, {rabbit, stop_and_halt, []}), @@ -302,8 +319,19 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> ClusterNode = list_to_atom(ClusterNodeS), RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts), Inform("Removing node ~p from cluster", [ClusterNode]), - rpc_call(Node, rabbit_mnesia, forget_cluster_node, - [ClusterNode, RemoveWhenOffline]); + case RemoveWhenOffline of + true -> become(Node), + rabbit_mnesia:forget_cluster_node(ClusterNode, true); + false -> rpc_call(Node, rabbit_mnesia, forget_cluster_node, + [ClusterNode, false]) + end; + +action(force_boot, Node, [], _Opts, Inform) -> + Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]), + case rabbit:is_running(Node) of + false -> rabbit_mnesia:force_load_next_boot(); + true -> {error, rabbit_running} + end; action(sync_queue, Node, [Q], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), @@ -650,6 +678,22 @@ exit_loop(Port) -> {Port, _} -> exit_loop(Port) end. +start_distribution() -> + CtlNodeName = rabbit_misc:format("rabbitmqctl-~s", [os:getpid()]), + {ok, _} = net_kernel:start([list_to_atom(CtlNodeName), shortnames]). + +become(BecomeNode) -> + case net_adm:ping(BecomeNode) of + pong -> exit({node_running, BecomeNode}); + pang -> io:format(" * Impersonating node: ~s...", [BecomeNode]), + error_logger:tty(false), + ok = net_kernel:stop(), + {ok, _} = net_kernel:start([BecomeNode, shortnames]), + io:format(" done~n", []), + Dir = mnesia:system_info(directory), + io:format(" * Mnesia directory : ~s~n", [Dir]) + end. + %%---------------------------------------------------------------------------- default_if_empty(List, Default) when is_list(List) -> @@ -715,6 +759,17 @@ unsafe_rpc(Node, Mod, Fun, Args) -> Normal -> Normal end. +ensure_app_running(Node) -> + case call(Node, {rabbit, is_running, []}) of + true -> ok; + false -> {error_string, + rabbit_misc:format( + "rabbit application is not running on node ~s.~n" + " * Suggestion: start it with \"rabbitmqctl start_app\" " + "and try again", [Node])}; + Other -> Other + end. + call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index ec32e687..728bc431 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) -> {longstr, <<"rejected">>} =/= rabbit_misc:table_lookup(D, <<"reason">>); (_) -> + %% There was something we didn't expect, therefore + %% a client must have put it there, therefore the + %% cycle was not "fully automatic". false end, Cycle ++ [H]) end. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index b867223b..a33103fd 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -23,6 +23,7 @@ ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]). -export([stats_level/2, if_enabled/3]). -export([notify/2, notify/3, notify_if/3]). +-export([sync_notify/2, sync_notify/3]). %%---------------------------------------------------------------------------- @@ -61,6 +62,9 @@ -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). -spec(notify/3 :: (event_type(), event_props(), reference() | 'none') -> 'ok'). -spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). +-spec(sync_notify/2 :: (event_type(), event_props()) -> 'ok'). +-spec(sync_notify/3 :: (event_type(), event_props(), + reference() | 'none') -> 'ok'). -endif. @@ -145,7 +149,16 @@ notify_if(false, _Type, _Props) -> ok. notify(Type, Props) -> notify(Type, Props, none). notify(Type, Props, Ref) -> - gen_event:notify(?MODULE, #event{type = Type, - props = Props, - reference = Ref, - timestamp = os:timestamp()}). + gen_event:notify(?MODULE, event_cons(Type, Props, Ref)). + +sync_notify(Type, Props) -> sync_notify(Type, Props, none). + +sync_notify(Type, Props, Ref) -> + gen_event:sync_notify(?MODULE, event_cons(Type, Props, Ref)). + +event_cons(Type, Props, Ref) -> + #event{type = Type, + props = Props, + reference = Ref, + timestamp = os:timestamp()}. + diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 4d4a2a58..a1772f0a 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -20,7 +20,8 @@ -export([recover/0, policy_changed/2, callback/4, declare/6, assert_equivalence/6, assert_args_equivalence/2, check_type/1, - lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3, + lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, + update_scratch/3, update_decorators/1, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, route/2, delete/2, validate_binding/2]). %% these must be run inside a mnesia tx @@ -61,6 +62,7 @@ -spec(lookup_or_die/1 :: (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit()). +-spec(list/0 :: () -> [rabbit_types:exchange()]). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). -spec(lookup_scratch/2 :: (name(), atom()) -> rabbit_types:ok(term()) | @@ -70,6 +72,8 @@ (name(), fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> not_found | rabbit_types:exchange()). +-spec(update_decorators/1 :: (name()) -> 'ok'). +-spec(immutable/1 :: (rabbit_types:exchange()) -> rabbit_types:exchange()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -106,24 +110,15 @@ recover() -> mnesia:read({rabbit_exchange, XName}) =:= [] end, fun (X, Tx) -> - case Tx of - true -> store(X); - false -> ok - end, - callback(X, create, map_create_tx(Tx), [X]) + X1 = case Tx of + true -> store_ram(X); + false -> rabbit_exchange_decorator:set(X) + end, + callback(X1, create, map_create_tx(Tx), [X1]) end, rabbit_durable_exchange), - report_missing_decorators(Xs), [XName || #exchange{name = XName} <- Xs]. -report_missing_decorators(Xs) -> - Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) || - #exchange{decorators = D} <- Xs])), - case [M || M <- Mods, code:which(M) =:= non_existing] of - [] -> ok; - M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M]) - end. - callback(X = #exchange{type = XType, decorators = Decorators}, Fun, Serial0, Args) -> Serial = if is_function(Serial0) -> Serial0; @@ -158,12 +153,13 @@ serial(#exchange{name = XName} = X) -> end. declare(XName, Type, Durable, AutoDelete, Internal, Args) -> - X = rabbit_policy:set(#exchange{name = XName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - internal = Internal, - arguments = Args}), + X = rabbit_exchange_decorator:set( + rabbit_policy:set(#exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Args})), XT = type_to_module(Type), %% We want to upset things if it isn't ok ok = XT:validate(X), @@ -171,13 +167,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> fun () -> case mnesia:wread({rabbit_exchange, XName}) of [] -> - store(X), - ok = case Durable of - true -> mnesia:write(rabbit_durable_exchange, - X, write); - false -> ok - end, - {new, X}; + {new, store(X)}; [ExistingX] -> {existing, ExistingX} end @@ -195,7 +185,19 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> map_create_tx(true) -> transaction; map_create_tx(false) -> none. -store(X) -> ok = mnesia:write(rabbit_exchange, X, write). + +store(X = #exchange{durable = true}) -> + mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined}, + write), + store_ram(X); +store(X = #exchange{durable = false}) -> + store_ram(X). + +store_ram(X) -> + X1 = rabbit_exchange_decorator:set(X), + ok = mnesia:write(rabbit_exchange, rabbit_exchange_decorator:set(X1), + write), + X1. %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> @@ -243,6 +245,8 @@ lookup_or_die(Name) -> {error, not_found} -> rabbit_misc:not_found(Name) end. +list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}). + %% Not dirty_match_object since that would not be transactional when used in a %% tx context list(VHostPath) -> @@ -287,20 +291,27 @@ update_scratch(Name, App, Fun) -> ok end). +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_exchange, Name}) of + [X] -> store_ram(X), + ok; + [] -> ok + end + end). + update(Name, Fun) -> case mnesia:wread({rabbit_exchange, Name}) of - [X = #exchange{durable = Durable}] -> - X1 = Fun(X), - ok = mnesia:write(rabbit_exchange, X1, write), - case Durable of - true -> ok = mnesia:write(rabbit_durable_exchange, X1, write); - _ -> ok - end, - X1; - [] -> - not_found + [X] -> X1 = Fun(X), + store(X1); + [] -> not_found end. +immutable(X) -> X#exchange{scratches = none, + policy = none, + decorators = none}. + info_keys() -> ?INFO_KEYS. map(VHostPath, F) -> diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 2f056b1b..900f9c32 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([select/2, set/1]). +-export([select/2, set/1, register/2, unregister/1]). %% This is like an exchange type except that: %% @@ -104,3 +104,25 @@ list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. cons_if_eq(Select, Select, Item, List) -> [Item | List]; cons_if_eq(_Select, _Other, _Item, List) -> List. + +register(TypeName, ModuleName) -> + rabbit_registry:register(exchange_decorator, TypeName, ModuleName), + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + +unregister(TypeName) -> + rabbit_registry:unregister(exchange_decorator, TypeName), + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + +maybe_recover(X = #exchange{name = Name, + decorators = Decs}) -> + #exchange{decorators = Decs1} = set(X), + Old = lists:sort(select(all, Decs)), + New = lists:sort(select(all, Decs1)), + case New of + Old -> ok; + _ -> %% TODO create a tx here for non-federation decorators + [M:create(none, X) || M <- New -- Old], + rabbit_exchange:update_decorators(Name) + end. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 2b16b911..9bccf5dd 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, - msg_rates/1, status/1, invoke/3, is_duplicate/2]). + msg_rates/1, info/2, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason, State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}; terminate(Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { name = QName, + backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but %% shouldn't be deleted. Most likely safe shutdown of this - %% node. Thus just let some other slave take over. + %% node. + {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(QName), + case SSPids =:= [] andalso + rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of + true -> %% Remove the whole queue to avoid data loss + rabbit_mirror_queue_misc:log_warning( + QName, "Stopping all nodes on master shutdown since no " + "synchronised slave is available~n", []), + stop_all_slaves(Reason, State); + false -> %% Just let some other slave take over. + ok + end, State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. delete_and_terminate(Reason, State = #state { backing_queue = BQ, @@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, State), State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. -stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> +stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), @@ -360,10 +374,13 @@ resume(State = #state { backing_queue = BQ, msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:msg_rates(BQS). -status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQ:status(BQS) ++ +info(backing_queue_status, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:info(backing_queue_status, BQS) ++ [ {mirror_seen, dict:size(State #state.seen_status)}, - {mirror_senders, sets:size(State #state.known_senders)} ]. + {mirror_senders, sets:size(State #state.known_senders)} ]; +info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:info(Item, BQS). invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b0f092a9..9e8c4a18 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -29,16 +29,19 @@ -include("rabbit.hrl"). --rabbit_boot_step({?MODULE, - [{description, "HA policy validation"}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-mode">>, ?MODULE]}}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-params">>, ?MODULE]}}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, - {requires, rabbit_registry}, - {enables, recovery}]}). +-rabbit_boot_step( + {?MODULE, + [{description, "HA policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-params">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, recovery}]}). %%---------------------------------------------------------------------------- @@ -75,9 +78,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids, - gm_pids = GMPids }] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids, + gm_pids = GMPids, + down_slave_nodes = DSNs}] -> {DeadGM, AliveGM} = lists:partition( fun ({GM, _}) -> lists:member(GM, DeadGMPids) @@ -86,6 +90,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> AlivePids = [Pid || {_GM, Pid} <- AliveGM], Alive = [Pid || Pid <- [QPid | SPids], lists:member(Pid, AlivePids)], + DSNs1 = [node(Pid) || + Pid <- SPids, + not lists:member(Pid, AlivePids)] ++ DSNs, {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> @@ -94,9 +101,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = Q#amqqueue{pid = QPid1, - slave_pids = SPids1, - gm_pids = AliveGM}, + Q1 = Q#amqqueue{pid = QPid1, + slave_pids = SPids1, + gm_pids = AliveGM, + down_slave_nodes = DSNs1}, store_updated_slaves(Q1), %% If we add and remove nodes at the same time we %% might tell the old master we need to sync and @@ -106,8 +114,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> _ -> %% Master has changed, and we're not it. %% [1]. - Q1 = Q#amqqueue{slave_pids = Alive, - gm_pids = AliveGM}, + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM, + down_slave_nodes = DSNs1}, store_updated_slaves(Q1) end, {ok, QPid1, DeadPids} @@ -236,12 +245,16 @@ log(Level, QName, Fmt, Args) -> rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt, [rabbit_misc:rs(QName) | Args]). -store_updated_slaves(Q = #amqqueue{slave_pids = SPids, - sync_slave_pids = SSPids}) -> +store_updated_slaves(Q = #amqqueue{pid = MPid, + slave_pids = SPids, + sync_slave_pids = SSPids, + down_slave_nodes = DSNs}) -> %% TODO now that we clear sync_slave_pids in rabbit_durable_queue, %% do we still need this filtering? SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], - Q1 = Q#amqqueue{sync_slave_pids = SSPids1}, + DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]], + Q1 = Q#amqqueue{sync_slave_pids = SSPids1, + down_slave_nodes = DSNs1}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event rabbit_amqqueue:notify_policy_changed(Q1), @@ -374,16 +387,21 @@ validate_policy(KeyList) -> Mode = proplists:get_value(<<"ha-mode">>, KeyList, none), Params = proplists:get_value(<<"ha-params">>, KeyList, none), SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none), - case {Mode, Params, SyncMode} of - {none, none, none} -> + PromoteOnShutdown = proplists:get_value( + <<"ha-promote-on-shutdown">>, KeyList, none), + case {Mode, Params, SyncMode, PromoteOnShutdown} of + {none, none, none, none} -> ok; - {none, _, _} -> - {error, "ha-mode must be specified to specify ha-params or " - "ha-sync-mode", []}; + {none, _, _, _} -> + {error, "ha-mode must be specified to specify ha-params, " + "ha-sync-mode or ha-promote-on-shutdown", []}; _ -> case module(Mode) of {ok, M} -> case M:validate_policy(Params) of - ok -> validate_sync_mode(SyncMode); + ok -> case validate_sync_mode(SyncMode) of + ok -> validate_pos(PromoteOnShutdown); + E -> E + end; E -> E end; _ -> {error, "~p is not a valid ha-mode value", [Mode]} @@ -398,3 +416,12 @@ validate_sync_mode(SyncMode) -> Mode -> {error, "ha-sync-mode must be \"manual\" " "or \"automatic\", got ~p", [Mode]} end. + +validate_pos(PromoteOnShutdown) -> + case PromoteOnShutdown of + <<"always">> -> ok; + <<"when-synced">> -> ok; + none -> ok; + Mode -> {error, "ha-promote-on-shutdown must be " + "\"always\" or \"when-synced\", got ~p", [Mode]} + end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 11d6a79c..cc06ae44 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -653,8 +653,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_timeout(State = #state { backing_queue = BQ }) -> - run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). +backing_queue_timeout(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State#state{backing_queue_state = BQ:timeout(BQS)}. ensure_sync_timer(State) -> rabbit_misc:ensure_timer(State, #state.sync_timer_ref, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 006bbadf..09355f3f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -45,7 +45,7 @@ -export([with_local_io/1, local_info_msg/2]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). --export([pid_to_string/1, string_to_pid/1]). +-export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]). -export([version_compare/2, version_compare/3]). -export([version_minor_equivalent/2]). -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). @@ -67,10 +67,11 @@ -export([check_expiry/1]). -export([base64url/1]). -export([interval_operation/4]). --export([ensure_timer/4, stop_timer/2]). +-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]). -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). -export([moving_average/4]). +-export([now_to_ms/1]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -81,7 +82,7 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1]). +-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -94,6 +95,7 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -192,6 +194,7 @@ (rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(node_to_fake_pid/1 :: (atom()) -> pid()). -spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). -spec(version_compare/3 :: (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) @@ -209,7 +212,8 @@ [string()]) -> {'ok', {atom(), [{string(), string()}], [string()]}} | 'no_command'). --spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). +-spec(all_module_attributes/1 :: + (atom()) -> [{atom(), atom(), [term()]}]). -spec(build_acyclic_graph/3 :: (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) -> rabbit_types:ok_or_error2(digraph(), @@ -245,11 +249,16 @@ -> {any(), non_neg_integer()}). -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). +-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()). +-spec(cancel_timer/1 :: (tref()) -> 'ok'). -spec(get_parent/0 :: () -> pid()). -spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). -spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined') -> float()). +-spec(now_to_ms/1 :: ({non_neg_integer(), + non_neg_integer(), + non_neg_integer()}) -> pos_integer()). -endif. %%---------------------------------------------------------------------------- @@ -705,6 +714,10 @@ string_to_pid(Str) -> throw(Err) end. +%% node(node_to_fake_pid(Node)) =:= Node. +node_to_fake_pid(Node) -> + string_to_pid(format("<~s.0.0.0>", [Node])). + version_compare(A, B, lte) -> case version_compare(A, B) of eq -> true; @@ -849,20 +862,20 @@ module_attributes(Module) -> end. all_module_attributes(Name) -> - Modules = + Targets = lists:usort( lists:append( - [Modules || {App, _, _} <- application:loaded_applications(), - {ok, Modules} <- [application:get_key(App, modules)]])), + [[{App, Module} || Module <- Modules] || + {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), lists:foldl( - fun (Module, Acc) -> + fun ({App, Module}, Acc) -> case lists:append([Atts || {N, Atts} <- module_attributes(Module), N =:= Name]) of [] -> Acc; - Atts -> [{Module, Atts} | Acc] + Atts -> [{App, Module, Atts} | Acc] end - end, [], Modules). - + end, [], Targets). build_acyclic_graph(VertexFun, EdgeFun, Graph) -> G = digraph:new([acyclic]), @@ -870,13 +883,13 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> [case digraph:vertex(G, Vertex) of false -> digraph:add_vertex(G, Vertex, Label); _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}}) - end || {Module, Atts} <- Graph, - {Vertex, Label} <- VertexFun(Module, Atts)], + end || GraphElem <- Graph, + {Vertex, Label} <- VertexFun(GraphElem)], [case digraph:add_edge(G, From, To) of {error, E} -> throw({graph_error, {edge, E, From, To}}); _ -> ok - end || {Module, Atts} <- Graph, - {From, To} <- EdgeFun(Module, Atts)], + end || GraphElem <- Graph, + {From, To} <- EdgeFun(GraphElem)], {ok, G} catch {graph_error, Reason} -> true = digraph:delete(G), @@ -1017,7 +1030,9 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. -check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}}; +now_to_ms({Mega, Sec, Micro}) -> + (Mega * 1000000 * 1000000 + Sec * 1000000 + Micro) div 1000. + check_expiry(N) when N < 0 -> {error, {value_negative, N}}; check_expiry(_N) -> ok. @@ -1045,7 +1060,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) -> ensure_timer(State, Idx, After, Msg) -> case element(Idx, State) of - undefined -> TRef = erlang:send_after(After, self(), Msg), + undefined -> TRef = send_after(After, self(), Msg), setelement(Idx, State, TRef); _ -> State end. @@ -1053,12 +1068,25 @@ ensure_timer(State, Idx, After, Msg) -> stop_timer(State, Idx) -> case element(Idx, State) of undefined -> State; - TRef -> case erlang:cancel_timer(TRef) of - false -> State; - _ -> setelement(Idx, State, undefined) - end + TRef -> cancel_timer(TRef), + setelement(Idx, State, undefined) end. +%% timer:send_after/3 goes through a single timer process but allows +%% long delays. erlang:send_after/3 does not have a bottleneck but +%% only allows max 2^32-1 millis. +-define(MAX_ERLANG_SEND_AFTER, 4294967295). +send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER -> + {ok, Ref} = timer:send_after(Millis, Pid, Msg), + {timer, Ref}; +send_after(Millis, Pid, Msg) -> + {erlang, erlang:send_after(Millis, Pid, Msg)}. + +cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref), + ok; +cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref), + ok. + store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). store_proc_name(TypeProcName) -> put(process_name, TypeProcName). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c6c2c8eb..630d9853 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -23,6 +23,7 @@ update_cluster_nodes/1, change_cluster_node_type/1, forget_cluster_node/2, + force_load_next_boot/0, status/0, is_clustered/0, @@ -63,6 +64,7 @@ -spec(update_cluster_nodes/1 :: (node()) -> 'ok'). -spec(change_cluster_node_type/1 :: (node_type()) -> 'ok'). -spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok'). +-spec(force_load_next_boot/0 :: () -> 'ok'). %% Various queries to get the status of the db -spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | @@ -303,7 +305,6 @@ remove_node_offline_node(Node) -> e(removing_node_from_offline_node) end. - %%---------------------------------------------------------------------------- %% Queries %%---------------------------------------------------------------------------- diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 9082dbd3..96448f32 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -37,8 +37,6 @@ -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). --define(SSL_TIMEOUT, 5). %% seconds - -define(FIRST_TEST_BIND_PORT, 10000). %%---------------------------------------------------------------------------- @@ -168,9 +166,14 @@ ensure_ssl() -> end end. +ssl_timeout() -> + {ok, Val} = application:get_env(rabbit, ssl_handshake_timeout), + Val. + ssl_transform_fun(SslOpts) -> fun (Sock) -> - case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of + Timeout = ssl_timeout(), + case catch ssl:ssl_accept(Sock, SslOpts, Timeout) of {ok, SslSock} -> {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; {error, timeout} -> @@ -185,7 +188,7 @@ ssl_transform_fun(SslOpts) -> %% form, according to the TLS spec). So we give %% the ssl_connection a little bit of time to send %% such alerts. - timer:sleep(?SSL_TIMEOUT * 1000), + timer:sleep(Timeout), {error, {ssl_upgrade_error, Reason}}; {'EXIT', Reason} -> {error, {ssl_upgrade_failure, Reason}} diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index ca843e14..7eaeaf50 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -458,6 +458,7 @@ ensure_ping_timer(State) -> State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes). handle_live_rabbit(Node) -> + ok = rabbit_amqqueue:on_node_up(Node), ok = rabbit_alarm:on_node_up(Node), ok = rabbit_mnesia:on_node_up(Node). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index c0fb05e2..9acaa1d4 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -18,6 +18,7 @@ -include("rabbit.hrl"). -export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]). +-export([ensure/1]). %%---------------------------------------------------------------------------- @@ -31,22 +32,54 @@ -spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]). -spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) -> [plugin_name()]). - +-spec(ensure/1 :: (string()) -> {'ok', [atom()], [atom()]} | {error, any()}). -endif. %%---------------------------------------------------------------------------- +ensure(FileJustChanged) -> + {ok, OurFile} = application:get_env(rabbit, enabled_plugins_file), + case OurFile of + FileJustChanged -> + {ok, Dir} = application:get_env(rabbit, plugins_dir), + Enabled = read_enabled(OurFile), + Wanted = dependencies(false, Enabled, list(Dir)), + prepare_plugins(Enabled), + Current = active(), + Start = Wanted -- Current, + Stop = Current -- Wanted, + rabbit:start_apps(Start), + %% We need sync_notify here since mgmt will attempt to look at all + %% the modules for the disabled plugins - if they are unloaded + %% that won't work. + ok = rabbit_event:sync_notify(plugins_changed, [{enabled, Start}, + {disabled, Stop}]), + rabbit:stop_apps(Stop), + clean_plugins(Stop), + {ok, Start, Stop}; + _ -> + {error, {enabled_plugins_mismatch, FileJustChanged, OurFile}} + end. + %% @doc Prepares the file system and installs all enabled plugins. setup() -> - {ok, PluginDir} = application:get_env(rabbit, plugins_dir), {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + + %% Eliminate the contents of the destination directory + case delete_recursively(ExpandDir) of + ok -> ok; + {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir, + [ExpandDir, E1]}}) + end, + {ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file), - prepare_plugins(EnabledFile, PluginDir, ExpandDir). + Enabled = read_enabled(EnabledFile), + prepare_plugins(Enabled). %% @doc Lists the plugins which are currently running. active() -> {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), - InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ], + InstalledPlugins = plugin_names(list(ExpandDir)), [App || {App, _, _} <- rabbit_misc:which_applications(), lists:member(App, InstalledPlugins)]. @@ -67,7 +100,7 @@ list(PluginsDir) -> _ -> error_logger:warning_msg( "Problem reading some plugins: ~p~n", [Problems]) end, - Plugins. + ensure_dependencies(Plugins). %% @doc Read the list of enabled plugins from the supplied term file. read_enabled(PluginsFile) -> @@ -86,15 +119,10 @@ read_enabled(PluginsFile) -> %% the resulting list, otherwise they're skipped. dependencies(Reverse, Sources, AllPlugins) -> {ok, G} = rabbit_misc:build_acyclic_graph( - fun (App, _Deps) -> [{App, App}] end, - fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end, - lists:ukeysort( - 1, [{Name, Deps} || - #plugin{name = Name, - dependencies = Deps} <- AllPlugins] ++ - [{Dep, []} || - #plugin{dependencies = Deps} <- AllPlugins, - Dep <- Deps])), + fun ({App, _Deps}) -> [{App, App}] end, + fun ({App, Deps}) -> [{App, Dep} || Dep <- Deps] end, + [{Name, Deps} || #plugin{name = Name, + dependencies = Deps} <- AllPlugins]), Dests = case Reverse of false -> digraph_utils:reachable(Sources, G); true -> digraph_utils:reaching(Sources, G) @@ -102,27 +130,44 @@ dependencies(Reverse, Sources, AllPlugins) -> true = digraph:delete(G), Dests. +%% Make sure we don't list OTP apps in here, and also that we detect +%% missing dependencies. +ensure_dependencies(Plugins) -> + Names = plugin_names(Plugins), + NotThere = [Dep || #plugin{dependencies = Deps} <- Plugins, + Dep <- Deps, + not lists:member(Dep, Names)], + {OTP, Missing} = lists:partition(fun is_loadable/1, lists:usort(NotThere)), + case Missing of + [] -> ok; + _ -> Blame = [Name || #plugin{name = Name, + dependencies = Deps} <- Plugins, + lists:any(fun (Dep) -> + lists:member(Dep, Missing) + end, Deps)], + throw({error, {missing_dependencies, Missing, Blame}}) + end, + [P#plugin{dependencies = Deps -- OTP} + || P = #plugin{dependencies = Deps} <- Plugins]. + +is_loadable(App) -> + case application:load(App) of + {error, {already_loaded, _}} -> true; + ok -> application:unload(App), + true; + _ -> false + end. + %%---------------------------------------------------------------------------- -prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> +prepare_plugins(Enabled) -> + {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir), + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + AllPlugins = list(PluginsDistDir), - Enabled = read_enabled(EnabledFile), ToUnpack = dependencies(false, Enabled, AllPlugins), ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins), - case Enabled -- plugin_names(ToUnpackPlugins) of - [] -> ok; - Missing -> error_logger:warning_msg( - "The following enabled plugins were not found: ~p~n", - [Missing]) - end, - - %% Eliminate the contents of the destination directory - case delete_recursively(ExpandDir) of - ok -> ok; - {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir, - [ExpandDir, E1]}}) - end, case filelib:ensure_dir(ExpandDir ++ "/") of ok -> ok; {error, E2} -> throw({error, {cannot_create_plugins_expand_dir, @@ -134,6 +179,20 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> [prepare_dir_plugin(PluginAppDescPath) || PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")]. +clean_plugins(Plugins) -> + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + [clean_plugin(Plugin, ExpandDir) || Plugin <- Plugins]. + +clean_plugin(Plugin, ExpandDir) -> + {ok, Mods} = application:get_key(Plugin, modules), + application:unload(Plugin), + [begin + code:soft_purge(Mod), + code:delete(Mod), + false = code:is_loaded(Mod) + end || Mod <- Mods], + delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])). + prepare_dir_plugin(PluginAppDescPath) -> code:add_path(filename:dirname(PluginAppDescPath)), list_to_atom(filename:basename(PluginAppDescPath, ".app")). @@ -172,8 +231,7 @@ plugin_info(Base, {app, App0}) -> mkplugin(Name, Props, Type, Location) -> Version = proplists:get_value(vsn, Props, "0"), Description = proplists:get_value(description, Props, ""), - Dependencies = - filter_applications(proplists:get_value(applications, Props, [])), + Dependencies = proplists:get_value(applications, Props, []), #plugin{name = Name, version = Version, description = Description, dependencies = Dependencies, location = Location, type = Type}. @@ -206,18 +264,6 @@ parse_binary(Bin) -> Err -> {error, {invalid_app, Err}} end. -filter_applications(Applications) -> - [Application || Application <- Applications, - not is_available_app(Application)]. - -is_available_app(Application) -> - case application:load(Application) of - {error, {already_loaded, _}} -> true; - ok -> application:unload(Application), - true; - _ -> false - end. - plugin_names(Plugins) -> [Name || #plugin{name = Name} <- Plugins]. diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 89e16f14..278fcf98 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -18,23 +18,34 @@ -include("rabbit.hrl"). -export([start/0, stop/0]). +-export([action/6]). +-define(NODE_OPT, "-n"). -define(VERBOSE_OPT, "-v"). -define(MINIMAL_OPT, "-m"). -define(ENABLED_OPT, "-E"). -define(ENABLED_ALL_OPT, "-e"). +-define(OFFLINE_OPT, "--offline"). +-define(ONLINE_OPT, "--online"). +-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(VERBOSE_DEF, {?VERBOSE_OPT, flag}). -define(MINIMAL_DEF, {?MINIMAL_OPT, flag}). -define(ENABLED_DEF, {?ENABLED_OPT, flag}). -define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}). +-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}). +-define(ONLINE_DEF, {?ONLINE_OPT, flag}). --define(GLOBAL_DEFS, []). +-define(RPC_TIMEOUT, infinity). + +-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]). -define(COMMANDS, [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]}, - enable, - disable]). + {enable, [?OFFLINE_DEF, ?ONLINE_DEF]}, + {disable, [?OFFLINE_DEF, ?ONLINE_DEF]}, + {set, [?OFFLINE_DEF, ?ONLINE_DEF]}, + {sync, []}]). %%---------------------------------------------------------------------------- @@ -51,11 +62,10 @@ start() -> {ok, [[PluginsFile|_]|_]} = init:get_argument(enabled_plugins_file), + {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir), {Command, Opts, Args} = - case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS, - init:get_plain_arguments()) - of + case parse_arguments(init:get_plain_arguments(), NodeStr) of {ok, Res} -> Res; no_command -> print_error("could not recognise command", []), usage() @@ -67,7 +77,8 @@ start() -> [string:join([atom_to_list(Command) | Args], " ")]) end, - case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of + Node = proplists:get_value(?NODE_OPT, Opts), + case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of ok -> rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> @@ -76,12 +87,23 @@ start() -> {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> PrintInvalidCommandError(), usage(); + {error, {missing_dependencies, Missing, Blame}} -> + print_error("dependent plugins ~p not found; used by ~p.", + [Missing, Blame]), + rabbit_misc:quit(2); {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); {error_string, Reason} -> print_error("~s", [Reason]), rabbit_misc:quit(2); + {badrpc, {'EXIT', Reason}} -> + print_error("~p", [Reason]), + rabbit_misc:quit(2); + {badrpc, Reason} -> + print_error("unable to connect to node ~w: ~w", [Node, Reason]), + print_badrpc_diagnostics([Node]), + rabbit_misc:quit(2); Other -> print_error("~p", [Other]), rabbit_misc:quit(2) @@ -92,50 +114,80 @@ stop() -> %%---------------------------------------------------------------------------- -action(list, [], Opts, PluginsFile, PluginsDir) -> - action(list, [".*"], Opts, PluginsFile, PluginsDir); -action(list, [Pat], Opts, PluginsFile, PluginsDir) -> - format_plugins(Pat, Opts, PluginsFile, PluginsDir); +parse_arguments(CmdLine, NodeStr) -> + case rabbit_misc:parse_arguments( + ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of + {ok, {Cmd, Opts0, Args}} -> + Opts = [case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; + _ -> {K, V} + end || {K, V} <- Opts0], + {ok, {Cmd, Opts, Args}}; + E -> + E + end. + +action(list, Node, [], Opts, PluginsFile, PluginsDir) -> + action(list, Node, [".*"], Opts, PluginsFile, PluginsDir); +action(list, Node, [Pat], Opts, PluginsFile, PluginsDir) -> + format_plugins(Node, Pat, Opts, PluginsFile, PluginsDir); -action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> +action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) -> case ToEnable0 of [] -> throw({error_string, "Not enough arguments for 'enable'"}); _ -> ok end, AllPlugins = rabbit_plugins:list(PluginsDir), Enabled = rabbit_plugins:read_enabled(PluginsFile), - ImplicitlyEnabled = rabbit_plugins:dependencies(false, - Enabled, AllPlugins), + ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins), ToEnable = [list_to_atom(Name) || Name <- ToEnable0], Missing = ToEnable -- plugin_names(AllPlugins), + case Missing of + [] -> ok; + _ -> throw({error_string, fmt_missing(Missing)}) + end, NewEnabled = lists:usort(Enabled ++ ToEnable), NewImplicitlyEnabled = rabbit_plugins:dependencies(false, NewEnabled, AllPlugins), - MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing, - case {Missing, MissingDeps} of - {[], []} -> ok; - {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)}); - {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)}); - {_, _} -> throw({error_string, - fmt_missing("plugins", Missing) ++ - fmt_missing("dependencies", MissingDeps)}) - end, write_enabled_plugins(PluginsFile, NewEnabled), case NewEnabled -- ImplicitlyEnabled of [] -> io:format("Plugin configuration unchanged.~n"); _ -> print_list("The following plugins have been enabled:", - NewImplicitlyEnabled -- ImplicitlyEnabled), - report_change() - end; + NewImplicitlyEnabled -- ImplicitlyEnabled) + end, + action_change( + Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile); + +action(set, Node, ToSet0, Opts, PluginsFile, PluginsDir) -> + ToSet = [list_to_atom(Name) || Name <- ToSet0], + AllPlugins = rabbit_plugins:list(PluginsDir), + Enabled = rabbit_plugins:read_enabled(PluginsFile), + ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins), + Missing = ToSet -- plugin_names(AllPlugins), + case Missing of + [] -> ok; + _ -> throw({error_string, fmt_missing(Missing)}) + end, + NewImplicitlyEnabled = rabbit_plugins:dependencies(false, + ToSet, AllPlugins), + write_enabled_plugins(PluginsFile, ToSet), + case NewImplicitlyEnabled of + [] -> io:format("All plugins are now disabled.~n"); + _ -> print_list("The following plugins are now enabled:", + NewImplicitlyEnabled) + end, + action_change( + Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile); -action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> +action(disable, Node, ToDisable0, Opts, PluginsFile, PluginsDir) -> case ToDisable0 of [] -> throw({error_string, "Not enough arguments for 'disable'"}); _ -> ok end, - ToDisable = [list_to_atom(Name) || Name <- ToDisable0], - Enabled = rabbit_plugins:read_enabled(PluginsFile), AllPlugins = rabbit_plugins:list(PluginsDir), + Enabled = rabbit_plugins:read_enabled(PluginsFile), + ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins), + ToDisable = [list_to_atom(Name) || Name <- ToDisable0], Missing = ToDisable -- plugin_names(AllPlugins), case Missing of [] -> ok; @@ -144,30 +196,35 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> end, ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins), NewEnabled = Enabled -- ToDisableDeps, + NewImplicitlyEnabled = rabbit_plugins:dependencies(false, + NewEnabled, AllPlugins), case length(Enabled) =:= length(NewEnabled) of true -> io:format("Plugin configuration unchanged.~n"); - false -> ImplicitlyEnabled = - rabbit_plugins:dependencies(false, Enabled, AllPlugins), - NewImplicitlyEnabled = - rabbit_plugins:dependencies(false, - NewEnabled, AllPlugins), - print_list("The following plugins have been disabled:", + false -> print_list("The following plugins have been disabled:", ImplicitlyEnabled -- NewImplicitlyEnabled), - write_enabled_plugins(PluginsFile, NewEnabled), - report_change() - end. + write_enabled_plugins(PluginsFile, NewEnabled) + end, + action_change( + Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile); + +action(sync, Node, [], _Opts, PluginsFile, _PluginsDir) -> + sync(Node, true, PluginsFile). %%---------------------------------------------------------------------------- -print_error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). +fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). + +print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). + +print_badrpc_diagnostics(Nodes) -> + fmt_stderr(rabbit_nodes:diagnostics(Nodes), []). usage() -> io:format("~s", [rabbit_plugins_usage:usage()]), rabbit_misc:quit(1). %% Pretty print a list of plugins. -format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> +format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) -> Verbose = proplists:get_bool(?VERBOSE_OPT, Opts), Minimal = proplists:get_bool(?MINIMAL_OPT, Opts), Format = case {Verbose, Minimal} of @@ -182,41 +239,52 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> AvailablePlugins = rabbit_plugins:list(PluginsDir), EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile), - EnabledImplicitly = - rabbit_plugins:dependencies(false, EnabledExplicitly, - AvailablePlugins) -- EnabledExplicitly, - Missing = [#plugin{name = Name, dependencies = []} || - Name <- ((EnabledExplicitly ++ EnabledImplicitly) -- - plugin_names(AvailablePlugins))], + AllEnabled = rabbit_plugins:dependencies(false, EnabledExplicitly, + AvailablePlugins), + EnabledImplicitly = AllEnabled -- EnabledExplicitly, + {StatusMsg, Running} = + case rpc:call(Node, rabbit_plugins, active, [], ?RPC_TIMEOUT) of + {badrpc, _} -> {"[failed to contact ~s - status not shown]", []}; + Active -> {"* = running on ~s", Active} + end, {ok, RE} = re:compile(Pattern), Plugins = [ Plugin || - Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing, + Plugin = #plugin{name = Name} <- AvailablePlugins, re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, - if OnlyEnabled -> lists:member(Name, EnabledExplicitly); - OnlyEnabledAll -> (lists:member(Name, - EnabledExplicitly) or - lists:member(Name, EnabledImplicitly)); + if OnlyEnabled -> lists:member(Name, EnabledExplicitly); + OnlyEnabledAll -> lists:member(Name, EnabledExplicitly) or + lists:member(Name,EnabledImplicitly); true -> true end], Plugins1 = usort_plugins(Plugins), MaxWidth = lists:max([length(atom_to_list(Name)) || #plugin{name = Name} <- Plugins1] ++ [0]), - [format_plugin(P, EnabledExplicitly, EnabledImplicitly, - plugin_names(Missing), Format, MaxWidth) || P <- Plugins1], + case Format of + minimal -> ok; + _ -> io:format(" Configured: E = explicitly enabled; " + "e = implicitly enabled~n" + " | Status: ~s~n" + " |/~n", [rabbit_misc:format(StatusMsg, [Node])]) + end, + [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Running, + Format, MaxWidth) || P <- Plugins1], ok. format_plugin(#plugin{name = Name, version = Version, description = Description, dependencies = Deps}, - EnabledExplicitly, EnabledImplicitly, Missing, - Format, MaxWidth) -> - Glyph = case {lists:member(Name, EnabledExplicitly), - lists:member(Name, EnabledImplicitly), - lists:member(Name, Missing)} of - {true, false, false} -> "[E]"; - {false, true, false} -> "[e]"; - {_, _, true} -> "[!]"; - _ -> "[ ]" - end, + EnabledExplicitly, EnabledImplicitly, Running, Format, + MaxWidth) -> + EnabledGlyph = case {lists:member(Name, EnabledExplicitly), + lists:member(Name, EnabledImplicitly)} of + {true, false} -> "E"; + {false, true} -> "e"; + _ -> " " + end, + RunningGlyph = case lists:member(Name, Running) of + true -> "*"; + false -> " " + end, + Glyph = rabbit_misc:format("[~s~s]", [EnabledGlyph, RunningGlyph]), Opt = fun (_F, A, A) -> ok; ( F, A, _) -> io:format(F, [A]) end, @@ -227,9 +295,9 @@ format_plugin(#plugin{name = Name, version = Version, Opt("~s", Version, undefined), io:format("~n"); verbose -> io:format("~s ~w~n", [Glyph, Name]), - Opt(" Version: \t~s~n", Version, undefined), - Opt(" Dependencies:\t~p~n", Deps, []), - Opt(" Description: \t~s~n", Description, undefined), + Opt(" Version: \t~s~n", Version, undefined), + Opt(" Dependencies:\t~p~n", Deps, []), + Opt(" Description: \t~s~n", Description, undefined), io:format("~n") end. @@ -240,8 +308,8 @@ fmt_list(Header, Plugins) -> lists:flatten( [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]). -fmt_missing(Desc, Missing) -> - fmt_list("The following " ++ Desc ++ " could not be found:", Missing). +fmt_missing(Missing) -> + fmt_list("The following plugins could not be found:", Missing). usort_plugins(Plugins) -> lists:usort(fun plugins_cmp/2, Plugins). @@ -262,6 +330,51 @@ write_enabled_plugins(PluginsFile, Plugins) -> PluginsFile, Reason}}) end. -report_change() -> - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n"). +action_change(Opts, Node, Old, New, PluginsFile) -> + action_change0(proplists:get_bool(?OFFLINE_OPT, Opts), + proplists:get_bool(?ONLINE_OPT, Opts), + Node, Old, New, PluginsFile). + +action_change0(true, _Online, _Node, Same, Same, _PluginsFile) -> + %% Definitely nothing to do + ok; +action_change0(true, _Online, _Node, _Old, _New, _PluginsFile) -> + io:format("Offline change; changes will take effect at broker restart.~n"); +action_change0(false, Online, Node, _Old, _New, PluginsFile) -> + sync(Node, Online, PluginsFile). + +sync(Node, ForceOnline, PluginsFile) -> + rpc_call(Node, ForceOnline, rabbit_plugins, ensure, [PluginsFile]). + +rpc_call(Node, Online, Mod, Fun, Args) -> + io:format("~nApplying plugin configuration to ~s...", [Node]), + case rpc:call(Node, Mod, Fun, Args) of + {ok, [], []} -> + io:format(" nothing to do.~n", []); + {ok, Start, []} -> + io:format(" started ~b plugin~s.~n", [length(Start), plur(Start)]); + {ok, [], Stop} -> + io:format(" stopped ~b plugin~s.~n", [length(Stop), plur(Stop)]); + {ok, Start, Stop} -> + io:format(" stopped ~b plugin~s and started ~b plugin~s.~n", + [length(Stop), plur(Stop), length(Start), plur(Start)]); + {badrpc, nodedown} = Error -> + io:format(" failed.~n", []), + case Online of + true -> Error; + false -> io:format( + " * Could not contact node ~s.~n" + " Changes will take effect at broker restart.~n" + " * Options: --online - fail if broker cannot be " + "contacted.~n" + " --offline - do not try to contact " + "broker.~n", + [Node]) + end; + Error -> + io:format(" failed.~n", []), + Error + end. + +plur([_]) -> ""; +plur(_) -> "s". diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index fe2b766f..3558cf98 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -61,13 +61,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value) -> {error, "~p is not a valid dead letter routing key", [Value]}; validate_policy0(<<"message-ttl">>, Value) - when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER -> + when is_integer(Value), Value >= 0 -> ok; validate_policy0(<<"message-ttl">>, Value) -> {error, "~p is not a valid message TTL", [Value]}; validate_policy0(<<"expires">>, Value) - when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER -> + when is_integer(Value), Value >= 1 -> ok; validate_policy0(<<"expires">>, Value) -> {error, "~p is not a valid queue expiry", [Value]}; diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 0a69fb32..f5d03360 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -46,17 +46,11 @@ name(#exchange{policy = Policy}) -> name0(Policy). name0(undefined) -> none; name0(Policy) -> pget(name, Policy). -set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set( - Q#amqqueue{policy = set0(Name)}); -set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set( - X#exchange{policy = set0(Name)}). +set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; +set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). -set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)}; -set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set( - X#exchange{policy = match(Name, Ps)}). - get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); %% Caution - SLOW. @@ -104,12 +98,18 @@ recover0() -> Policies = list(), [rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:write(rabbit_durable_exchange, set(X, Policies), write) - end) || X <- Xs], + mnesia:write( + rabbit_durable_exchange, + rabbit_exchange_decorator:set( + X#exchange{policy = match(Name, Policies)}), write) + end) || X = #exchange{name = Name} <- Xs], [rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:write(rabbit_durable_queue, set(Q, Policies), write) - end) || Q <- Qs], + mnesia:write( + rabbit_durable_queue, + rabbit_queue_decorator:set( + Q#amqqueue{policy = match(Name, Policies)}), write) + end) || Q = #amqqueue{name = Name} <- Qs], ok. invalid_file() -> diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 6205e2dc..adfe0c7f 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -2,7 +2,7 @@ -include("rabbit.hrl"). --export([select/1, set/1]). +-export([select/1, set/1, register/2, unregister/1]). %%---------------------------------------------------------------------------- @@ -41,3 +41,24 @@ select(Modules) -> set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}. list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)]. + +register(TypeName, ModuleName) -> + rabbit_registry:register(queue_decorator, TypeName, ModuleName), + [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], + ok. + +unregister(TypeName) -> + rabbit_registry:unregister(queue_decorator, TypeName), + [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], + ok. + +maybe_recover(Q = #amqqueue{name = Name, + decorators = Decs}) -> + #amqqueue{decorators = Decs1} = set(Q), + Old = lists:sort(select(Decs)), + New = lists:sort(select(Decs1)), + case New of + Old -> ok; + _ -> [M:startup(Q) || M <- New -- Old], + rabbit_amqqueue:update_decorators(Name) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e6e94e28..2ac24f97 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -27,7 +27,6 @@ -export([conserve_resources/3, server_properties/1]). --define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). @@ -43,7 +42,7 @@ -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, connected_at}). -record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}). @@ -55,7 +54,7 @@ peer_host, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, - timeout, frame_max, channel_max, client_properties]). + timeout, frame_max, channel_max, client_properties, connected_at]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -189,10 +188,10 @@ server_capabilities(_) -> log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). socket_error(Reason) when is_atom(Reason) -> - log(error, "error on AMQP connection ~p: ~s~n", + log(error, "Error on AMQP connection ~p: ~s~n", [self(), rabbit_misc:format_inet_error(Reason)]); socket_error(Reason) -> - log(error, "error on AMQP connection ~p:~n~p~n", [self(), Reason]). + log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -216,8 +215,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> exit(normal) end, log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]), + {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), ClientSock = socket_op(Sock, SockTransform), - erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), + erlang:send_after(HandshakeTimeout, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), ?store_proc_name(list_to_binary(Name)), @@ -231,13 +231,14 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> peer_port = PeerPort, protocol = none, user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, + timeout_sec = (HandshakeTimeout / 1000), frame_max = ?FRAME_MIN_SIZE, vhost = none, client_properties = none, capabilities = [], auth_mechanism = none, - auth_state = none}, + auth_state = none, + connected_at = rabbit_misc:now_to_ms(os:timestamp())}, callback = uninitialized_callback, recv_len = 0, pending_recv = false, @@ -410,7 +411,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) rabbit_event:notify( connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), - State; + rabbit_event:init_stats_timer(State, #v1.stats_timer); handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. State; @@ -548,21 +549,27 @@ wait_for_channel_termination(0, TimerRef, State) -> end; _ -> State end; -wait_for_channel_termination(N, TimerRef, State) -> +wait_for_channel_termination(N, TimerRef, + State = #v1{connection_state = CS, + connection = #connection{ + name = ConnName, + user = User, + vhost = VHost}}) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> {Channel, State1} = channel_cleanup(ChPid, State), case {Channel, termination_kind(Reason)} of - {undefined, _} -> exit({abnormal_dependent_exit, - ChPid, Reason}); - {_, controlled} -> wait_for_channel_termination( - N-1, TimerRef, State1); - {_, uncontrolled} -> log(error, - "AMQP connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]), - wait_for_channel_termination( - N-1, TimerRef, State1) + {undefined, _} -> + exit({abnormal_dependent_exit, ChPid, Reason}); + {_, controlled} -> + wait_for_channel_termination(N-1, TimerRef, State1); + {_, uncontrolled} -> + log(error, "Error on AMQP connection ~p (~s, vhost: '~s'," + " user: '~s', state: ~p), channel ~p:" + "error while terminating:~n~p~n", + [self(), ConnName, VHost, User#user.username, + CS, Channel, Reason]), + wait_for_channel_termination(N-1, TimerRef, State1) end; cancel_wait -> exit(channel_termination_timeout) @@ -581,16 +588,24 @@ maybe_close(State) -> termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. +log_hard_error(#v1{connection_state = CS, + connection = #connection{ + name = ConnName, + user = User, + vhost = VHost}}, Channel, Reason) -> + log(error, + "Error on AMQP connection ~p (~s, vhost: '~s'," + " user: '~s', state: ~p), channel ~p:~n~p~n", + [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]). + handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> - log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", - [self(), closed, Channel, Reason]), + log_hard_error(State, Channel, Reason), State; handle_exception(State = #v1{connection = #connection{protocol = Protocol}, connection_state = CS}, Channel, Reason) when ?IS_RUNNING(State) orelse CS =:= closing -> - log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", - [self(), CS, Channel, Reason]), + log_hard_error(State, Channel, Reason), {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), State1 = close_connection(terminate_channels(State)), @@ -1130,6 +1145,7 @@ ic(channel_max, #connection{channel_max = ChMax}) -> ChMax; ic(client_properties, #connection{client_properties = CP}) -> CP; ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; +ic(connected_at, #connection{connected_at = T}) -> T; ic(Item, #connection{}) -> throw({bad_argument, Item}). socket_info(Get, Select, #v1{sock = Sock}) -> diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index da75932d..fe2c3b58 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -70,7 +70,13 @@ wait_for_replicated() -> not lists:member({local_content, true}, TabDef)]). wait(TableNames) -> - case mnesia:wait_for_tables(TableNames, 30000) of + %% We might be in ctl here for offline ops, in which case we can't + %% get_env() for the rabbit app. + Timeout = case application:get_env(rabbit, mnesia_table_loading_timeout) of + {ok, T} -> T; + undefined -> 30000 + end, + case mnesia:wait_for_tables(TableNames, Timeout) of ok -> ok; {timeout, BadTabs} -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index da6938bd..34a8cc5c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2454,7 +2454,7 @@ with_fresh_variable_queue(Fun) -> spawn_link(fun() -> ok = empty_test_queue(), VQ = variable_queue_init(test_amqqueue(true), false), - S0 = rabbit_variable_queue:status(VQ), + S0 = variable_queue_status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -2579,7 +2579,7 @@ variable_queue_with_holes(VQ0) -> {delta, _, 0, _} -> true; 0 -> true; _ -> false - end || {K, V} <- rabbit_variable_queue:status(VQ8), + end || {K, V} <- variable_queue_status(VQ8), lists:member(K, [q1, delta, q3])], Depth = Count + Interval, Depth = rabbit_variable_queue:depth(VQ8), @@ -2649,17 +2649,20 @@ test_variable_queue_ack_limiting(VQ0) -> %% fetch half the messages {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), - VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2}, - {ram_ack_count, Len div 2}, - {ram_msg_count, Len div 2}]), + VQ5 = check_variable_queue_status( + VQ4, [{len, Len div 2}, + {messages_unacknowledged_ram, Len div 2}, + {messages_ready_ram, Len div 2}, + {messages_ram, Len}]), %% ensure all acks go to disk on 0 duration target VQ6 = check_variable_queue_status( variable_queue_set_ram_duration_target(0, VQ5), - [{len, Len div 2}, - {target_ram_count, 0}, - {ram_msg_count, 0}, - {ram_ack_count, 0}]), + [{len, Len div 2}, + {target_ram_count, 0}, + {messages_unacknowledged_ram, 0}, + {messages_ready_ram, 0}, + {messages_ram, 0}]), VQ6. @@ -2760,7 +2763,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> fun (Duration1, VQ4) -> {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), io:format("~p:~n~p~n", - [Duration1, rabbit_variable_queue:status(VQ5)]), + [Duration1, variable_queue_status(VQ5)]), VQ6 = variable_queue_set_ram_duration_target( Duration1, VQ5), publish_fetch_and_ack(Churn, Len, VQ6) @@ -2820,11 +2823,16 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> check_variable_queue_status(VQ0, Props) -> VQ1 = variable_queue_wait_for_shuffling_end(VQ0), - S = rabbit_variable_queue:status(VQ1), + S = variable_queue_status(VQ1), io:format("~p~n", [S]), assert_props(S, Props), VQ1. +variable_queue_status(VQ) -> + Keys = rabbit_backing_queue:info_keys() -- [backing_queue_status], + [{K, rabbit_variable_queue:info(K, VQ)} || K <- Keys] ++ + rabbit_variable_queue:info(backing_queue_status, VQ). + variable_queue_wait_for_shuffling_end(VQ) -> case credit_flow:blocked() of false -> VQ; diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index aafd81df..dbc2856d 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -16,7 +16,7 @@ -module(rabbit_trace). --export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]). +-export([init/1, enabled/1, tap_in/5, tap_out/5, start/1, stop/1]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -32,8 +32,12 @@ -spec(init/1 :: (rabbit_types:vhost()) -> state()). -spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()). --spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok'). --spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok'). +-spec(tap_in/5 :: (rabbit_types:basic_message(), binary(), + rabbit_channel:channel_number(), + rabbit_types:username(), state()) -> 'ok'). +-spec(tap_out/5 :: (rabbit_amqqueue:qmsg(), binary(), + rabbit_channel:channel_number(), + rabbit_types:username(), state()) -> 'ok'). -spec(start/1 :: (rabbit_types:vhost()) -> 'ok'). -spec(stop/1 :: (rabbit_types:vhost()) -> 'ok'). @@ -54,15 +58,27 @@ enabled(VHost) -> {ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS), lists:member(VHost, VHosts). -tap_in(_Msg, none) -> ok; -tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) -> - trace(TraceX, Msg, <<"publish">>, XName, []). - -tap_out(_Msg, none) -> ok; -tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) -> +tap_in(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok; +tap_in(Msg = #basic_message{exchange_name = #resource{name = XName, + virtual_host = VHost}}, + ConnName, ChannelNum, Username, TraceX) -> + trace(TraceX, Msg, <<"publish">>, XName, + [{<<"vhost">>, longstr, VHost}, + {<<"connection">>, longstr, ConnName}, + {<<"channel">>, signedint, ChannelNum}, + {<<"user">>, longstr, Username}]). + +tap_out(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok; +tap_out({#resource{name = QName, virtual_host = VHost}, + _QPid, _QMsgId, Redelivered, Msg}, + ConnName, ChannelNum, Username, TraceX) -> RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, trace(TraceX, Msg, <<"deliver">>, QName, - [{<<"redelivered">>, signedint, RedeliveredNum}]). + [{<<"redelivered">>, signedint, RedeliveredNum}, + {<<"vhost">>, longstr, VHost}, + {<<"connection">>, longstr, ConnName}, + {<<"channel">>, signedint, ChannelNum}, + {<<"user">>, longstr, Username}]). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b6d37852..1104f373 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -48,6 +48,7 @@ -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). -rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). +-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). %% ------------------------------------------------------------------- @@ -77,6 +78,8 @@ -spec(policy_apply_to/0 :: () -> 'ok'). -spec(queue_decorators/0 :: () -> 'ok'). -spec(internal_system_x/0 :: () -> 'ok'). +-spec(cluster_name/0 :: () -> 'ok'). +-spec(down_slave_nodes/0 :: () -> 'ok'). -endif. @@ -382,6 +385,21 @@ cluster_name_tx() -> [mnesia:delete(T, K, write) || K <- Ks], ok. +down_slave_nodes() -> + ok = down_slave_nodes(rabbit_queue), + ok = down_slave_nodes(rabbit_durable_queue). + +down_slave_nodes(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, Policy, GmPids, Decorators}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, [], Policy, GmPids, Decorators} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ede69748..03b99562 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). + info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -821,15 +821,19 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, out = AvgEgressRate } }) -> {AvgIngressRate, AvgEgressRate}. -status(#vqstate { +info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> + RamMsgCount; +info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) -> + gb_trees:size(RPA); +info(messages_ram, State) -> + info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); +info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> + PersistentCount; +info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, - ram_pending_ack = RPA, - disk_pending_ack = DPA, target_ram_count = TargetRamCount, - ram_msg_count = RamMsgCount, next_seq_id = NextSeqId, - persistent_count = PersistentCount, rates = #rates { in = AvgIngressRate, out = AvgEgressRate, ack_in = AvgAckIngressRate, @@ -841,16 +845,14 @@ status(#vqstate { {q3 , ?QUEUE:len(Q3)}, {q4 , ?QUEUE:len(Q4)}, {len , Len}, - {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)}, {target_ram_count , TargetRamCount}, - {ram_msg_count , RamMsgCount}, - {ram_ack_count , gb_trees:size(RPA)}, {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate} ]. + {avg_ack_egress_rate , AvgAckEgressRate} ]; +info(Item, _) -> + throw({bad_argument, Item}). invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); invoke( _, _, State) -> State. diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index d943b599..3a041508 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -114,8 +114,8 @@ upgrades_required(Scope) -> with_upgrade_graph(Fun, Scope) -> case rabbit_misc:build_acyclic_graph( - fun (Module, Steps) -> vertices(Module, Steps, Scope) end, - fun (Module, Steps) -> edges(Module, Steps, Scope) end, + fun ({_App, Module, Steps}) -> vertices(Module, Steps, Scope) end, + fun ({_App, Module, Steps}) -> edges(Module, Steps, Scope) end, rabbit_misc:all_module_attributes(rabbit_upgrade)) of {ok, G} -> try Fun(G) @@ -161,7 +161,7 @@ heads(G) -> categorise_by_scope(Version) when is_list(Version) -> Categorised = - [{Scope, Name} || {_Module, Attributes} <- + [{Scope, Name} || {_App, _Module, Attributes} <- rabbit_misc:all_module_attributes(rabbit_upgrade), {Name, Scope, _Requires} <- Attributes, lists:member(Name, Version)], |