author    | Simon MacMullen <simon@rabbitmq.com> | 2014-08-21 18:32:35 +0100
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-21 18:32:35 +0100
commit    | e2efbc4e4033022ac31426e3057551e1396e48da (patch)
tree      | dbd365c51cff72eac882b52ba4075ede1120cd63
parent    | ac193e253ad515f232e96515725ded707fbda400 (diff)
parent    | 413551fe76926c74c0e7ae6c5296b01812b7187f (diff)
download  | rabbitmq-server-e2efbc4e4033022ac31426e3057551e1396e48da.tar.gz
stable to default
56 files changed, 2328 insertions, 845 deletions
@@ -20,8 +20,6 @@ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml $(DOCS_DIR)/rabbitmq-echopid.xml) USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-plugins.1.xml USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) -QC_MODULES := rabbit_backing_queue_qc -QC_TRIALS ?= 100 ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python @@ -56,6 +54,12 @@ endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) +ifdef INSTRUMENT_FOR_QC +ERLC_OPTS += -DINSTR_MOD=gm_qc +else +ERLC_OPTS += -DINSTR_MOD=gm +endif + include version.mk PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo ) @@ -123,7 +127,7 @@ plugins: # Not building plugins check-xref: - $(info xref checks are disabled) + $(info xref checks are disabled as there is no plugins-src directory) endif @@ -217,7 +221,8 @@ run-tests: all echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null run-qc: all - $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS)) + ./quickcheck $(RABBITMQ_NODENAME) rabbit_backing_queue_qc 100 40 + ./quickcheck $(RABBITMQ_NODENAME) gm_qc 1000 200 start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid @@ -381,3 +386,4 @@ include $(DEPS_FILE) endif .PHONY: run-qc + diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml index 8ecb4fc8..f7be2d29 100644 --- a/docs/rabbitmq-plugins.1.xml +++ b/docs/rabbitmq-plugins.1.xml @@ -40,6 +40,7 @@ <refsynopsisdiv> <cmdsynopsis> <command>rabbitmq-plugins</command> + <arg choice="opt">-n <replaceable>node</replaceable></arg> <arg choice="req"><replaceable>command</replaceable></arg> <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg> </cmdsynopsis> @@ -62,6 +63,16 @@ enabled. Implicitly enabled plugins are automatically disabled again when they are no longer required. </para> + + <para> + The <command>enable</command>, <command>disable</command> and + <command>set</command> commands will update the plugins file and + then attempt to connect to the broker and ensure it is running + all enabled plugins. By default if it is not possible to connect + to the running broker (for example if it is stopped) then a + warning is displayed. Specify <command>--online</command> or + <command>--offline</command> to change this behaviour. + </para> </refsect1> <refsect1> @@ -97,12 +108,14 @@ </variablelist> <para> Lists all plugins, their versions, dependencies and - descriptions. Each plugin is prefixed with a status - indicator - [ ] to indicate that the plugin is not - enabled, [E] to indicate that it is explicitly enabled, - [e] to indicate that it is implicitly enabled, and [!] to - indicate that it is enabled but missing and thus not - operational. + descriptions. Each plugin is prefixed with two status + indicator characters inside [ ]. The first indicator can + be " " to indicate that the plugin is not enabled, "E" to + indicate that it is explicitly enabled, "e" to indicate + that it is implicitly enabled, or "!" to indicate that it + is enabled but missing and thus not operational. The + second indicator can be " " to show that the plugin is not + running, or "*" to show that it is. 
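The two-character status indicator described above reads more easily as code. A minimal sketch, purely illustrative and not part of rabbit_plugins_main, of how the "[E*]"-style prefix is composed:

    %% Illustrative helper (hypothetical): build the "[E*]"-style prefix from
    %% an enabled-state and a running flag, as documented above.
    status_indicator(Enabled, Running) ->
        E = case Enabled of
                not_enabled -> $\s;
                explicit    -> $E;
                implicit    -> $e;
                missing     -> $!
            end,
        R = case Running of
                true  -> $*;
                false -> $\s
            end,
        [$[, E, R, $]].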
</para> <para> If the optional pattern is given, only plugins whose @@ -130,17 +143,24 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>enable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <term><cmdsynopsis><command>enable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> + <term>--offline</term> + <listitem><para>Just modify the enabled plugins file.</para></listitem> + </varlistentry> + <varlistentry> + <term>--online</term> + <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem> + </varlistentry> + <varlistentry> <term>plugin</term> <listitem><para>One or more plugins to enable.</para></listitem> </varlistentry> </variablelist> <para> - Enables the specified plugins and all their - dependencies. + Enables the specified plugins and all their dependencies. </para> <para role="example-prefix">For example:</para> @@ -154,17 +174,24 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>disable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <term><cmdsynopsis><command>disable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> + <term>--offline</term> + <listitem><para>Just modify the enabled plugins file.</para></listitem> + </varlistentry> + <varlistentry> + <term>--online</term> + <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem> + </varlistentry> + <varlistentry> <term>plugin</term> <listitem><para>One or more plugins to disable.</para></listitem> </varlistentry> </variablelist> <para> - Disables the specified plugins and all plugins that - depend on them. + Disables the specified plugins and all their dependencies. </para> <para role="example-prefix">For example:</para> @@ -175,6 +202,42 @@ </para> </listitem> </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>set</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>--offline</term> + <listitem><para>Just modify the enabled plugins file.</para></listitem> + </varlistentry> + <varlistentry> + <term>--online</term> + <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem> + </varlistentry> + <varlistentry> + <term>plugin</term> + <listitem><para>Zero or more plugins to enable.</para></listitem> + </varlistentry> + </variablelist> + <para> + Enables the specified plugins and all their + dependencies. Unlike <command>rabbitmq-plugins + enable</command> this command ignores and overwrites any + existing enabled plugins. <command>rabbitmq-plugins + set</command> with no plugin arguments is a legal command + meaning "disable all plugins". + </para> + + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmq-plugins set rabbitmq_management</screen> + <para role="example"> + This command enables the <command>management</command> + plugin and its dependencies and disables everything else. 
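The semantics of the new set command versus enable can be summed up in a short sketch (the function and argument names are illustrative, not the plugin tool's internals): enable adds to whatever is already enabled, while set replaces it, which is why set with no plugins disables everything.

    %% Illustrative only: 'enable' unions with the current list, 'set'
    %% replaces it outright.
    new_enabled_list(enable, Requested, AlreadyEnabled) ->
        lists:usort(Requested ++ AlreadyEnabled);
    new_enabled_list(set, Requested, _AlreadyEnabled) ->
        lists:usort(Requested).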
+ </para> + </listitem> + </varlistentry> + </variablelist> </refsect1> diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index a128bfbc..63540568 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -27,6 +27,11 @@ %% %% {ssl_listeners, [5671]}, + %% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection + %% and SSL handshake), in milliseconds. + %% + %% {handshake_timeout, 10000}, + %% Log levels (currently just used for connection logging). %% One of 'info', 'warning', 'error' or 'none', in decreasing order %% of verbosity. Defaults to 'info'. @@ -111,6 +116,10 @@ %% %% {ssl_cert_login_from, common_name}, + %% SSL handshake timeout, in milliseconds. + %% + %% {ssl_handshake_timeout, 5000}, + %% %% Default User / VHost %% ==================== @@ -221,7 +230,12 @@ %% Explicitly enable/disable hipe compilation. %% - %% {hipe_compile, true} + %% {hipe_compile, true}, + + %% Timeout used when waiting for Mnesia tables in a cluster to + %% become available. + %% + %% {mnesia_table_loading_timeout, 30000} ]}, @@ -265,9 +279,13 @@ %% {certfile, "/path/to/cert.pem"}, %% {keyfile, "/path/to/key.pem"}]}]}, + %% One of 'basic', 'detailed' or 'none'. See + %% http://www.rabbitmq.com/management.html#fine-stats for more details. + %% {rates_mode, basic}, + %% Configure how long aggregated data (such as message rates and queue %% lengths) is retained. Please read the plugin's documentation in - %% https://www.rabbitmq.com/management.html#configuration for more + %% http://www.rabbitmq.com/management.html#configuration for more %% details. %% %% {sample_retention_policies, @@ -276,14 +294,6 @@ %% {detailed, [{10, 5}]}]} ]}, - {rabbitmq_management_agent, - [%% Misc/Advanced Options - %% - %% NB: Change these only if you understand what you are doing! - %% - %% {force_fine_statistics, true} - ]}, - %% ---------------------------------------------------------------------------- %% RabbitMQ Shovel Plugin %% diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 01b024a2..eb3c7ef3 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -406,11 +406,15 @@ online, except when using the <command>--offline</command> flag. </para> <para> - When using the <command>--offline</command> flag the node you - connect to will become the canonical source for cluster metadata - (e.g. which queues exist), even if it was not before. Therefore - you should use this command on the latest node to shut down if - at all possible. + When using the <command>--offline</command> flag + rabbitmqctl will not attempt to connect to a node as + normal; instead it will temporarily become the node in + order to make the change. This is useful if the node + cannot be started normally. In this case the node will + become the canonical source for cluster metadata + (e.g. which queues exist), even if it was not + before. Therefore you should use this command on the + latest node to shut down if at all possible. </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen> @@ -454,6 +458,44 @@ </listitem> </varlistentry> <varlistentry> + <term><cmdsynopsis><command>force_boot</command></cmdsynopsis></term> + <listitem> + <para> + Ensure that the node will start next time, even if it + was not the last to shut down. 
+ </para> + <para> + Normally when you shut down a RabbitMQ cluster + altogether, the first node you restart should be the + last one to go down, since it may have seen things + happen that other nodes did not. But sometimes + that's not possible: for instance if the entire cluster + loses power then all nodes may think they were not the + last to shut down. + </para> + <para> + In such a case you can invoke <command>rabbitmqctl + force_boot</command> while the node is down. This will + tell the node to unconditionally start next time you ask + it to. If any changes happened to the cluster after this + node shut down, they will be lost. + </para> + <para> + If the last node to go down is permanently lost then you + should use <command>rabbitmqctl forget_cluster_node + --offline</command> in preference to this command, as it + will ensure that mirrored queues which were mastered on + the lost node get promoted. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl force_boot</screen> + <para role="example"> + This will force the node not to wait for other nodes + next time it is started. + </para> + </listitem> + </varlistentry> + <varlistentry> <term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis> </term> <listitem> @@ -1148,6 +1190,42 @@ (queue depth).</para></listitem> </varlistentry> <varlistentry> + <term>messages_ready_ram</term> + <listitem><para>Number of messages from messages_ready which are resident in ram.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_unacknowledged_ram</term> + <listitem><para>Number of messages from messages_unacknowledged which are resident in ram.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_ram</term> + <listitem><para>Total number of messages which are resident in ram.</para></listitem> + </varlistentry> + <varlistentry> + <term>messages_persistent</term> + <listitem><para>Total number of persistent messages in the queue (will always be 0 for transient queues).</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes</term> + <listitem><para>Sum of the size of all message bodies in the queue. 
This does not include the message properties (including headers) or any overhead.</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes_ready</term> + <listitem><para>Like <command>message_bytes</command> but counting only those messages ready to be delivered to clients.</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes_unacknowledged</term> + <listitem><para>Like <command>message_bytes</command> but counting only those messages delivered to clients but not yet acknowledged.</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes_ram</term> + <listitem><para>Like <command>message_bytes</command> but counting only those messages which are in RAM.</para></listitem> + </varlistentry> + <varlistentry> + <term>message_bytes_persistent</term> + <listitem><para>Like <command>message_bytes</command> but counting only those messages which are persistent.</para></listitem> + </varlistentry> + <varlistentry> <term>consumers</term> <listitem><para>Number of consumers.</para></listitem> </varlistentry> @@ -1475,6 +1553,10 @@ <term>send_pend</term> <listitem><para>Send queue size.</para></listitem> </varlistentry> + <varlistentry> + <term>connected_at</term> + <listitem><para>Date and time this connection was established, as timestamp.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>connectioninfoitem</command>s are diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 7360208a..f26e0f77 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -39,12 +39,15 @@ {server_properties, []}, {collect_statistics, none}, {collect_statistics_interval, 5000}, + {mnesia_table_loading_timeout, 30000}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, {trace_vhosts, []}, {log_levels, [{connection, info}]}, {ssl_cert_login_from, distinguished_name}, + {ssl_handshake_timeout, 5000}, + {handshake_timeout, 10000}, {reverse_dns_lookups, false}, {cluster_partition_handling, ignore}, {tcp_listen_options, [binary, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5ac3197e..5e41ea93 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -39,13 +39,25 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, internal, arguments, - scratches, policy, decorators}). --record(exchange_serial, {name, next}). +%% fields described as 'transient' here are cleared when writing to +%% rabbit_durable_<thing> +-record(exchange, { + name, type, durable, auto_delete, internal, arguments, %% immutable + scratches, %% durable, explicitly updated via update_scratch/3 + policy, %% durable, implicitly updated when policy changes + decorators}). %% transient, recalculated in store/1 (i.e. recovery) + +-record(amqqueue, { + name, durable, auto_delete, exclusive_owner = none, %% immutable + arguments, %% immutable + pid, %% durable (just so we know home node) + slave_pids, sync_slave_pids, %% transient + down_slave_nodes, %% durable + policy, %% durable, implicit update as above + gm_pids, %% transient + decorators}). %% transient, recalculated as above --record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, sync_slave_pids, policy, - gm_pids, decorators}). +-record(exchange_serial, {name, next}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). 
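The new application defaults added to ebin/rabbit_app.in above match the keys documented in rabbitmq.config.example earlier in this commit; a minimal rabbitmq.config sketch that sets them explicitly (the values shown are simply the shipped defaults):

    %% Minimal sketch; every value here is the default introduced by this
    %% commit, so the file is equivalent to not configuring the keys at all.
    [{rabbit,              [{handshake_timeout,            10000},
                            {ssl_handshake_timeout,        5000},
                            {mnesia_table_loading_timeout, 30000}]},
     {rabbitmq_management, [{rates_mode, basic}]}].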
@@ -75,7 +87,7 @@ -record(event, {type, props, reference = undefined, timestamp}). --record(message_properties, {expiry, needs_confirming = false}). +-record(message_properties, {expiry, needs_confirming = false, size}). -record(plugin, {name, %% atom() version, %% string() @@ -105,9 +117,6 @@ -define(DESIRED_HIBERNATE, 10000). -define(CREDIT_DISC_BOUND, {2000, 500}). -%% This is dictated by `erlang:send_after' on which we depend to implement TTL. --define(MAX_EXPIRY_TIMER, 4294967295). - -define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). diff --git a/packaging/common/README b/packaging/common/README index 0a29ee27..35a1523a 100644 --- a/packaging/common/README +++ b/packaging/common/README @@ -17,4 +17,4 @@ run as the superuser. An example configuration file is provided in the same directory as this README. Copy it to /etc/rabbitmq/rabbitmq.config to use it. The RabbitMQ server must be restarted after changing the configuration -file or enabling or disabling plugins. +file. @@ -7,17 +7,21 @@ %% NodeStr is a local broker node name %% ModStr is the module containing quickcheck properties %% TrialsStr is the number of trials -main([NodeStr, ModStr, TrialsStr]) -> +main([NodeStr, ModStr, NumTestsStr, MaxSizeStr]) -> {ok, Hostname} = inet:gethostname(), Node = list_to_atom(NodeStr ++ "@" ++ Hostname), Mod = list_to_atom(ModStr), - Trials = erlang:list_to_integer(TrialsStr), + NumTests = erlang:list_to_integer(NumTestsStr), + MaxSize = erlang:list_to_integer(MaxSizeStr), case rpc:call(Node, code, ensure_loaded, [proper]) of {module, proper} -> case rpc:call(Node, proper, module, - [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of + [Mod] ++ [[{numtests, NumTests}, + {max_size, MaxSize}, + {constraint_tries, 200}]]) of [] -> ok; - _ -> quit(1) + R -> io:format("~p.~n", [R]), + quit(1) end; {badrpc, Reason} -> io:format("Could not contact node ~p: ~p.~n", [Node, Reason]), diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index 69d5a9c9..74298440 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -58,3 +58,47 @@ fi ## Get configuration variables from the configure environment file [ -f ${CONF_ENV_FILE} ] && . 
${CONF_ENV_FILE} || true + +##--- Set environment vars RABBITMQ_<var_name> to defaults if not set + +DEFAULT_NODE_IP_ADDRESS=auto +DEFAULT_NODE_PORT=5672 +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} + +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} +[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} + +[ "x" = "x$RABBITMQ_DIST_PORT" ] && RABBITMQ_DIST_PORT=${DIST_PORT} +[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${DEFAULT_NODE_PORT} + 20000)) +[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${RABBITMQ_NODE_PORT} + 20000)) + +[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} +[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} +[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} +[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} +[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} +[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} +[ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS} +[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} +[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} + +[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE} +[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid + +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand + +[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} + +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} + +## Log rotation +[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} +[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log" +[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS} +[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log" + +[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} + +##--- End of overridden <var_name> variables diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index bd7d0b6a..8b5b30b7 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -19,13 +19,6 @@ # Non-empty defaults should be set in rabbitmq-env . 
`dirname $0`/rabbitmq-env -##--- Set environment vars RABBITMQ_<var_name> to defaults if not set - -[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} - -##--- End of overridden <var_name> variables - exec ${ERL_DIR}erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ @@ -35,4 +28,5 @@ exec ${ERL_DIR}erl \ -s rabbit_plugins_main \ -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \ -plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \ + -nodename $RABBITMQ_NODENAME \ -extra "$@" diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index a535ebad..61e39e38 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -31,6 +31,10 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!RABBITMQ_NODENAME!"=="" (
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -51,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" (
 set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
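rabbitmq-plugins now passes -nodename on both platforms because the enable/disable/set commands may contact the running broker (the new online/offline behaviour documented above). How rabbit_plugins_main consumes the flag is not shown in this diff; a hypothetical sketch of reading such a plain flag would be:

    %% Hypothetical sketch only: a plain -nodename flag passed before -extra
    %% can be read with init:get_argument/1 and turned into a node name.
    {ok, [[NodeStr]]} = init:get_argument(nodename),
    Node = list_to_atom(NodeStr).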
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index bd397441..686c90cf 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -19,48 +19,6 @@ # Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env -##--- Set environment vars RABBITMQ_<var_name> to defaults if not set - -DEFAULT_NODE_IP_ADDRESS=auto -DEFAULT_NODE_PORT=5672 -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} - -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} -[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} - -[ "x" = "x$RABBITMQ_DIST_PORT" ] && RABBITMQ_DIST_PORT=${DIST_PORT} -[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${DEFAULT_NODE_PORT} + 20000)) -[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${RABBITMQ_NODE_PORT} + 20000)) - -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} -[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} -[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} -[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} -[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} - -[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} -[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} - -[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE} -[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid - -[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} -[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand - -[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} - -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} - -## Log rotation -[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} -[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log" -[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS} -[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log" - -##--- End of overridden <var_name> variables - RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput" [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="$RABBITMQ_START_RABBIT -s rabbit boot " @@ -131,6 +89,7 @@ exec ${ERL_DIR}erl \ ${RABBITMQ_CONFIG_ARG} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ + ${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \ ${RABBITMQ_LISTEN_ARG} \ -sasl errlog_type error \ -sasl sasl_error_logger false \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 043204fa..e2312406 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -147,6 +147,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 895561d4..fb2703f2 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -235,6 +235,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
 -os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
!RABBITMQ_DIST_ARG! ^
!STARVAR!
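RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS, now threaded through all three launchers above, is a convenient place for extra VM flags such as +K true (kernel poll) or a larger +A async-thread pool, which are exactly the settings the new warn_if_kernel_config_dubious/0 in rabbit.erl (further down in this commit) complains about. For reference, its three checks can be reproduced from an attached shell; this snippet is illustrative, not code from the commit:

    %% The settings the new start-up warnings inspect.
    erlang:system_info(kernel_poll).       %% true when the VM was started with +K true
    erlang:system_info(thread_pool_size).  %% raised with +A <N>
    application:get_env(kernel, inet_default_connect_options).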
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 309abf2a..31fe0afc 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -19,20 +19,21 @@ # Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env -##--- Set environment vars RABBITMQ_<var_name> to defaults if not set - -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} - -##--- End of overridden <var_name> variables +# rabbitmqctl starts distribution itself, so we need to make sure epmd +# is running. +${ERL_DIR}erl -sname rabbitmqctl-prelaunch-$$ -noinput -eval 'erlang:halt().' +# We specify Mnesia dir and sasl error logger since some actions +# (e.g. forget_cluster_node --offline) require us to impersonate the +# real node. exec ${ERL_DIR}erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_CTL_ERL_ARGS} \ - -sname rabbitmqctl$$ \ -boot "${CLEAN_BOOT_FILE}" \ + -sasl errlog_type error \ + -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \ -s rabbit_control_main \ -nodename $RABBITMQ_NODENAME \ -extra "$@" diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 8e8ba1bd..7a2c454c 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -23,6 +23,10 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+)
+
if "!COMPUTERNAME!"=="" (
set COMPUTERNAME=localhost
)
@@ -31,6 +35,14 @@ if "!RABBITMQ_NODENAME!"=="" (
 set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
+if "!RABBITMQ_MNESIA_BASE!"=="" (
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
+)
+
+if "!RABBITMQ_MNESIA_DIR!"=="" (
+ set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -43,7 +55,20 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
 exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM!!TIME:~9! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
+rem rabbitmqctl starts distribution itself, so we need to make sure epmd
+rem is running.
+"!ERLANG_HOME!\bin\erl.exe" -sname rabbitmqctl-prelaunch-!RANDOM!!TIME:~9! -noinput -eval "erlang:halt()."
+
+"!ERLANG_HOME!\bin\erl.exe" ^
+-pa "!TDP0!..\ebin" ^
+-noinput ^
+-hidden ^
+!RABBITMQ_CTL_ERL_ARGS! ^
+-sasl errlog_type error ^
+-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
+-s rabbit_control_main ^
+-nodename !RABBITMQ_NODENAME! ^
+-extra !STAR!
endlocal
endlocal
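Both rabbitmqctl wrappers now start a throw-away distributed node before the real invocation. The reason is that rabbit_control_main now brings up distribution itself (note the removed -sname above), and it is also given -mnesia dir and -sasl settings so that forget_cluster_node --offline can impersonate the real node. Starting distribution from inside the VM only works if epmd is already running, which is what the prelaunch step guarantees; a hypothetical sketch of that in-tool step (the node name is invented):

    %% Hypothetical: programmatic distribution start-up, which fails unless
    %% epmd is already up.
    {ok, _Pid} = net_kernel:start([rabbitmqctl_12345, shortnames]).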
diff --git a/src/app_utils.erl b/src/app_utils.erl index 0479ce66..87e6fa0b 100644 --- a/src/app_utils.erl +++ b/src/app_utils.erl @@ -17,7 +17,7 @@ -export([load_applications/1, start_applications/1, start_applications/2, stop_applications/1, stop_applications/2, app_dependency_order/2, - wait_for_applications/1]). + app_dependencies/1]). -ifdef(use_specs). @@ -28,8 +28,8 @@ -spec stop_applications([atom()]) -> 'ok'. -spec start_applications([atom()], error_handler()) -> 'ok'. -spec stop_applications([atom()], error_handler()) -> 'ok'. --spec wait_for_applications([atom()]) -> 'ok'. -spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()]. +-spec app_dependencies(atom()) -> [atom()]. -endif. @@ -68,14 +68,10 @@ stop_applications(Apps, ErrorHandler) -> ErrorHandler, Apps). - -wait_for_applications(Apps) -> - [wait_for_application(App) || App <- Apps], ok. - app_dependency_order(RootApps, StripUnreachable) -> {ok, G} = rabbit_misc:build_acyclic_graph( - fun (App, _Deps) -> [{App, App}] end, - fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end, + fun ({App, _Deps}) -> [{App, App}] end, + fun ({App, Deps}) -> [{Dep, App} || Dep <- Deps] end, [{App, app_dependencies(App)} || {App, _Desc, _Vsn} <- application:loaded_applications()]), try @@ -92,13 +88,6 @@ app_dependency_order(RootApps, StripUnreachable) -> %%--------------------------------------------------------------------------- %% Private API -wait_for_application(Application) -> - case lists:keymember(Application, 1, rabbit_misc:which_applications()) of - true -> ok; - false -> timer:sleep(1000), - wait_for_application(Application) - end. - load_applications(Worklist, Loaded) -> case queue:out(Worklist) of {empty, _WorkList} -> @@ -388,6 +388,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_info/3]). +%% For INSTR_MOD callbacks +-export([call/3, cast/2, monitor/1, demonitor/1]). + -ifndef(use_specs). -export([behaviour_info/1]). -endif. @@ -616,6 +619,16 @@ handle_call({add_on_right, NewMember}, _From, members_state = MembersState1 }), handle_callback_result({Result, {ok, Group}, State1}). +%% add_on_right causes a catchup to be sent immediately from the left, +%% so we can never see this from the left neighbour. However, it's +%% possible for the right neighbour to send us a check_neighbours +%% immediately before that. We can't possibly handle it, but if we're +%% in this state we know a catchup is coming imminently anyway. So +%% just ignore it. +handle_cast({?TAG, _ReqVer, check_neighbours}, + State = #state { members_state = undefined }) -> + noreply(State); + handle_cast({?TAG, ReqVer, Msg}, State = #state { view = View, members_state = MembersState, @@ -1177,8 +1190,8 @@ can_erase_view_member(Self, Self, _LA, _LP) -> false; can_erase_view_member(_Self, _Id, N, N) -> true; can_erase_view_member(_Self, _Id, _LA, _LP) -> false. -neighbour_cast(N, Msg) -> gen_server2:cast(get_pid(N), Msg). -neighbour_call(N, Msg) -> gen_server2:call(get_pid(N), Msg, infinity). +neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg). +neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity). 
%% --------------------------------------------------------------------------- %% View monitoring and maintanence @@ -1192,7 +1205,7 @@ ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> {RealNeighbour, MRef}; ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> - true = erlang:demonitor(MRef), + true = ?INSTR_MOD:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, ok = neighbour_cast(RealNeighbour, Msg), ok = case Neighbour of @@ -1202,7 +1215,7 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> {Neighbour, maybe_monitor(Neighbour, Self)}. maybe_monitor( Self, Self) -> undefined; -maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)). +maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1461,3 +1474,12 @@ last_pub( [], LP) -> LP; last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), true = PubNum > LP, %% ASSERTION PubNum. + +%% --------------------------------------------------------------------------- + +%% Uninstrumented versions + +call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout). +cast(Pid, Msg) -> gen_server2:cast(Pid, Msg). +monitor(Pid) -> erlang:monitor(process, Pid). +demonitor(MRef) -> erlang:demonitor(MRef). diff --git a/src/gm_qc.erl b/src/gm_qc.erl new file mode 100644 index 00000000..394cbcbd --- /dev/null +++ b/src/gm_qc.erl @@ -0,0 +1,384 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(gm_qc). +-ifdef(use_proper_qc). + +-include_lib("proper/include/proper.hrl"). + +-define(GROUP, test_group). +-define(MAX_SIZE, 5). +-define(MSG_TIMEOUT, 1000000). %% micros + +-export([prop_gm_test/0]). + +-behaviour(proper_statem). +-export([initial_state/0, command/1, precondition/2, postcondition/3, + next_state/3]). + +-behaviour(gm). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). + +%% Helpers +-export([do_join/0, do_leave/1, do_send/1, do_proceed1/1, do_proceed2/2]). + +%% For insertion into gm +-export([call/3, cast/2, monitor/1, demonitor/1, execute_mnesia_transaction/1]). + +-record(state, {seq, %% symbolic and dynamic + instrumented, %% dynamic only + outstanding, %% dynamic only + monitors, %% dynamic only + all_join, %% for symbolic + to_join, %% dynamic only + to_leave %% for symbolic + }). + +prop_gm_test() -> + case ?INSTR_MOD of + ?MODULE -> ok; + _ -> exit(compile_with_INSTRUMENT_FOR_QC) + end, + process_flag(trap_exit, true), + erlang:register(?MODULE, self()), + ?FORALL(Cmds, commands(?MODULE), gm_test(Cmds)). + +gm_test(Cmds) -> + {_H, State, Res} = run_commands(?MODULE, Cmds), + cleanup(State), + ?WHENFAIL( + io:format("Result: ~p~n", [Res]), + aggregate(command_names(Cmds), Res =:= ok)). 
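When the tree is built with INSTRUMENT_FOR_QC (the Makefile hunk at the top sets INSTR_MOD to gm_qc instead of gm), the new run-qc target exercises this property through the ./quickcheck escript with 1000 tests and a maximum size of 200. Running it directly from a shell on the instrumented node amounts to roughly the following, mirroring the options the escript now passes:

    %% Roughly what ./quickcheck does over rpc for the gm property.
    proper:module(gm_qc, [{numtests,         1000},
                          {max_size,         200},
                          {constraint_tries, 200}]).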
+ +cleanup(S) -> + S2 = ensure_joiners_joined_and_msgs_received(S), + All = gms_joined(S2), + All = gms(S2), %% assertion - none to join + check_stale_members(All), + [gm:leave(GM) || GM <- All], + drain_and_proceed_gms(S2), + [await_death(GM) || GM <- All], + gm:forget_group(?GROUP), + ok. + +check_stale_members(All) -> + GMs = [P || P <- processes(), is_gm_process(?GROUP, P)], + case GMs -- All of + [] -> ok; + Rest -> exit({forgot, Rest}) + end. + +is_gm_process(Group, P) -> + case process_info(P, dictionary) of + undefined -> false; + {dictionary, D} -> {gm, Group} =:= proplists:get_value(process_name, D) + end. + +await_death(P) -> + MRef = erlang:monitor(process, P), + await_death(MRef, P). + +await_death(MRef, P) -> + receive + {'DOWN', MRef, process, P, _} -> ok; + {'DOWN', _, _, _, _} -> await_death(MRef, P); + {'EXIT', _, normal} -> await_death(MRef, P); + {'EXIT', _, Reason} -> exit(Reason); + {joined, _GM} -> await_death(MRef, P); + {left, _GM} -> await_death(MRef, P); + Anything -> exit({stray_msg, Anything}) + end. + +%% --------------------------------------------------------------------------- +%% proper_statem +%% --------------------------------------------------------------------------- + +initial_state() -> #state{seq = 1, + outstanding = dict:new(), + instrumented = dict:new(), + monitors = dict:new(), + all_join = sets:new(), + to_join = sets:new(), + to_leave = sets:new()}. + +command(S) -> + case {length(gms_symb_not_left(S)), length(gms_symb(S))} of + {0, 0} -> qc_join(S); + {0, _} -> frequency([{1, qc_join(S)}, + {3, qc_proceed1(S)}, + {5, qc_proceed2(S)}]); + _ -> frequency([{1, qc_join(S)}, + {1, qc_leave(S)}, + {10, qc_send(S)}, + {5, qc_proceed1(S)}, + {15, qc_proceed2(S)}]) + end. + +qc_join(_S) -> {call,?MODULE,do_join, []}. +qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}. +qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}. +qc_proceed1(S) -> {call,?MODULE,do_proceed1, [oneof(gms_symb(S))]}. +qc_proceed2(S) -> {call,?MODULE,do_proceed2, [oneof(gms_symb(S)), + oneof(gms_symb(S))]}. + +precondition(S, {call, ?MODULE, do_join, []}) -> + length(gms_symb(S)) < ?MAX_SIZE; + +precondition(_S, {call, ?MODULE, do_leave, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_send, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_proceed1, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_proceed2, [GM1, GM2]}) -> + GM1 =/= GM2. + +postcondition(_S, {call, _M, _F, _A}, _Res) -> + true. + +next_state(S = #state{to_join = ToSet, + all_join = AllSet}, GM, {call, ?MODULE, do_join, []}) -> + S#state{to_join = sets:add_element(GM, ToSet), + all_join = sets:add_element(GM, AllSet)}; + +next_state(S = #state{to_leave = Set}, _Res, {call, ?MODULE, do_leave, [GM]}) -> + S#state{to_leave = sets:add_element(GM, Set)}; + +next_state(S = #state{seq = Seq, + outstanding = Outstanding}, _Res, + {call, ?MODULE, do_send, [GM]}) -> + case is_pid(GM) andalso lists:member(GM, gms_joined(S)) of + true -> + %% Dynamic state, i.e. runtime + Msg = [{sequence, Seq}, + {sent_to, GM}, + {dests, gms_joined(S)}], + gm:broadcast(GM, Msg), + Outstanding1 = dict:map( + fun (_GM, Set) -> + gb_sets:add_element(Msg, Set) + end, Outstanding), + drain(S#state{seq = Seq + 1, + outstanding = Outstanding1}); + false -> + S + end; + +next_state(S, _Res, {call, ?MODULE, do_proceed1, [Pid]}) -> + proceed(Pid, S); + +next_state(S, _Res, {call, ?MODULE, do_proceed2, [From, To]}) -> + proceed({From, To}, S). 
+ +proceed(K, S = #state{instrumented = Msgs}) -> + case dict:find(K, Msgs) of + {ok, Q} -> case queue:out(Q) of + {{value, Thing}, Q2} -> + S2 = proceed(K, Thing, S), + S2#state{instrumented = dict:store(K, Q2, Msgs)}; + {empty, _} -> + S + end; + error -> S + end. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined(Pid, _Members) -> Pid ! {joined, self()}, + ok. +members_changed(_Pid, _Bs, _Ds) -> ok. +handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok. +terminate(Pid, _Reason) -> Pid ! {left, self()}. + +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +do_join() -> + {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(), + fun execute_mnesia_transaction/1), + GM. + +do_leave(GM) -> + gm:leave(GM), + GM. + +%% We need to update the state, so do the work in next_state +do_send( _GM) -> ok. +do_proceed1(_Pid) -> ok. +do_proceed2(_From, _To) -> ok. + +%% All GMs, joined and to join +gms(#state{outstanding = Outstanding, + to_join = ToJoin}) -> + dict:fetch_keys(Outstanding) ++ sets:to_list(ToJoin). + +%% All GMs, joined and to join +gms_joined(#state{outstanding = Outstanding}) -> + dict:fetch_keys(Outstanding). + +%% All GMs including those that have left (symbolic) +gms_symb(#state{all_join = AllJoin}) -> + sets:to_list(AllJoin). + +%% All GMs not including those that have left (symbolic) +gms_symb_not_left(#state{all_join = AllJoin, + to_leave = ToLeave}) -> + sets:to_list(sets:subtract(AllJoin, ToLeave)). + +drain(S) -> + receive + Msg -> drain(handle_msg(Msg, S)) + after 10 -> S + end. + +drain_and_proceed_gms(S0) -> + S = #state{instrumented = Msgs} = drain(S0), + case dict:size(Msgs) of + 0 -> S; + _ -> S1 = dict:fold( + fun (Key, Q, Si) -> + lists:foldl( + fun (Msg, Sij) -> + proceed(Key, Msg, Sij) + end, Si, queue:to_list(Q)) + end, S, Msgs), + drain_and_proceed_gms(S1#state{instrumented = dict:new()}) + end. + +handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) -> + case dict:find(GM, Outstanding) of + {ok, Set} -> + Set2 = gb_sets:del_element(Msg, Set), + S#state{outstanding = dict:store(GM, Set2, Outstanding)}; + error -> + %% Message from GM that has already died. OK. + S + end; +handle_msg({instrumented, Key, Thing}, S = #state{instrumented = Msgs}) -> + Q1 = case dict:find(Key, Msgs) of + {ok, Q} -> queue:in(Thing, Q); + error -> queue:from_list([Thing]) + end, + S#state{instrumented = dict:store(Key, Q1, Msgs)}; +handle_msg({joined, GM}, S = #state{outstanding = Outstanding, + to_join = ToJoin}) -> + S#state{outstanding = dict:store(GM, gb_sets:empty(), Outstanding), + to_join = sets:del_element(GM, ToJoin)}; +handle_msg({left, GM}, S = #state{outstanding = Outstanding, + to_join = ToJoin}) -> + true = dict:is_key(GM, Outstanding) orelse sets:is_element(GM, ToJoin), + S#state{outstanding = dict:erase(GM, Outstanding), + to_join = sets:del_element(GM, ToJoin)}; +handle_msg({'DOWN', MRef, _, From, _} = Msg, S = #state{monitors = Mons}) -> + To = dict:fetch(MRef, Mons), + handle_msg({instrumented, {From, To}, {info, Msg}}, + S#state{monitors = dict:erase(MRef, Mons)}); +handle_msg({'EXIT', _From, normal}, S) -> + S; +handle_msg({'EXIT', _From, Reason}, _S) -> + %% We just trapped exits to get nicer SASL logging. + exit(Reason). 
+ +proceed({_From, To}, {cast, Msg}, S) -> gen_server2:cast(To, Msg), S; +proceed({_From, To}, {info, Msg}, S) -> To ! Msg, S; +proceed({From, _To}, {wait, Ref}, S) -> From ! {proceed, Ref}, S; +proceed({From, To}, {mon, Ref}, S) -> add_monitor(From, To, Ref, S); +proceed(_Pid, {demon, MRef}, S) -> erlang:demonitor(MRef), S; +proceed(Pid, {wait, Ref}, S) -> Pid ! {proceed, Ref}, S. + +%% NB From here is To in handle_msg/DOWN above, since the msg is going +%% the other way +add_monitor(From, To, Ref, S = #state{monitors = Mons}) -> + MRef = erlang:monitor(process, To), + From ! {mref, Ref, MRef}, + S#state{monitors = dict:store(MRef, From, Mons)}. + +%% ---------------------------------------------------------------------------- +%% Assertions +%% ---------------------------------------------------------------------------- + +ensure_joiners_joined_and_msgs_received(S0) -> + S = drain_and_proceed_gms(S0), + case outstanding_joiners(S) of + true -> ensure_joiners_joined_and_msgs_received(S); + false -> case outstanding_msgs(S) of + [] -> S; + Out -> exit({outstanding_msgs, Out}) + end + end. + +outstanding_joiners(#state{to_join = ToJoin}) -> + sets:size(ToJoin) > 0. + +outstanding_msgs(#state{outstanding = Outstanding}) -> + dict:fold(fun (GM, Set, OS) -> + case gb_sets:is_empty(Set) of + true -> OS; + false -> [{GM, gb_sets:to_list(Set)} | OS] + end + end, [], Outstanding). + +%% --------------------------------------------------------------------------- +%% For insertion into GM +%% --------------------------------------------------------------------------- + +call(Pid, Msg, infinity) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, {self(), Pid}, {wait, Ref}}, + receive + {proceed, Ref} -> ok + end, + gen_server2:call(Pid, Msg, infinity). + +cast(Pid, Msg) -> + whereis(?MODULE) ! {instrumented, {self(), Pid}, {cast, Msg}}, + ok. + +monitor(Pid) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, {self(), Pid}, {mon, Ref}}, + receive + {mref, Ref, MRef} -> MRef + end. + +demonitor(MRef) -> + whereis(?MODULE) ! {instrumented, self(), {demon, MRef}}, + true. + +execute_mnesia_transaction(Fun) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, self(), {wait, Ref}}, + receive + {proceed, Ref} -> ok + end, + rabbit_misc:execute_mnesia_transaction(Fun). + +-else. + +-export([prop_disabled/0]). + +prop_disabled() -> + exit({compiled_without_proper, + "PropEr was not present during compilation of the test module. " + "Hence all tests are disabled."}). + +-endif. diff --git a/src/pg_local.erl b/src/pg_local.erl index f535b136..4d9914d9 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -34,7 +34,7 @@ %% -module(pg_local). --export([join/2, leave/2, get_members/1]). +-export([join/2, leave/2, get_members/1, in_group/2]). -export([sync/0]). %% intended for testing only; not part of official API -export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -50,6 +50,7 @@ -spec(join/2 :: (name(), pid()) -> 'ok'). -spec(leave/2 :: (name(), pid()) -> 'ok'). -spec(get_members/1 :: (name()) -> [pid()]). +-spec(in_group/2 :: (name(), pid()) -> boolean()). -spec(sync/0 :: () -> 'ok'). @@ -81,6 +82,16 @@ get_members(Name) -> ensure_started(), group_members(Name). +in_group(Name, Pid) -> + ensure_started(), + %% The join message is a cast and thus can race, but we want to + %% keep it that way to be fast in the common case. + case member_present(Name, Pid) of + true -> true; + false -> sync(), + member_present(Name, Pid) + end. 
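Because pg_local:join/2 is a cast, a reader can race with a join it has just issued; the new in_group/2 stays fast in the common case and only falls back to the serialising sync() when the member is not yet visible. Illustrative use (the group name is made up):

    %% Illustrative only: the sync() fallback inside in_group/2 hides the
    %% join/2 cast race.
    ok   = pg_local:join(example_group, self()),
    true = pg_local:in_group(example_group, self()).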
+ sync() -> ensure_started(), gen_server:call(?MODULE, sync, infinity). @@ -199,6 +210,12 @@ member_in_group(Pid, Name) -> [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}), lists:duplicate(N, Pid). +member_present(Name, Pid) -> + case ets:lookup(pg_local_table, {member, Name, Pid}) of + [_] -> true; + [] -> false + end. + member_groups(Pid) -> [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})]. diff --git a/src/rabbit.erl b/src/rabbit.erl index 29e38c1f..b00a1ad7 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -22,10 +22,9 @@ stop_and_halt/0, await_startup/0, status/0, is_running/0, is_running/1, environment/0, rotate_logs/1, force_event_refresh/1, start_fhc/0]). - -export([start/2, stop/1]). - --export([log_location/1]). %% for testing +-export([start_apps/1, stop_apps/1]). +-export([log_location/1, config_files/0]). %% for testing and mgmt-agent %%--------------------------------------------------------------------------- %% Boot steps. @@ -75,13 +74,6 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). --rabbit_boot_step({rabbit_log, - [{description, "logging server"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_log]}}, - {requires, external_infrastructure}, - {enables, kernel_ready}]}). - -rabbit_boot_step({rabbit_event, [{description, "statistics event manager"}, {mfa, {rabbit_sup, start_restartable_child, @@ -202,6 +194,7 @@ %% practice 2 processes seems just as fast as any other number > 1, %% and keeps the progress bar realistic-ish. -define(HIPE_PROCESSES, 2). +-define(ASYNC_THREADS_WARNING_THRESHOLD, 8). %%---------------------------------------------------------------------------- @@ -211,6 +204,7 @@ %% this really should be an abstract type -type(log_location() :: 'tty' | 'undefined' | file:filename()). -type(param() :: atom()). +-type(app_name() :: atom()). -spec(start/0 :: () -> 'ok'). -spec(boot/0 :: () -> 'ok'). @@ -242,6 +236,8 @@ -spec(maybe_insert_default_data/0 :: () -> 'ok'). -spec(boot_delegate/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). +-spec(start_apps/1 :: ([app_name()]) -> 'ok'). +-spec(stop_apps/1 :: ([app_name()]) -> 'ok'). -endif. @@ -263,7 +259,7 @@ maybe_hipe_compile() -> warn_if_hipe_compilation_failed(true) -> ok; warn_if_hipe_compilation_failed(false) -> - error_logger:warning_msg( + rabbit_log:warning( "Not HiPE compiling: HiPE not found in this Erlang installation.~n"). %% HiPE compilation happens before we have log handlers and can take a @@ -312,9 +308,7 @@ start() -> ok = ensure_working_log_handlers(), rabbit_node_monitor:prepare_cluster_status_files(), rabbit_mnesia:check_cluster_consistency(), - ok = app_utils:start_applications( - app_startup_order(), fun handle_app_error/2), - ok = log_broker_started(rabbit_plugins:active()) + broker_start() end). boot() -> @@ -329,21 +323,14 @@ boot() -> %% the upgrade, since if we are a secondary node the %% primary node will have forgotten us rabbit_mnesia:check_cluster_consistency(), - Plugins = rabbit_plugins:setup(), - ToBeLoaded = Plugins ++ ?APPS, - ok = app_utils:load_applications(ToBeLoaded), - StartupApps = app_utils:app_dependency_order(ToBeLoaded, - false), - ok = app_utils:start_applications( - StartupApps, fun handle_app_error/2), - ok = log_broker_started(Plugins) + broker_start() end). -handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) -> - throw({could_not_start, App, Reason}); - -handle_app_error(App, Reason) -> - throw({could_not_start, App, Reason}). 
+broker_start() -> + Plugins = rabbit_plugins:setup(), + ToBeLoaded = Plugins ++ ?APPS, + start_apps(ToBeLoaded), + ok = log_broker_started(rabbit_plugins:active()). start_it(StartFun) -> Marker = spawn_link(fun() -> receive stop -> ok end end), @@ -371,22 +358,66 @@ start_it(StartFun) -> stop() -> case whereis(rabbit_boot) of undefined -> ok; - _ -> await_startup() + _ -> await_startup(true) end, - rabbit_log:info("Stopping RabbitMQ~n"), - ok = app_utils:stop_applications(app_shutdown_order()). + rabbit_log:info("Stopping RabbitMQ~n", []), + Apps = ?APPS ++ rabbit_plugins:active(), + stop_apps(app_utils:app_dependency_order(Apps, true)). stop_and_halt() -> try stop() after - rabbit_misc:local_info_msg("Halting Erlang VM~n", []), + rabbit_log:info("Halting Erlang VM~n", []), init:stop() end, ok. +start_apps(Apps) -> + app_utils:load_applications(Apps), + OrderedApps = app_utils:app_dependency_order(Apps, false), + case lists:member(rabbit, Apps) of + false -> run_boot_steps(Apps); %% plugin activation + true -> ok %% will run during start of rabbit app + end, + ok = app_utils:start_applications(OrderedApps, + handle_app_error(could_not_start)). + +stop_apps(Apps) -> + ok = app_utils:stop_applications( + Apps, handle_app_error(error_during_shutdown)), + case lists:member(rabbit, Apps) of + false -> run_cleanup_steps(Apps); %% plugin deactivation + true -> ok %% it's all going anyway + end, + ok. + +handle_app_error(Term) -> + fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) -> + throw({Term, App, ExitReason}); + (App, Reason) -> + throw({Term, App, Reason}) + end. + +run_cleanup_steps(Apps) -> + [run_step(Name, Attrs, cleanup) || {_, Name, Attrs} <- find_steps(Apps)], + ok. + await_startup() -> - app_utils:wait_for_applications(app_startup_order()). + await_startup(false). + +await_startup(HaveSeenRabbitBoot) -> + %% We don't take absence of rabbit_boot as evidence we've started, + %% since there's a small window before it is registered. + case whereis(rabbit_boot) of + undefined -> case HaveSeenRabbitBoot orelse is_running() of + true -> ok; + false -> timer:sleep(100), + await_startup(false) + end; + _ -> timer:sleep(100), + await_startup(true) + end. status() -> S1 = [{pid, list_to_integer(os:getpid())}, @@ -437,6 +468,9 @@ listeners() -> ip_address = IP, port = Port} <- Listeners, Node =:= node()]. +%% TODO this only determines if the rabbit application has started, +%% not if it is running, never mind plugins. It would be nice to have +%% more nuance here. is_running() -> is_running(node()). is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit). 
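The new start_apps/1 and stop_apps/1 give plugins a runtime activation path: boot steps for the listed applications run on the way up, and the new 'cleanup' attribute key is honoured on the way down via run_cleanup_steps/1. A sketch of a boot step that participates in both directions (the module, function and step names are invented):

    %% Invented example: the {mfa, ...} entry runs on activation, the
    %% {cleanup, ...} entry on deactivation.
    -rabbit_boot_step({my_plugin_registry,
                       [{description, "example plugin registry"},
                        {mfa,      {my_plugin, start_registry, []}},
                        {cleanup,  {my_plugin, stop_registry,  []}},
                        {requires, kernel_ready}]}).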
@@ -447,7 +481,7 @@ environment() -> rotate_logs(BinarySuffix) -> Suffix = binary_to_list(BinarySuffix), - rabbit_misc:local_info_msg("Rotating logs with suffix '~s'~n", [Suffix]), + rabbit_log:info("Rotating logs with suffix '~s'~n", [Suffix]), log_rotation_result(rotate_logs(log_location(kernel), Suffix, rabbit_error_logger_file_h), @@ -461,14 +495,15 @@ start(normal, []) -> case erts_version_check() of ok -> {ok, Vsn} = application:get_key(rabbit, vsn), - error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n", - [Vsn, erlang:system_info(otp_release), - ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), + rabbit_log:info("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n", + [Vsn, erlang:system_info(otp_release), + ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), print_banner(), log_banner(), - [ok = run_boot_step(Step) || Step <- boot_steps()], + warn_if_kernel_config_dubious(), + run_boot_steps(), {ok, SupPid}; Error -> Error @@ -483,42 +518,42 @@ stop(_State) -> ok. %%--------------------------------------------------------------------------- -%% application life cycle +%% boot step logic -app_startup_order() -> - ok = app_utils:load_applications(?APPS), - app_utils:app_dependency_order(?APPS, false). +run_boot_steps() -> + run_boot_steps([App || {App, _, _} <- application:loaded_applications()]). -app_shutdown_order() -> - Apps = ?APPS ++ rabbit_plugins:active(), - app_utils:app_dependency_order(Apps, true). +run_boot_steps(Apps) -> + [ok = run_step(Step, Attrs, mfa) || {_, Step, Attrs} <- find_steps(Apps)], + ok. -%%--------------------------------------------------------------------------- -%% boot step logic +find_steps(Apps) -> + All = sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)), + [Step || {App, _, _} = Step <- All, lists:member(App, Apps)]. -run_boot_step({_StepName, Attributes}) -> - case [MFA || {mfa, MFA} <- Attributes] of +run_step(StepName, Attributes, AttributeName) -> + case [MFA || {Key, MFA} <- Attributes, + Key =:= AttributeName] of [] -> ok; MFAs -> [try apply(M,F,A) of - ok -> ok; - {error, Reason} -> boot_error(Reason, not_available) + ok -> ok; + {error, Reason} -> boot_error({boot_step, StepName, Reason}, + not_available) catch - _:Reason -> boot_error(Reason, erlang:get_stacktrace()) + _:Reason -> boot_error({boot_step, StepName, Reason}, + erlang:get_stacktrace()) end || {M,F,A} <- MFAs], ok end. -boot_steps() -> - sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)). - -vertices(_Module, Steps) -> - [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps]. +vertices({AppName, _Module, Steps}) -> + [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps]. -edges(_Module, Steps) -> +edges({_AppName, _Module, Steps}) -> [case Key of requires -> {StepName, OtherStep}; enables -> {OtherStep, StepName} @@ -527,7 +562,7 @@ edges(_Module, Steps) -> Key =:= requires orelse Key =:= enables]. sort_boot_steps(UnsortedSteps) -> - case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2, + case rabbit_misc:build_acyclic_graph(fun vertices/1, fun edges/1, UnsortedSteps) of {ok, G} -> %% Use topological sort to find a consistent ordering (if @@ -541,8 +576,8 @@ sort_boot_steps(UnsortedSteps) -> digraph:delete(G), %% Check that all mentioned {M,F,A} triples are exported. 
case [{StepName, {M,F,A}} || - {StepName, Attributes} <- SortedSteps, - {mfa, {M,F,A}} <- Attributes, + {_App, StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, not erlang:function_exported(M, F, length(A))] of [] -> SortedSteps; MissingFunctions -> basic_boot_error( @@ -603,7 +638,7 @@ boot_error(Reason, Fmt, Args, Stacktrace) -> basic_boot_error(Reason, Format, Args) -> io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), - rabbit_misc:local_info_msg(Format, Args), + rabbit_log:info(Format, Args), timer:sleep(1000), exit({?MODULE, failure_during_boot, Reason}). @@ -727,11 +762,11 @@ force_event_refresh(Ref) -> %% misc log_broker_started(Plugins) -> - rabbit_misc:with_local_io( + rabbit_log:with_local_io( fun() -> PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P]) || P <- Plugins]), - error_logger:info_msg( + rabbit_log:info( "Server startup complete; ~b plugins started.~n~s", [length(Plugins), PluginList]), io:format(" completed with ~p plugins.~n", [length(Plugins)]) @@ -780,7 +815,31 @@ log_banner() -> {K, V} -> Format(K, V) end || S <- Settings]), - error_logger:info_msg("~s", [Banner]). + rabbit_log:info("~s", [Banner]). + +warn_if_kernel_config_dubious() -> + case erlang:system_info(kernel_poll) of + true -> ok; + false -> rabbit_log:warning( + "Kernel poll (epoll, kqueue, etc) is disabled. Throughput " + "and CPU utilization may worsen.~n") + end, + AsyncThreads = erlang:system_info(thread_pool_size), + case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of + true -> rabbit_log:warning( + "Erlang VM is running with ~b I/O threads, " + "file I/O performance may worsen~n", [AsyncThreads]); + false -> ok + end, + IDCOpts = case application:get_env(kernel, inet_default_connect_options) of + undefined -> []; + {ok, Val} -> Val + end, + case proplists:get_value(nodelay, IDCOpts, false) of + false -> rabbit_log:warning("Nagle's algorithm is enabled for sockets, " + "network I/O latency will be higher~n"); + true -> ok + end. home_dir() -> case init:get_argument(home) of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e45e026e..692179fc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -18,7 +18,7 @@ -export([recover/0, stop/0, start/1, declare/5, declare/6, delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). --export([pseudo_queue/2]). +-export([pseudo_queue/2, immutable/1]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -29,8 +29,8 @@ -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). --export([on_node_down/1]). --export([update/2, store_queue/1, policy_changed/2]). +-export([on_node_up/1, on_node_down/1]). +-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). @@ -174,9 +174,12 @@ (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). +-spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). 
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(update_decorators/1 :: (name()) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). @@ -254,15 +257,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> %% effect) this might not be possible to satisfy. declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), - Q = rabbit_policy:set(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - gm_pids = []}), + Q = rabbit_queue_decorator:set( + rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + down_slave_nodes = [], + gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). @@ -308,12 +313,24 @@ store_queue(Q = #amqqueue{durable = true}) -> ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = [], sync_slave_pids = [], - gm_pids = []}, write), - ok = mnesia:write(rabbit_queue, Q, write), - ok; + gm_pids = [], + decorators = undefined}, write), + store_queue_ram(Q); store_queue(Q = #amqqueue{durable = false}) -> - ok = mnesia:write(rabbit_queue, Q, write), - ok. + store_queue_ram(Q). + +store_queue_ram(Q) -> + ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write). + +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_queue, Name}) of + [Q] -> store_queue_ram(Q), + ok; + [] -> ok + end + end). policy_changed(Q1 = #amqqueue{decorators = Decorators1}, Q2 = #amqqueue{decorators = Decorators2}) -> @@ -444,7 +461,8 @@ declare_args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, - {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. + {<<"x-max-length">>, fun check_non_neg_int_arg/2}, + {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -650,15 +668,23 @@ forget_all_durable(Node) -> fun () -> Qs = mnesia:match_object(rabbit_durable_queue, #amqqueue{_ = '_'}, write), - [rabbit_binding:process_deletions( - internal_delete1(Name, true)) || - #amqqueue{name = Name, pid = Pid} = Q <- Qs, - node(Pid) =:= Node, - rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined], + [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs, + node(Pid) =:= Node], ok end), ok. +forget_node_for_queue(#amqqueue{name = Name, + down_slave_nodes = []}) -> + %% No slaves to recover from, queue is gone + rabbit_binding:process_deletions(internal_delete1(Name, true)); + +forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) -> + %% Promote a slave while down - it'll happily recover as a master + Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H), + down_slave_nodes = T}, + ok = mnesia:write(rabbit_durable_queue, Q1, write). + run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). @@ -674,6 +700,20 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). 
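%% Illustrative sketch, not part of the patch: the x-max-length-bytes
%% entry added to declare_args/0 above means a queue can now be bounded
%% by total body size as well as by message count. Assuming the Erlang
%% AMQP client (amqp_channel, #'queue.declare'{}); queue name and limits
%% are arbitrary examples:
%%
%% Declare = #'queue.declare'{
%%              queue     = <<"bounded">>,
%%              durable   = true,
%%              arguments = [{<<"x-max-length">>,       long, 1000},
%%                           {<<"x-max-length-bytes">>, long, 1048576}]},
%% #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare).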
cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). +on_node_up(Node) -> + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_queue, + #amqqueue{_ = '_'}, write), + [case lists:member(Node, DSNs) of + true -> DSNs1 = DSNs -- [Node], + store_queue( + Q#amqqueue{down_slave_nodes = DSNs1}); + false -> ok + end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs], + ok + end). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = @@ -709,6 +749,14 @@ pseudo_queue(QueueName, Pid) -> pid = Pid, slave_pids = []}. +immutable(Q) -> Q#amqqueue{pid = none, + slave_pids = none, + sync_slave_pids = none, + down_slave_nodes = none, + gm_pids = none, + policy = none, + decorators = none}. + deliver([], _Delivery, _Flow) -> %% /dev/null optimisation []; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9b785303..db297c1d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,6 +52,7 @@ dlx, dlx_routing_key, max_length, + max_bytes, args_policy_version, status }). @@ -84,6 +85,7 @@ memory, slave_pids, synchronised_slave_pids, + down_slave_nodes, backing_queue_status, state ]). @@ -102,7 +104,8 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). -info_keys() -> ?INFO_KEYS. +info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys(). +statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- @@ -263,7 +266,8 @@ process_args_policy(State = #q{q = Q, {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2}, {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, - {<<"max-length">>, fun res_min/2, fun init_max_length/2}], + {<<"max-length">>, fun res_min/2, fun init_max_length/2}, + {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}], drop_expired_msgs( lists:foldl(fun({Name, Resolve, Fun}, StateN) -> Fun(args_policy_lookup(Name, Resolve, Q), StateN) @@ -302,6 +306,10 @@ init_max_length(MaxLen, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}), State1. +init_max_bytes(MaxBytes, State) -> + {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), + State1. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS, consumers = Consumers} = lists:foldl(fun (F, S) -> F(S) end, State, @@ -385,15 +393,13 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined, V when V > 0 -> V + 999; %% always fire later _ -> 0 end) div 1000, - TRef = erlang:send_after(After, self(), {drop_expired, Version}), + TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}), State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ttl_timer_expiry = TExpiry}) when Expiry + 1000 < TExpiry -> - case erlang:cancel_timer(TRef) of - false -> State; - _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) - end; + rabbit_misc:cancel_timer(TRef), + ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}); ensure_ttl_timer(_Expiry, State) -> State. @@ -543,34 +549,41 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% remains unchanged, or if the newly published message %% has no expiry and becomes the head of the queue then %% the call is unnecessary. 
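%% Note (not part of the patch): maybe_drop_head/1 below now drops one
%% message at a time (it has to, once a byte limit is in play) and
%% reports only whether anything was dropped rather than how much, which
%% is why the first element matched below changes from 'Dropped > 0' to
%% plain 'Dropped'.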
- case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of + case {Dropped, QLen =:= 1, Props#message_properties.expiry} of {false, false, _} -> State4; {true, true, undefined} -> State4; {_, _, _} -> drop_expired_msgs(State4) end end. -maybe_drop_head(State = #q{max_length = undefined}) -> - {0, State}; -maybe_drop_head(State = #q{max_length = MaxLen, - backing_queue = BQ, - backing_queue_state = BQS}) -> - case BQ:len(BQS) - MaxLen of - Excess when Excess > 0 -> - {Excess, - with_dlx( - State#q.dlx, - fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end, - fun () -> - {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) -> - BQ:drop(false, BQS0) - end, {ok, BQS}, - lists:seq(1, Excess)), - State#q{backing_queue_state = BQS1} - end)}; - _ -> {0, State} +maybe_drop_head(State = #q{max_length = undefined, + max_bytes = undefined}) -> + {false, State}; +maybe_drop_head(State) -> + maybe_drop_head(false, State). + +maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + case over_max_length(State) of + true -> + maybe_drop_head(true, + with_dlx( + State#q.dlx, + fun (X) -> dead_letter_maxlen_msg(X, State) end, + fun () -> + {_, BQS1} = BQ:drop(false, BQS), + State#q{backing_queue_state = BQS1} + end)); + false -> + {AlreadyDropped, State} end. +over_max_length(#q{max_length = MaxLen, + max_bytes = MaxBytes, + backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ:len(BQS) > MaxLen orelse BQ:info(message_bytes_ready, BQS) > MaxBytes. + requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> WasEmpty = BQ:is_empty(BQS), @@ -664,9 +677,12 @@ subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) -> run_message_queue(true, Fun(State1)) end. -message_properties(Message, Confirm, #q{ttl = TTL}) -> +message_properties(Message = #basic_message{content = Content}, + Confirm, #q{ttl = TTL}) -> + #content{payload_fragments_rev = PFR} = Content, #message_properties{expiry = calculate_msg_expiry(Message, TTL), - needs_confirming = Confirm == eventually}. + needs_confirming = Confirm == eventually, + size = iolist_size(PFR)}. calculate_msg_expiry(#basic_message{content = Content}, TTL) -> #content{properties = Props} = @@ -723,15 +739,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. -dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> +dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) -> {ok, State1} = dead_letter_msgs( fun (DLFun, Acc, BQS) -> - lists:foldl(fun (_, {ok, Acc0, BQS0}) -> - {{Msg, _, AckTag}, BQS1} = - BQ:fetch(true, BQS0), - {ok, DLFun(Msg, AckTag, Acc0), BQS1} - end, {ok, Acc, BQS}, lists:seq(1, Excess)) + {{Msg, _, AckTag}, BQS1} = BQ:fetch(true, BQS), + {ok, DLFun(Msg, AckTag, Acc), BQS1} end, maxlen, X, State), State1. @@ -810,19 +823,25 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; +i(down_slave_nodes, #q{q = #amqqueue{name = Name, + durable = Durable}}) -> + {ok, Q = #amqqueue{down_slave_nodes = Nodes}} = + rabbit_amqqueue:lookup(Name), + case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> Nodes + end; i(state, #q{status = running}) -> credit_flow:state(); i(state, #q{status = State}) -> State; -i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> - BQ:status(BQS); -i(Item, _) -> - throw({bad_argument, Item}). 
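%% Illustrative sketch, not part of the patch: the catch-all i/2 clause
%% just below hands any item it does not recognise to the backing
%% queue's new info/2 callback, which is what lets 'rabbitmqctl
%% list_queues' report the new message_bytes* items. The byte accounting
%% itself starts in message_properties/3 above, e.g.
%%
%% size_example() ->
%%     PFR = [<<", world">>, <<"hello">>],  %% payload_fragments_rev
%%     12  = iolist_size(PFR),              %% -> #message_properties.size
%%     %% an unset limit can never force a drop: in Erlang's term order
%%     %% every number is smaller than the atom 'undefined'
%%     false = 12 > undefined.
%%
%% The same limits may also arrive via policy ("max-length" /
%% "max-length-bytes"), resolved with res_min/2 so the smaller of policy
%% value and x-argument applies.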
+i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:info(Item, BQS). emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> ExtraKs = [K || {K, _} <- Extra], - Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State), + Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State), not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). @@ -922,7 +941,7 @@ handle_call({init, Recover}, From, end; handle_call(info, _From, State) -> - reply(infos(?INFO_KEYS, State), State); + reply(infos(info_keys(), State), State); handle_call({info, Items}, _From, State) -> try @@ -1165,7 +1184,7 @@ handle_cast({force_event_refresh, Ref}, emit_consumer_created( Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) end, - noreply(State); + noreply(rabbit_event:init_stats_timer(State, #q.stats_timer)); handle_cast(notify_decorators, State) -> notify_decorators(State), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 8f37bf60..098f5f43 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -16,6 +16,14 @@ -module(rabbit_backing_queue). +-export([info_keys/0]). + +-define(INFO_KEYS, [messages_ram, messages_ready_ram, + messages_unacknowledged_ram, messages_persistent, + message_bytes, message_bytes_ready, + message_bytes_unacknowledged, message_bytes_ram, + message_bytes_persistent, backing_queue_status]). + -ifdef(use_specs). %% We can't specify a per-queue ack/state with callback signatures @@ -37,6 +45,8 @@ -type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)). -type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). + %% Called on startup with a list of durable queue names. The queues %% aren't being started at this point, but this call allows the %% backing queue to perform any checking necessary for the consistency @@ -216,9 +226,7 @@ %% inbound messages and outbound messages at the moment. -callback msg_rates(state()) -> {float(), float()}. -%% Exists for debugging purposes, to be able to expose state via -%% rabbitmqctl list_queues backing_queue_status --callback status(state()) -> [{atom(), any()}]. +-callback info(atom(), state()) -> any(). %% Passed a function to be invoked with the relevant backing queue's %% state. Useful for when the backing queue or other components need @@ -243,9 +251,11 @@ behaviour_info(callbacks) -> {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {status, 1}, - {invoke, 3}, {is_duplicate, 2}] ; + {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {info_keys, 0}, + {infos, 2}, {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. -endif. + +info_keys() -> ?INFO_KEYS. diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 49b71122..622b1b16 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -116,7 +116,8 @@ qc_publish(#state{bqstate = BQ}) -> [qc_message(), #message_properties{needs_confirming = frequency([{1, true}, {20, false}]), - expiry = oneof([undefined | lists:seq(1, 10)])}, + expiry = oneof([undefined | lists:seq(1, 10)]), + size = 10}, false, self(), BQ]}. 
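%% Illustrative sketch, not part of the patch: the rabbit_backing_queue
%% behaviour above replaces status/1 with the more general info/2, keyed
%% by the atoms in ?INFO_KEYS. An implementation clause set might look
%% like this (the #state{} fields and helper functions are hypothetical):
%%
%% info(message_bytes_ready, #state{bytes_ready = Bytes}) ->
%%     Bytes;
%% info(backing_queue_status, State) ->
%%     [{len, len(State)} | other_status_items(State)];
%% info(Item, _State) ->
%%     throw({bad_argument, Item}).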
qc_publish_multiple(#state{}) -> @@ -124,7 +125,7 @@ qc_publish_multiple(#state{}) -> qc_publish_delivered(#state{bqstate = BQ}) -> {call, ?BQMOD, publish_delivered, - [qc_message(), #message_properties{}, self(), BQ]}. + [qc_message(), #message_properties{size = 10}, self(), BQ]}. qc_fetch(#state{bqstate = BQ}) -> {call, ?BQMOD, fetch, [boolean(), BQ]}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 043ec7e3..59453385 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,8 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]). +-export([send_command/2, deliver/4, deliver_reply/2, + send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/1]). @@ -30,7 +31,7 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Internal --export([list_local/0]). +-export([list_local/0, deliver_reply_local/3]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, conn_name, limiter, tx, next_tag, unacked_message_q, user, @@ -39,7 +40,7 @@ queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, mandatory, capabilities, trace_state, - consumer_prefetch}). + consumer_prefetch, reply_consumer}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -96,6 +97,9 @@ -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). +-spec(deliver_reply/2 :: (binary(), rabbit_types:delivery()) -> 'ok'). +-spec(deliver_reply_local/3 :: + (pid(), binary(), rabbit_types:delivery()) -> 'ok'). -spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'). @@ -142,6 +146,42 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). +deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) -> + {ok, Pid, Key} = decode_fast_reply_to(Rest), + delegate:invoke_no_result( + Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}). + +%% We want to ensure people can't use this mechanism to send a message +%% to an arbitrary process and kill it! +deliver_reply_local(Pid, Key, Delivery) -> + case pg_local:in_group(rabbit_channels, Pid) of + true -> gen_server2:cast(Pid, {deliver_reply, Key, Delivery}); + false -> ok + end. + +declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) -> + exists; +declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) -> + case decode_fast_reply_to(Rest) of + {ok, Pid, Key} -> + Msg = {declare_fast_reply_to, Key}, + rabbit_misc:with_exit_handler( + rabbit_misc:const(not_found), + fun() -> gen_server2:call(Pid, Msg, infinity) end); + error -> + not_found + end; +declare_fast_reply_to(_) -> + not_found. + +%% It's inelegant to depend on the encoded lengths here, but +%% binary:split/2 does not exist in R13B03. +decode_fast_reply_to(<<PidEnc:40/binary, ".", Key:24/binary>>) -> + Pid = binary_to_term(base64:decode(PidEnc)), + {ok, Pid, Key}; +decode_fast_reply_to(_) -> + error. + send_credit_reply(Pid, Len) -> gen_server2:cast(Pid, {send_credit_reply, Len}). 
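%% Illustrative sketch, not part of the patch: the suffix that
%% decode_fast_reply_to/1 above takes apart is built when a channel sets
%% up its reply consumer (see the basic.consume clause later in this
%% file): a base64-encoded channel pid and a base64 secret key joined by
%% ".", appended to "amq.rabbitmq.reply-to.". Roughly:
%%
%% make_suffix() ->
%%     PidEnc = base64:encode(term_to_binary(self())),
%%     Key    = base64:encode(rabbit_guid:gen_secure()),
%%     <<PidEnc/binary, ".", Key/binary>>.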
@@ -219,7 +259,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), - consumer_prefetch = 0}, + consumer_prefetch = 0, + reply_consumer = none}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -262,6 +303,13 @@ handle_call({info, Items}, _From, State) -> handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)}); +handle_call({declare_fast_reply_to, Key}, _From, + State = #ch{reply_consumer = Consumer}) -> + reply(case Consumer of + {_, _, Key} -> exists; + _ -> not_found + end, State); + handle_call(_Request, _From, State) -> noreply(State). @@ -326,6 +374,29 @@ handle_cast({deliver, ConsumerTag, AckRequired, Content), noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); +handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) -> + noreply(State); +handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) -> + noreply(State); +handle_cast({deliver_reply, Key, #delivery{message = + #basic_message{exchange_name = ExchangeName, + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag, + reply_consumer = {ConsumerTag, _Suffix, Key}}) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = false, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content), + noreply(State); +handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) -> + noreply(State); + handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.credit_ok'{available = Len}), @@ -341,7 +412,7 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> handle_cast({force_event_refresh, Ref}, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State), Ref), - noreply(State); + noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer)); handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> %% NB: don't call noreply/1 since we don't want to send confirms. @@ -433,17 +504,22 @@ send(_Command, #ch{state = closing}) -> send(Command, #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Command). 
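%% Illustrative sketch, not part of the patch: seen from a client, the
%% deliver_reply machinery above behaves like a per-channel pseudo
%% queue. Assuming the Erlang AMQP client (amqp_channel, #amqp_msg{});
%% queue names and payloads are arbitrary examples:
%%
%% %% requester: consume from the pseudo queue (must be no-ack) ...
%% #'basic.consume_ok'{} =
%%     amqp_channel:subscribe(ReqCh,
%%                            #'basic.consume'{queue  = <<"amq.rabbitmq.reply-to">>,
%%                                             no_ack = true},
%%                            self()),
%% %% ... then publish with reply_to set to the same name; the channel
%% %% rewrites it to the suffixed form before routing.
%% amqp_channel:cast(ReqCh, #'basic.publish'{routing_key = <<"rpc">>},
%%                   #amqp_msg{props   = #'P_basic'{reply_to =
%%                                 <<"amq.rabbitmq.reply-to">>},
%%                             payload = <<"request">>}),
%% %% responder: publish the reply to the default exchange using the
%% %% received reply_to as the routing key.
%% amqp_channel:cast(RespCh, #'basic.publish'{routing_key = ReplyTo},
%%                   #amqp_msg{payload = <<"reply">>}).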
-handle_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid, - conn_pid = ConnPid}) -> +handle_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid, + conn_name = ConnName, + virtual_host = VHost, + user = User}) -> %% something bad's happened: notify_queues may not be 'ok' {_Result, State1} = notify_queues(State), case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of {Channel, CloseMethod} -> - rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n", - [ConnPid, Channel, Reason]), + rabbit_log:error("Channel error on connection ~p (~s, vhost: '~s'," + " user: '~s'), channel ~p:~n~p~n", + [ConnPid, ConnName, VHost, User#user.username, + Channel, Reason]), ok = rabbit_writer:send_command(WriterPid, CloseMethod), {noreply, State1}; {0, _} -> @@ -604,6 +680,21 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. +maybe_set_fast_reply_to( + C = #content{properties = P = #'P_basic'{reply_to = + <<"amq.rabbitmq.reply-to">>}}, + #ch{reply_consumer = ReplyConsumer}) -> + case ReplyConsumer of + none -> rabbit_misc:protocol_error( + precondition_failed, + "fast reply consumer does not exist", []); + {_, Suf, _K} -> Rep = <<"amq.rabbitmq.reply-to.", Suf/binary>>, + rabbit_binary_generator:clear_encoded_content( + C#content{properties = P#'P_basic'{reply_to = Rep}}) + end; +maybe_set_fast_reply_to(C, _State) -> + C. + record_confirms([], State) -> State; record_confirms(MXs, State = #ch{confirmed = C}) -> @@ -668,8 +759,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, mandatory = Mandatory}, Content, State = #ch{virtual_host = VHostPath, tx = Tx, + channel = ChannelNum, confirm_enabled = ConfirmEnabled, - trace_state = TraceState}) -> + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName}) -> check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), @@ -678,7 +772,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. 
DecodedContent = #content {properties = Props} = - rabbit_binary_parser:ensure_content_decoded(Content), + maybe_set_fast_reply_to( + rabbit_binary_parser:ensure_content_decoded(Content), State), check_user_id_header(Props, State), check_expiration_header(Props), DoConfirm = Tx =/= none orelse ConfirmEnabled, @@ -690,7 +785,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> - rabbit_trace:tap_in(Message, TraceState), + rabbit_trace:tap_in(Message, ConnName, ChannelNum, + Username, TraceState), Delivery = rabbit_basic:delivery( Mandatory, DoConfirm, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), @@ -752,6 +848,55 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, {reply, #'basic.get_empty'{}, State} end; +handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, + consumer_tag = CTag0, + no_ack = NoAck, + nowait = NoWait}, + _, State = #ch{reply_consumer = ReplyConsumer, + consumer_mapping = ConsumerMapping}) -> + case dict:find(CTag0, ConsumerMapping) of + error -> + case {ReplyConsumer, NoAck} of + {none, true} -> + CTag = case CTag0 of + <<>> -> rabbit_guid:binary( + rabbit_guid:gen_secure(), "amq.ctag"); + Other -> Other + end, + %% Precalculate both suffix and key; base64 encoding is + %% expensive + Key = base64:encode(rabbit_guid:gen_secure()), + PidEnc = base64:encode(term_to_binary(self())), + Suffix = <<PidEnc/binary, ".", Key/binary>>, + State1 = State#ch{reply_consumer = {CTag, Suffix, Key}}, + case NoWait of + true -> {noreply, State1}; + false -> Rep = #'basic.consume_ok'{consumer_tag = CTag}, + {reply, Rep, State1} + end; + {_, false} -> + rabbit_misc:protocol_error( + precondition_failed, + "reply consumer cannot acknowledge", []); + _ -> + rabbit_misc:protocol_error( + precondition_failed, "reply consumer already set", []) + end; + {ok, _} -> + %% Attempted reuse of consumer tag. + rabbit_misc:protocol_error( + not_allowed, "attempt to reuse consumer tag '~s'", [CTag0]) + end; + +handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, + _, State = #ch{reply_consumer = {ConsumerTag, _, _}}) -> + State1 = State#ch{reply_consumer = none}, + case NoWait of + true -> {noreply, State1}; + false -> Rep = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, + {reply, Rep, State1} + end; + handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ConsumerTag, no_local = _, % FIXME: implement @@ -966,6 +1111,18 @@ handle_method(#'exchange.unbind'{destination = DestinationNameBin, SourceNameBin, exchange, DestinationNameBin, RoutingKey, Arguments, #'exchange.unbind_ok'{}, NoWait, State); +%% Note that all declares to these are effectively passive. If it +%% exists it by definition has one consumer. 
+handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", + _/binary>> = QueueNameBin, + nowait = NoWait}, _, + State = #ch{virtual_host = VHost}) -> + QueueName = rabbit_misc:r(VHost, queue, QueueNameBin), + case declare_fast_reply_to(QueueNameBin) of + exists -> return_queue_declare_ok(QueueName, NoWait, 0, 1, State); + not_found -> rabbit_misc:not_found(QueueName) + end; + handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = DurableDeclare, @@ -992,7 +1149,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, QueueName, fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( Q, Durable, AutoDelete, Args, Owner), - rabbit_amqqueue:stat(Q) + maybe_stat(NoWait, Q) end) of {ok, MessageCount, ConsumerCount} -> return_queue_declare_ok(QueueName, NoWait, MessageCount, @@ -1048,7 +1205,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( - QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end), ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); @@ -1204,6 +1361,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, E end. +maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q); +maybe_stat(true, _Q) -> {ok, 0, 0}. + consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, @@ -1365,7 +1525,10 @@ record_sent(ConsumerTag, AckRequired, Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, - trace_state = TraceState}) -> + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName, + channel = ChannelNum}) -> ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of {none, true} -> get; {none, false} -> get_no_ack; @@ -1376,7 +1539,7 @@ record_sent(ConsumerTag, AckRequired, true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State); false -> ok end, - rabbit_trace:tap_out(Msg, TraceState), + rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, UAMQ); diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 81c17fbf..db9349ac 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -33,7 +33,7 @@ -callback description() -> [proplists:property()]. -callback intercept(original_method(), rabbit_types:vhost()) -> - rabbit_types:ok_or_error2(processed_method(), any()). + processed_method() | rabbit_misc:channel_or_connection_exit(). %% Whether the interceptor wishes to intercept the amqp method -callback applies_to(intercept_method()) -> boolean(). 
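%% Illustrative sketch, not part of the patch: under the revised
%% contract an interceptor returns the (possibly rewritten) method
%% directly and signals failure by raising a channel error rather than
%% returning {error, Reason}. For example (the reserved-prefix rule is
%% made up; other callbacks omitted):
%%
%% intercept(#'queue.declare'{queue = <<"reserved.", _/binary>>}, _VHost) ->
%%     rabbit_misc:protocol_error(precondition_failed,
%%                                "queue name prefix is reserved", []);
%% intercept(Method, _VHost) ->
%%     Method.
%%
%% applies_to('queue.declare') -> true;
%% applies_to(_)               -> false.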
@@ -62,20 +62,15 @@ intercept_method(M, VHost) -> intercept_method(M, _VHost, []) -> M; intercept_method(M, VHost, [I]) -> - case I:intercept(M, VHost) of - {ok, M2} -> - case validate_method(M, M2) of - true -> - M2; - _ -> - internal_error("Interceptor: ~p expected " - "to return method: ~p but returned: ~p", - [I, rabbit_misc:method_record_type(M), - rabbit_misc:method_record_type(M2)]) - end; - {error, Reason} -> - internal_error("Interceptor: ~p failed with reason: ~p", - [I, Reason]) + M2 = I:intercept(M, VHost), + case validate_method(M, M2) of + true -> + M2; + _ -> + internal_error("Interceptor: ~p expected " + "to return method: ~p but returned: ~p", + [I, rabbit_misc:method_record_type(M), + rabbit_misc:method_record_type(M2)]) end; intercept_method(M, _VHost, Is) -> internal_error("More than one interceptor for method: ~p -- ~p", diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 451f4d70..70fe10e6 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -54,6 +54,7 @@ change_cluster_node_type, update_cluster_nodes, {forget_cluster_node, [?OFFLINE_DEF]}, + force_boot, cluster_status, {sync_queue, [?VHOST_DEF]}, {cancel_sync_queue, [?VHOST_DEF]}, @@ -114,6 +115,12 @@ {"Policies", rabbit_policy, list_formatted, info_keys}, {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]). +-define(COMMANDS_NOT_REQUIRING_APP, + [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs, + join_cluster, change_cluster_node_type, update_cluster_nodes, + forget_cluster_node, cluster_status, status, environment, eval, + force_boot]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -131,6 +138,7 @@ %%---------------------------------------------------------------------------- start() -> + start_distribution(), {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {Command, Opts, Args} = case parse_arguments(init:get_plain_arguments(), NodeStr) of @@ -154,7 +162,7 @@ start() -> %% The reason we don't use a try/catch here is that rpc:call turns %% thrown errors into normal return values - case catch action(Command, Node, Args, Opts, Inform) of + case catch do_action(Command, Node, Args, Opts, Inform) of ok -> case Quiet of true -> ok; @@ -249,6 +257,15 @@ parse_arguments(CmdLine, NodeStr) -> %%---------------------------------------------------------------------------- +do_action(Command, Node, Args, Opts, Inform) -> + case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of + false -> case ensure_app_running(Node) of + ok -> action(Command, Node, Args, Opts, Inform); + E -> E + end; + true -> action(Command, Node, Args, Opts, Inform) + end. 
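%% Note (not part of the patch): force_boot appears in
%% ?COMMANDS_NOT_REQUIRING_APP above because it is only useful while the
%% node is down; the action clause below records the override via
%% rabbit_mnesia:force_load_next_boot/0 so that the next start does not
%% wait for the rest of the cluster. Typical use (node name is an
%% arbitrary example):
%%
%%   rabbitmqctl -n rabbit@broker1 force_boot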
+ action(stop, Node, Args, _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), Res = call(Node, {rabbit, stop_and_halt, []}), @@ -302,8 +319,19 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> ClusterNode = list_to_atom(ClusterNodeS), RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts), Inform("Removing node ~p from cluster", [ClusterNode]), - rpc_call(Node, rabbit_mnesia, forget_cluster_node, - [ClusterNode, RemoveWhenOffline]); + case RemoveWhenOffline of + true -> become(Node), + rabbit_mnesia:forget_cluster_node(ClusterNode, true); + false -> rpc_call(Node, rabbit_mnesia, forget_cluster_node, + [ClusterNode, false]) + end; + +action(force_boot, Node, [], _Opts, Inform) -> + Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]), + case rabbit:is_running(Node) of + false -> rabbit_mnesia:force_load_next_boot(); + true -> {error, rabbit_running} + end; action(sync_queue, Node, [Q], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), @@ -650,6 +678,22 @@ exit_loop(Port) -> {Port, _} -> exit_loop(Port) end. +start_distribution() -> + CtlNodeName = rabbit_misc:format("rabbitmqctl-~s", [os:getpid()]), + {ok, _} = net_kernel:start([list_to_atom(CtlNodeName), shortnames]). + +become(BecomeNode) -> + case net_adm:ping(BecomeNode) of + pong -> exit({node_running, BecomeNode}); + pang -> io:format(" * Impersonating node: ~s...", [BecomeNode]), + error_logger:tty(false), + ok = net_kernel:stop(), + {ok, _} = net_kernel:start([BecomeNode, shortnames]), + io:format(" done~n", []), + Dir = mnesia:system_info(directory), + io:format(" * Mnesia directory : ~s~n", [Dir]) + end. + %%---------------------------------------------------------------------------- default_if_empty(List, Default) when is_list(List) -> @@ -715,6 +759,17 @@ unsafe_rpc(Node, Mod, Fun, Args) -> Normal -> Normal end. +ensure_app_running(Node) -> + case call(Node, {rabbit, is_running, []}) of + true -> ok; + false -> {error_string, + rabbit_misc:format( + "rabbit application is not running on node ~s.~n" + " * Suggestion: start it with \"rabbitmqctl start_app\" " + "and try again", [Node])}; + Other -> Other + end. + call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index ec32e687..728bc431 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) -> {longstr, <<"rejected">>} =/= rabbit_misc:table_lookup(D, <<"reason">>); (_) -> + %% There was something we didn't expect, therefore + %% a client must have put it there, therefore the + %% cycle was not "fully automatic". false end, Cycle ++ [H]) end. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index b867223b..a33103fd 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -23,6 +23,7 @@ ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]). -export([stats_level/2, if_enabled/3]). -export([notify/2, notify/3, notify_if/3]). +-export([sync_notify/2, sync_notify/3]). %%---------------------------------------------------------------------------- @@ -61,6 +62,9 @@ -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). -spec(notify/3 :: (event_type(), event_props(), reference() | 'none') -> 'ok'). -spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). +-spec(sync_notify/2 :: (event_type(), event_props()) -> 'ok'). 
+-spec(sync_notify/3 :: (event_type(), event_props(), + reference() | 'none') -> 'ok'). -endif. @@ -145,7 +149,16 @@ notify_if(false, _Type, _Props) -> ok. notify(Type, Props) -> notify(Type, Props, none). notify(Type, Props, Ref) -> - gen_event:notify(?MODULE, #event{type = Type, - props = Props, - reference = Ref, - timestamp = os:timestamp()}). + gen_event:notify(?MODULE, event_cons(Type, Props, Ref)). + +sync_notify(Type, Props) -> sync_notify(Type, Props, none). + +sync_notify(Type, Props, Ref) -> + gen_event:sync_notify(?MODULE, event_cons(Type, Props, Ref)). + +event_cons(Type, Props, Ref) -> + #event{type = Type, + props = Props, + reference = Ref, + timestamp = os:timestamp()}. + diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 685c311f..f184174c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -20,7 +20,8 @@ -export([recover/0, policy_changed/2, callback/4, declare/6, assert_equivalence/6, assert_args_equivalence/2, check_type/1, - lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3, + lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2, + update_scratch/3, update_decorators/1, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, route/2, delete/2, validate_binding/2]). %% these must be run inside a mnesia tx @@ -61,6 +62,7 @@ -spec(lookup_or_die/1 :: (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit()). +-spec(list/0 :: () -> [rabbit_types:exchange()]). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). -spec(lookup_scratch/2 :: (name(), atom()) -> rabbit_types:ok(term()) | @@ -70,6 +72,8 @@ (name(), fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> not_found | rabbit_types:exchange()). +-spec(update_decorators/1 :: (name()) -> 'ok'). +-spec(immutable/1 :: (rabbit_types:exchange()) -> rabbit_types:exchange()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -106,24 +110,15 @@ recover() -> mnesia:read({rabbit_exchange, XName}) =:= [] end, fun (X, Tx) -> - case Tx of - true -> store(X); - false -> ok - end, - callback(X, create, map_create_tx(Tx), [X]) + X1 = case Tx of + true -> store_ram(X); + false -> rabbit_exchange_decorator:set(X) + end, + callback(X1, create, map_create_tx(Tx), [X1]) end, rabbit_durable_exchange), - report_missing_decorators(Xs), [XName || #exchange{name = XName} <- Xs]. -report_missing_decorators(Xs) -> - Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) || - #exchange{decorators = D} <- Xs])), - case [M || M <- Mods, code:which(M) =:= non_existing] of - [] -> ok; - M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M]) - end. - callback(X = #exchange{type = XType, decorators = Decorators}, Fun, Serial0, Args) -> Serial = if is_function(Serial0) -> Serial0; @@ -158,12 +153,13 @@ serial(#exchange{name = XName} = X) -> end. 
declare(XName, Type, Durable, AutoDelete, Internal, Args) -> - X = rabbit_policy:set(#exchange{name = XName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - internal = Internal, - arguments = Args}), + X = rabbit_exchange_decorator:set( + rabbit_policy:set(#exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Args})), XT = type_to_module(Type), %% We want to upset things if it isn't ok ok = XT:validate(X), @@ -171,13 +167,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> fun () -> case mnesia:wread({rabbit_exchange, XName}) of [] -> - store(X), - ok = case Durable of - true -> mnesia:write(rabbit_durable_exchange, - X, write); - false -> ok - end, - {new, X}; + {new, store(X)}; [ExistingX] -> {existing, ExistingX} end @@ -195,7 +185,19 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> map_create_tx(true) -> transaction; map_create_tx(false) -> none. -store(X) -> ok = mnesia:write(rabbit_exchange, X, write). + +store(X = #exchange{durable = true}) -> + mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined}, + write), + store_ram(X); +store(X = #exchange{durable = false}) -> + store_ram(X). + +store_ram(X) -> + X1 = rabbit_exchange_decorator:set(X), + ok = mnesia:write(rabbit_exchange, rabbit_exchange_decorator:set(X1), + write), + X1. %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> @@ -243,6 +245,8 @@ lookup_or_die(Name) -> {error, not_found} -> rabbit_misc:not_found(Name) end. +list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}). + %% Not dirty_match_object since that would not be transactional when used in a %% tx context list(VHostPath) -> @@ -287,20 +291,27 @@ update_scratch(Name, App, Fun) -> ok end). +update_decorators(Name) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_exchange, Name}) of + [X] -> store_ram(X), + ok; + [] -> ok + end + end). + update(Name, Fun) -> case mnesia:wread({rabbit_exchange, Name}) of - [X = #exchange{durable = Durable}] -> - X1 = Fun(X), - ok = mnesia:write(rabbit_exchange, X1, write), - case Durable of - true -> ok = mnesia:write(rabbit_durable_exchange, X1, write); - _ -> ok - end, - X1; - [] -> - not_found + [X] -> X1 = Fun(X), + store(X1); + [] -> not_found end. +immutable(X) -> X#exchange{scratches = none, + policy = none, + decorators = none}. + info_keys() -> ?INFO_KEYS. map(VHostPath, F) -> @@ -333,14 +344,21 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, decorators = Decorators} = X, #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> - case {RName, rabbit_exchange_decorator:select(route, Decorators)} of - {<<"">>, []} -> - %% Optimisation - [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; - {_, SelectedDecorators} -> - lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []})) + case RName of + <<>> -> + RKsSorted = lists:usort(RKs), + [rabbit_channel:deliver_reply(RK, Delivery) || + RK <- RKsSorted, virtual_reply_queue(RK)], + [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted, + not virtual_reply_queue(RK)]; + _ -> + Decs = rabbit_exchange_decorator:select(route, Decorators), + lists:usort(route1(Delivery, Decs, {[X], XName, []})) end. +virtual_reply_queue(<<"amq.rabbitmq.reply-to.", _/binary>>) -> true; +virtual_reply_queue(_) -> false. 
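%% Note (not part of the patch): with the route/2 change above, a
%% publish to the default exchange whose routing key names a virtual
%% reply queue never consults the queue table at all; it is handed
%% straight to the owning channel via rabbit_channel:deliver_reply/2:
%%
%% true  = virtual_reply_queue(<<"amq.rabbitmq.reply-to.SUFFIX">>),
%% false = virtual_reply_queue(<<"some.ordinary.queue">>).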
+ route1(_, _, {[], _, QNames}) -> QNames; route1(Delivery, Decorators, diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 2f056b1b..900f9c32 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([select/2, set/1]). +-export([select/2, set/1, register/2, unregister/1]). %% This is like an exchange type except that: %% @@ -104,3 +104,25 @@ list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. cons_if_eq(Select, Select, Item, List) -> [Item | List]; cons_if_eq(_Select, _Other, _Item, List) -> List. + +register(TypeName, ModuleName) -> + rabbit_registry:register(exchange_decorator, TypeName, ModuleName), + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + +unregister(TypeName) -> + rabbit_registry:unregister(exchange_decorator, TypeName), + [maybe_recover(X) || X <- rabbit_exchange:list()], + ok. + +maybe_recover(X = #exchange{name = Name, + decorators = Decs}) -> + #exchange{decorators = Decs1} = set(X), + Old = lists:sort(select(all, Decs)), + New = lists:sort(select(all, Decs1)), + case New of + Old -> ok; + _ -> %% TODO create a tx here for non-federation decorators + [M:create(none, X) || M <- New -- Old], + rabbit_exchange:update_decorators(Name) + end. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index b17b7de9..f32a187d 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -183,12 +183,13 @@ -record(lim, {prefetch_count = 0, ch_pid, + %% 'Notify' is a boolean that indicates whether a queue should be + %% notified of a change in the limit or volume that may allow it to + %% deliver more messages via the limiter's channel. queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). -%% 'Notify' is a boolean that indicates whether a queue should be -%% notified of a change in the limit or volume that may allow it to -%% deliver more messages via the limiter's channel. +%% mode is of type credit_mode() -record(credit, {credit = 0, mode}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index f4df0e76..e05ef05a 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -16,16 +16,8 @@ -module(rabbit_log). --behaviour(gen_server). - --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). - --define(SERVER, ?MODULE). +-export([with_local_io/1]). %%---------------------------------------------------------------------------- @@ -36,8 +28,6 @@ -type(category() :: atom()). -type(level() :: 'info' | 'warning' | 'error'). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). - -spec(log/3 :: (category(), level(), string()) -> 'ok'). -spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok'). @@ -48,16 +38,24 @@ -spec(error/1 :: (string()) -> 'ok'). -spec(error/2 :: (string(), [any()]) -> 'ok'). +-spec(with_local_io/1 :: (fun (() -> A)) -> A). + -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). log(Category, Level, Fmt) -> log(Category, Level, Fmt, []). log(Category, Level, Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}). 
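%% Illustrative sketch, not part of the patch: register/2 and
%% unregister/1 in rabbit_exchange_decorator above let a decorator
%% plugin attach and detach at runtime, re-decorating every existing
%% exchange via maybe_recover/1. A plugin would typically call (names
%% are hypothetical):
%%
%% rabbit_exchange_decorator:register(<<"my-decorator">>, my_decorator),
%% ...
%% rabbit_exchange_decorator:unregister(<<"my-decorator">>).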
+ case level(Level) =< catlevel(Category) of + false -> ok; + true -> F = case Level of + info -> fun error_logger:info_msg/2; + warning -> fun error_logger:warning_msg/2; + error -> fun error_logger:error_msg/2 + end, + with_local_io(fun () -> F(Fmt, Args) end) + end. info(Fmt) -> log(default, info, Fmt). info(Fmt, Args) -> log(default, info, Fmt, Args). @@ -66,41 +64,14 @@ warning(Fmt, Args) -> log(default, warning, Fmt, Args). error(Fmt) -> log(default, error, Fmt). error(Fmt, Args) -> log(default, error, Fmt, Args). -%%-------------------------------------------------------------------- - -init([]) -> - {ok, CatLevelList} = application:get_env(log_levels), - CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList], - {ok, orddict:from_list(CatLevels)}. - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast({log, Category, Level, Fmt, Args}, CatLevels) -> - CatLevel = case orddict:find(Category, CatLevels) of - {ok, L} -> L; - error -> level(info) - end, - case level(Level) =< CatLevel of - false -> ok; - true -> (case Level of - info -> fun error_logger:info_msg/2; - warning -> fun error_logger:warning_msg/2; - error -> fun error_logger:error_msg/2 - end)(Fmt, Args) - end, - {noreply, CatLevels}; -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +catlevel(Category) -> + %% We can get here as part of rabbitmqctl when it is impersonating + %% a node; in which case the env will not be defined. + CatLevelList = case application:get_env(rabbit, log_levels) of + {ok, L} -> L; + undefined -> [] + end, + level(proplists:get_value(Category, CatLevelList, info)). %%-------------------------------------------------------------------- @@ -108,3 +79,19 @@ level(info) -> 3; level(warning) -> 2; level(error) -> 1; level(none) -> 0. + +%% Execute Fun using the IO system of the local node (i.e. the node on +%% which the code is executing). Since this is invoked for every log +%% message, we try to avoid unnecessarily churning group_leader/1. +with_local_io(Fun) -> + GL = group_leader(), + Node = node(), + case node(GL) of + Node -> Fun(); + _ -> group_leader(whereis(user), self()), + try + Fun() + after + group_leader(GL, self()) + end + end. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 2b16b911..9bccf5dd 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, - msg_rates/1, status/1, invoke/3, is_duplicate/2]). + msg_rates/1, info/2, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason, State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}; terminate(Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { name = QName, + backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but %% shouldn't be deleted. Most likely safe shutdown of this - %% node. Thus just let some other slave take over. + %% node. 
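%% Note (not part of the patch): with the addition below, a master that
%% shuts down cleanly while it has no synchronised slave now stops all
%% its slaves too, unless the queue's ha-promote-on-shutdown policy is
%% "always"; this avoids failing over to an unsynchronised slave at the
%% cost of making the queue unavailable until the master returns.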
+ {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(QName), + case SSPids =:= [] andalso + rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of + true -> %% Remove the whole queue to avoid data loss + rabbit_mirror_queue_misc:log_warning( + QName, "Stopping all nodes on master shutdown since no " + "synchronised slave is available~n", []), + stop_all_slaves(Reason, State); + false -> %% Just let some other slave take over. + ok + end, State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. delete_and_terminate(Reason, State = #state { backing_queue = BQ, @@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, State), State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. -stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> +stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), @@ -360,10 +374,13 @@ resume(State = #state { backing_queue = BQ, msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:msg_rates(BQS). -status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQ:status(BQS) ++ +info(backing_queue_status, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:info(backing_queue_status, BQS) ++ [ {mirror_seen, dict:size(State #state.seen_status)}, - {mirror_senders, sets:size(State #state.known_senders)} ]. + {mirror_senders, sets:size(State #state.known_senders)} ]; +info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:info(Item, BQS). invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b0f092a9..9e8c4a18 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -29,16 +29,19 @@ -include("rabbit.hrl"). --rabbit_boot_step({?MODULE, - [{description, "HA policy validation"}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-mode">>, ?MODULE]}}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-params">>, ?MODULE]}}, - {mfa, {rabbit_registry, register, - [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, - {requires, rabbit_registry}, - {enables, recovery}]}). +-rabbit_boot_step( + {?MODULE, + [{description, "HA policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-params">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, recovery}]}). %%---------------------------------------------------------------------------- @@ -75,9 +78,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% get here. 
case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, - slave_pids = SPids, - gm_pids = GMPids }] -> + [Q = #amqqueue { pid = QPid, + slave_pids = SPids, + gm_pids = GMPids, + down_slave_nodes = DSNs}] -> {DeadGM, AliveGM} = lists:partition( fun ({GM, _}) -> lists:member(GM, DeadGMPids) @@ -86,6 +90,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> AlivePids = [Pid || {_GM, Pid} <- AliveGM], Alive = [Pid || Pid <- [QPid | SPids], lists:member(Pid, AlivePids)], + DSNs1 = [node(Pid) || + Pid <- SPids, + not lists:member(Pid, AlivePids)] ++ DSNs, {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> @@ -94,9 +101,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = Q#amqqueue{pid = QPid1, - slave_pids = SPids1, - gm_pids = AliveGM}, + Q1 = Q#amqqueue{pid = QPid1, + slave_pids = SPids1, + gm_pids = AliveGM, + down_slave_nodes = DSNs1}, store_updated_slaves(Q1), %% If we add and remove nodes at the same time we %% might tell the old master we need to sync and @@ -106,8 +114,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> _ -> %% Master has changed, and we're not it. %% [1]. - Q1 = Q#amqqueue{slave_pids = Alive, - gm_pids = AliveGM}, + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM, + down_slave_nodes = DSNs1}, store_updated_slaves(Q1) end, {ok, QPid1, DeadPids} @@ -236,12 +245,16 @@ log(Level, QName, Fmt, Args) -> rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt, [rabbit_misc:rs(QName) | Args]). -store_updated_slaves(Q = #amqqueue{slave_pids = SPids, - sync_slave_pids = SSPids}) -> +store_updated_slaves(Q = #amqqueue{pid = MPid, + slave_pids = SPids, + sync_slave_pids = SSPids, + down_slave_nodes = DSNs}) -> %% TODO now that we clear sync_slave_pids in rabbit_durable_queue, %% do we still need this filtering? SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)], - Q1 = Q#amqqueue{sync_slave_pids = SSPids1}, + DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]], + Q1 = Q#amqqueue{sync_slave_pids = SSPids1, + down_slave_nodes = DSNs1}, ok = rabbit_amqqueue:store_queue(Q1), %% Wake it up so that we emit a stats event rabbit_amqqueue:notify_policy_changed(Q1), @@ -374,16 +387,21 @@ validate_policy(KeyList) -> Mode = proplists:get_value(<<"ha-mode">>, KeyList, none), Params = proplists:get_value(<<"ha-params">>, KeyList, none), SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none), - case {Mode, Params, SyncMode} of - {none, none, none} -> + PromoteOnShutdown = proplists:get_value( + <<"ha-promote-on-shutdown">>, KeyList, none), + case {Mode, Params, SyncMode, PromoteOnShutdown} of + {none, none, none, none} -> ok; - {none, _, _} -> - {error, "ha-mode must be specified to specify ha-params or " - "ha-sync-mode", []}; + {none, _, _, _} -> + {error, "ha-mode must be specified to specify ha-params, " + "ha-sync-mode or ha-promote-on-shutdown", []}; _ -> case module(Mode) of {ok, M} -> case M:validate_policy(Params) of - ok -> validate_sync_mode(SyncMode); + ok -> case validate_sync_mode(SyncMode) of + ok -> validate_pos(PromoteOnShutdown); + E -> E + end; E -> E end; _ -> {error, "~p is not a valid ha-mode value", [Mode]} @@ -398,3 +416,12 @@ validate_sync_mode(SyncMode) -> Mode -> {error, "ha-sync-mode must be \"manual\" " "or \"automatic\", got ~p", [Mode]} end. 
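%% Illustrative sketch, not part of the patch: validate_policy/1 above
%% now also accepts ha-promote-on-shutdown, whose two permitted values
%% are checked by validate_pos/1 just below. As a policy that might look
%% like (policy name and pattern are arbitrary examples):
%%
%%   rabbitmqctl set_policy ha-all "^ha\." \
%%       '{"ha-mode":"all","ha-promote-on-shutdown":"when-synced"}'
%%
%% Leaving the key unset behaves like "when-synced"; "always" restores
%% the previous behaviour of always letting a slave take over on clean
%% master shutdown.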
+ +validate_pos(PromoteOnShutdown) -> + case PromoteOnShutdown of + <<"always">> -> ok; + <<"when-synced">> -> ok; + none -> ok; + Mode -> {error, "ha-promote-on-shutdown must be " + "\"always\" or \"when-synced\", got ~p", [Mode]} + end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 11d6a79c..6d0064ab 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -271,8 +271,8 @@ handle_cast({sync_start, Ref, Syncer}, DD, Ref, TRef, Syncer, BQ, BQS, fun (BQN, BQSN) -> BQSN1 = update_ram_duration(BQN, BQSN), - TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, - self(), update_ram_duration), + TRefN = rabbit_misc:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), {TRefN, BQSN1} end) of denied -> noreply(State1); @@ -653,8 +653,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_timeout(State = #state { backing_queue = BQ }) -> - run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). +backing_queue_timeout(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State#state{backing_queue_state = BQ:timeout(BQS)}. ensure_sync_timer(State) -> rabbit_misc:ensure_timer(State, #state.sync_timer_ref, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b682ebd3..c4148bbf 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -42,10 +42,9 @@ -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([format/2, format_many/1, format_stderr/2]). --export([with_local_io/1, local_info_msg/2]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). --export([pid_to_string/1, string_to_pid/1]). +-export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]). -export([version_compare/2, version_compare/3]). -export([version_minor_equivalent/2]). -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). @@ -67,10 +66,11 @@ -export([check_expiry/1]). -export([base64url/1]). -export([interval_operation/4]). --export([ensure_timer/4, stop_timer/2]). +-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]). -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). -export([moving_average/4]). +-export([now_to_ms/1]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -81,7 +81,7 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1]). +-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -91,9 +91,10 @@ :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). -type(digraph_label() :: term()). -type(graph_vertex_fun() :: - fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). + fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: - fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). + fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph:vertex()}])). +-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -183,8 +184,6 @@ -spec(format/2 :: (string(), [any()]) -> string()). -spec(format_many/1 :: ([{string(), [any()]}]) -> string()). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). --spec(with_local_io/1 :: (fun (() -> A)) -> A). 
--spec(local_info_msg/2 :: (string(), [any()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). @@ -192,6 +191,7 @@ (rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(node_to_fake_pid/1 :: (atom()) -> pid()). -spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). -spec(version_compare/3 :: (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) @@ -209,7 +209,8 @@ [string()]) -> {'ok', {atom(), [{string(), string()}], [string()]}} | 'no_command'). --spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). +-spec(all_module_attributes/1 :: + (atom()) -> [{atom(), atom(), [term()]}]). -spec(build_acyclic_graph/3 :: (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) -> rabbit_types:ok_or_error2(digraph(), @@ -245,11 +246,16 @@ -> {any(), non_neg_integer()}). -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). +-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()). +-spec(cancel_timer/1 :: (tref()) -> 'ok'). -spec(get_parent/0 :: () -> pid()). -spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). -spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined') -> float()). +-spec(now_to_ms/1 :: ({non_neg_integer(), + non_neg_integer(), + non_neg_integer()}) -> pos_integer()). -endif. %%---------------------------------------------------------------------------- @@ -635,23 +641,6 @@ format_stderr(Fmt, Args) -> end, ok. -%% Execute Fun using the IO system of the local node (i.e. the node on -%% which the code is executing). -with_local_io(Fun) -> - GL = group_leader(), - group_leader(whereis(user), self()), - try - Fun() - after - group_leader(GL, self()) - end. - -%% Log an info message on the local node using the standard logger. -%% Use this if rabbit isn't running and the call didn't originate on -%% the local node (e.g. rabbitmqctl calls). -local_info_msg(Format, Args) -> - with_local_io(fun () -> error_logger:info_msg(Format, Args) end). - unfold(Fun, Init) -> unfold(Fun, [], Init). @@ -705,6 +694,10 @@ string_to_pid(Str) -> throw(Err) end. +%% node(node_to_fake_pid(Node)) =:= Node. +node_to_fake_pid(Node) -> + string_to_pid(format("<~s.0.0.0>", [Node])). + version_compare(A, B, lte) -> case version_compare(A, B) of eq -> true; @@ -849,20 +842,20 @@ module_attributes(Module) -> end. all_module_attributes(Name) -> - Modules = + Targets = lists:usort( lists:append( - [Modules || {App, _, _} <- application:loaded_applications(), - {ok, Modules} <- [application:get_key(App, modules)]])), + [[{App, Module} || Module <- Modules] || + {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), lists:foldl( - fun (Module, Acc) -> + fun ({App, Module}, Acc) -> case lists:append([Atts || {N, Atts} <- module_attributes(Module), N =:= Name]) of [] -> Acc; - Atts -> [{Module, Atts} | Acc] + Atts -> [{App, Module, Atts} | Acc] end - end, [], Modules). - + end, [], Targets). 
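A tiny usage sketch of the node_to_fake_pid/1 helper exported above; per the invariant in its comment it only encodes a node name and never refers to a live process (node name illustrative):
    FakePid = rabbit_misc:node_to_fake_pid('rabbit@hare'),
    'rabbit@hare' = node(FakePid).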
build_acyclic_graph(VertexFun, EdgeFun, Graph) -> G = digraph:new([acyclic]), @@ -870,13 +863,13 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> [case digraph:vertex(G, Vertex) of false -> digraph:add_vertex(G, Vertex, Label); _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}}) - end || {Module, Atts} <- Graph, - {Vertex, Label} <- VertexFun(Module, Atts)], + end || GraphElem <- Graph, + {Vertex, Label} <- VertexFun(GraphElem)], [case digraph:add_edge(G, From, To) of {error, E} -> throw({graph_error, {edge, E, From, To}}); _ -> ok - end || {Module, Atts} <- Graph, - {From, To} <- EdgeFun(Module, Atts)], + end || GraphElem <- Graph, + {From, To} <- EdgeFun(GraphElem)], {ok, G} catch {graph_error, Reason} -> true = digraph:delete(G), @@ -1017,7 +1010,9 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. -check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}}; +now_to_ms({Mega, Sec, Micro}) -> + (Mega * 1000000 * 1000000 + Sec * 1000000 + Micro) div 1000. + check_expiry(N) when N < 0 -> {error, {value_negative, N}}; check_expiry(_N) -> ok. @@ -1045,7 +1040,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) -> ensure_timer(State, Idx, After, Msg) -> case element(Idx, State) of - undefined -> TRef = erlang:send_after(After, self(), Msg), + undefined -> TRef = send_after(After, self(), Msg), setelement(Idx, State, TRef); _ -> State end. @@ -1053,12 +1048,25 @@ ensure_timer(State, Idx, After, Msg) -> stop_timer(State, Idx) -> case element(Idx, State) of undefined -> State; - TRef -> case erlang:cancel_timer(TRef) of - false -> State; - _ -> setelement(Idx, State, undefined) - end + TRef -> cancel_timer(TRef), + setelement(Idx, State, undefined) end. +%% timer:send_after/3 goes through a single timer process but allows +%% long delays. erlang:send_after/3 does not have a bottleneck but +%% only allows max 2^32-1 millis. +-define(MAX_ERLANG_SEND_AFTER, 4294967295). +send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER -> + {ok, Ref} = timer:send_after(Millis, Pid, Msg), + {timer, Ref}; +send_after(Millis, Pid, Msg) -> + {erlang, erlang:send_after(Millis, Pid, Msg)}. + +cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref), + ok; +cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref), + ok. + store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). store_proc_name(TypeProcName) -> put(process_name, TypeProcName). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c6c2c8eb..17fca7bb 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -23,6 +23,7 @@ update_cluster_nodes/1, change_cluster_node_type/1, forget_cluster_node/2, + force_load_next_boot/0, status/0, is_clustered/0, @@ -63,6 +64,7 @@ -spec(update_cluster_nodes/1 :: (node()) -> 'ok'). -spec(change_cluster_node_type/1 :: (node_type()) -> 'ok'). -spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok'). +-spec(force_load_next_boot/0 :: () -> 'ok'). 
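A short sketch of the two branches of the rabbit_misc:send_after/3 wrapper introduced above: delays beyond 2^32-1 ms fall back to timer:send_after/3, everything else stays on erlang:send_after/3, and cancel_timer/1 handles both tagged forms (delays and message are illustrative):
    {erlang, _} = R1 = rabbit_misc:send_after(5000, self(), ping),
    {timer,  _} = R2 = rabbit_misc:send_after(5000000000, self(), ping),
    ok = rabbit_misc:cancel_timer(R1),
    ok = rabbit_misc:cancel_timer(R2).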
%% Various queries to get the status of the db -spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | @@ -114,7 +116,7 @@ init_from_config() -> true -> disc; false -> ram end}, - error_logger:warning_msg( + rabbit_log:warning( "Converting legacy 'cluster_nodes' configuration~n ~w~n" "to~n ~w.~n~n" "Please update the configuration to the new format " @@ -125,17 +127,22 @@ init_from_config() -> {ok, Config} -> Config end, - case find_good_node(nodes_excl_me(TryNodes)) of + case TryNodes of + [] -> init_db_and_upgrade([node()], disc, false); + _ -> auto_cluster(TryNodes, NodeType) + end. + +auto_cluster(TryNodes, NodeType) -> + case find_auto_cluster_node(nodes_excl_me(TryNodes)) of {ok, Node} -> - rabbit_log:info("Node '~p' selected for clustering from " - "configuration~n", [Node]), + rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]), {ok, {_, DiscNodes, _}} = discover_cluster0(Node), init_db_and_upgrade(DiscNodes, NodeType, true), rabbit_node_monitor:notify_joined_cluster(); none -> - rabbit_log:warning("Could not find any suitable node amongst the " - "ones provided in the configuration: ~p~n", - [TryNodes]), + rabbit_log:warning( + "Could not find any node for auto-clustering from: ~p~n" + "Starting blank node...~n", [TryNodes]), init_db_and_upgrade([node()], disc, false) end. @@ -170,14 +177,13 @@ join_cluster(DiscoveryNode, NodeType) -> reset_gracefully(), %% Join the cluster - rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n", - [ClusterNodes, NodeType]), + rabbit_log:info("Clustering with ~p as ~p node~n", + [ClusterNodes, NodeType]), ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true), rabbit_node_monitor:notify_joined_cluster(), ok; true -> - rabbit_misc:local_info_msg("Already member of cluster: ~p~n", - [ClusterNodes]), + rabbit_log:info("Already member of cluster: ~p~n", [ClusterNodes]), {ok, already_member} end. @@ -186,12 +192,12 @@ join_cluster(DiscoveryNode, NodeType) -> %% persisted messages reset() -> ensure_mnesia_not_running(), - rabbit_misc:local_info_msg("Resetting Rabbit~n", []), + rabbit_log:info("Resetting Rabbit~n", []), reset_gracefully(). force_reset() -> ensure_mnesia_not_running(), - rabbit_misc:local_info_msg("Resetting Rabbit forcefully~n", []), + rabbit_log:info("Resetting Rabbit forcefully~n", []), wipe(). reset_gracefully() -> @@ -247,8 +253,8 @@ update_cluster_nodes(DiscoveryNode) -> %% nodes mnesia:delete_schema([node()]), rabbit_node_monitor:write_cluster_status(Status), - rabbit_misc:local_info_msg("Updating cluster nodes from ~p~n", - [DiscoveryNode]), + rabbit_log:info("Updating cluster nodes from ~p~n", + [DiscoveryNode]), init_db_with_mnesia(AllNodes, node_type(), true, true); false -> e(inconsistent_cluster) @@ -271,7 +277,7 @@ forget_cluster_node(Node, RemoveWhenOffline) -> {true, false} -> remove_node_offline_node(Node); {true, true} -> e(online_node_offline_flag); {false, false} -> e(offline_node_no_offline_flag); - {false, true} -> rabbit_misc:local_info_msg( + {false, true} -> rabbit_log:info( "Removing node ~p from cluster~n", [Node]), case remove_node_if_mnesia_running(Node) of ok -> ok; @@ -303,7 +309,6 @@ remove_node_offline_node(Node) -> e(removing_node_from_offline_node) end. 
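For reference, an illustrative rabbitmq.config fragment for the auto-clustering path above; the new-style cluster_nodes value is a {Nodes, NodeType} pair, and init_from_config/0 synthesises the same shape from the legacy plain node list (node names are examples only):
    [{rabbit, [{cluster_nodes, {['rabbit@hare', 'rabbit@bunny'], disc}}]}].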
- %%---------------------------------------------------------------------------- %% Queries %%---------------------------------------------------------------------------- @@ -618,10 +623,10 @@ schema_ok_or_move() -> {error, Reason} -> %% NB: we cannot use rabbit_log here since it may not have been %% started yet - error_logger:warning_msg("schema integrity check failed: ~p~n" - "moving database to backup location " - "and recreating schema from scratch~n", - [Reason]), + rabbit_log:warning("schema integrity check failed: ~p~n" + "moving database to backup location " + "and recreating schema from scratch~n", + [Reason]), ok = move_db(), ok = create_schema() end. @@ -647,8 +652,8 @@ move_db() -> ok -> %% NB: we cannot use rabbit_log here since it may not have %% been started yet - error_logger:warning_msg("moved database from ~s to ~s~n", - [MnesiaDir, BackupDir]), + rabbit_log:warning("moved database from ~s to ~s~n", + [MnesiaDir, BackupDir]), ok; {error, Reason} -> throw({error, {cannot_backup_mnesia, MnesiaDir, BackupDir, Reason}}) @@ -694,7 +699,7 @@ leave_cluster(Node) -> end. wait_for(Condition) -> - error_logger:info_msg("Waiting for ~p...~n", [Condition]), + rabbit_log:info("Waiting for ~p...~n", [Condition]), timer:sleep(1000). start_mnesia(CheckConsistency) -> @@ -787,17 +792,23 @@ is_virgin_node() -> false end. -find_good_node([]) -> +find_auto_cluster_node([]) -> none; -find_good_node([Node | Nodes]) -> +find_auto_cluster_node([Node | Nodes]) -> + Fail = fun (Fmt, Args) -> + rabbit_log:warning( + "Could not auto-cluster with ~s: " ++ Fmt, [Node | Args]), + find_auto_cluster_node(Nodes) + end, case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _Reason} -> find_good_node(Nodes); + {badrpc, _} = Reason -> Fail("~p~n", [Reason]); %% old delegate hash check - {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes); - {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of - {error, _} -> find_good_node(Nodes); - ok -> {ok, Node} - end + {_OTP, Rabbit, _Hash, _} -> Fail("version ~s~n", [Rabbit]); + {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of + {error, _} -> Fail("versions ~p~n", + [{OTP, Rabbit}]); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 9082dbd3..4e92bf39 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -37,8 +37,6 @@ -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). --define(SSL_TIMEOUT, 5). %% seconds - -define(FIRST_TEST_BIND_PORT, 10000). %%---------------------------------------------------------------------------- @@ -168,9 +166,14 @@ ensure_ssl() -> end end. +ssl_timeout() -> + {ok, Val} = application:get_env(rabbit, ssl_handshake_timeout), + Val. + ssl_transform_fun(SslOpts) -> fun (Sock) -> - case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of + Timeout = ssl_timeout(), + case catch ssl:ssl_accept(Sock, SslOpts, Timeout) of {ok, SslSock} -> {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; {error, timeout} -> @@ -185,7 +188,7 @@ ssl_transform_fun(SslOpts) -> %% form, according to the TLS spec). So we give %% the ssl_connection a little bit of time to send %% such alerts. 
- timer:sleep(?SSL_TIMEOUT * 1000), + timer:sleep(Timeout), {error, {ssl_upgrade_error, Reason}}; {'EXIT', Reason} -> {error, {ssl_upgrade_failure, Reason}} @@ -205,7 +208,7 @@ tcp_listener_addresses({Host, Port, Family0}) [{IPAddress, Port, Family} || {IPAddress, Family} <- getaddr(Host, Family0)]; tcp_listener_addresses({_Host, Port, _Family0}) -> - error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), + rabbit_log:error("invalid port ~p - not 0..65535~n", [Port]), throw({error, {invalid_port, Port}}). tcp_listener_addresses_auto(Port) -> @@ -392,7 +395,7 @@ gethostaddr(Host, Family) -> end. host_lookup_error(Host, Reason) -> - error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), + rabbit_log:error("invalid host ~p - ~p~n", [Host, Reason]), throw({error, {invalid_host, Host, Reason}}). resolve_family({_,_,_,_}, auto) -> inet; diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index ca843e14..7eaeaf50 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -458,6 +458,7 @@ ensure_ping_timer(State) -> State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes). handle_live_rabbit(Node) -> + ok = rabbit_amqqueue:on_node_up(Node), ok = rabbit_alarm:on_node_up(Node), ok = rabbit_mnesia:on_node_up(Node). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index c0fb05e2..04db5d5d 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -18,6 +18,7 @@ -include("rabbit.hrl"). -export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]). +-export([ensure/1]). %%---------------------------------------------------------------------------- @@ -31,22 +32,54 @@ -spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]). -spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) -> [plugin_name()]). - +-spec(ensure/1 :: (string()) -> {'ok', [atom()], [atom()]} | {error, any()}). -endif. %%---------------------------------------------------------------------------- +ensure(FileJustChanged) -> + {ok, OurFile} = application:get_env(rabbit, enabled_plugins_file), + case OurFile of + FileJustChanged -> + {ok, Dir} = application:get_env(rabbit, plugins_dir), + Enabled = read_enabled(OurFile), + Wanted = dependencies(false, Enabled, list(Dir)), + prepare_plugins(Enabled), + Current = active(), + Start = Wanted -- Current, + Stop = Current -- Wanted, + rabbit:start_apps(Start), + %% We need sync_notify here since mgmt will attempt to look at all + %% the modules for the disabled plugins - if they are unloaded + %% that won't work. + ok = rabbit_event:sync_notify(plugins_changed, [{enabled, Start}, + {disabled, Stop}]), + rabbit:stop_apps(Stop), + clean_plugins(Stop), + {ok, Start, Stop}; + _ -> + {error, {enabled_plugins_mismatch, FileJustChanged, OurFile}} + end. + %% @doc Prepares the file system and installs all enabled plugins. setup() -> - {ok, PluginDir} = application:get_env(rabbit, plugins_dir), {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + + %% Eliminate the contents of the destination directory + case delete_recursively(ExpandDir) of + ok -> ok; + {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir, + [ExpandDir, E1]}}) + end, + {ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file), - prepare_plugins(EnabledFile, PluginDir, ExpandDir). + Enabled = read_enabled(EnabledFile), + prepare_plugins(Enabled). %% @doc Lists the plugins which are currently running. 
active() -> {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), - InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ], + InstalledPlugins = plugin_names(list(ExpandDir)), [App || {App, _, _} <- rabbit_misc:which_applications(), lists:member(App, InstalledPlugins)]. @@ -64,10 +97,10 @@ list(PluginsDir) -> [plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]), case Problems of [] -> ok; - _ -> error_logger:warning_msg( + _ -> rabbit_log:warning( "Problem reading some plugins: ~p~n", [Problems]) end, - Plugins. + ensure_dependencies(Plugins). %% @doc Read the list of enabled plugins from the supplied term file. read_enabled(PluginsFile) -> @@ -86,15 +119,10 @@ read_enabled(PluginsFile) -> %% the resulting list, otherwise they're skipped. dependencies(Reverse, Sources, AllPlugins) -> {ok, G} = rabbit_misc:build_acyclic_graph( - fun (App, _Deps) -> [{App, App}] end, - fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end, - lists:ukeysort( - 1, [{Name, Deps} || - #plugin{name = Name, - dependencies = Deps} <- AllPlugins] ++ - [{Dep, []} || - #plugin{dependencies = Deps} <- AllPlugins, - Dep <- Deps])), + fun ({App, _Deps}) -> [{App, App}] end, + fun ({App, Deps}) -> [{App, Dep} || Dep <- Deps] end, + [{Name, Deps} || #plugin{name = Name, + dependencies = Deps} <- AllPlugins]), Dests = case Reverse of false -> digraph_utils:reachable(Sources, G); true -> digraph_utils:reaching(Sources, G) @@ -102,27 +130,44 @@ dependencies(Reverse, Sources, AllPlugins) -> true = digraph:delete(G), Dests. +%% Make sure we don't list OTP apps in here, and also that we detect +%% missing dependencies. +ensure_dependencies(Plugins) -> + Names = plugin_names(Plugins), + NotThere = [Dep || #plugin{dependencies = Deps} <- Plugins, + Dep <- Deps, + not lists:member(Dep, Names)], + {OTP, Missing} = lists:partition(fun is_loadable/1, lists:usort(NotThere)), + case Missing of + [] -> ok; + _ -> Blame = [Name || #plugin{name = Name, + dependencies = Deps} <- Plugins, + lists:any(fun (Dep) -> + lists:member(Dep, Missing) + end, Deps)], + throw({error, {missing_dependencies, Missing, Blame}}) + end, + [P#plugin{dependencies = Deps -- OTP} + || P = #plugin{dependencies = Deps} <- Plugins]. + +is_loadable(App) -> + case application:load(App) of + {error, {already_loaded, _}} -> true; + ok -> application:unload(App), + true; + _ -> false + end. 
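A usage sketch of the simplified dependency graph above: the same function yields the forward closure (what must run for a set of enabled plugins) and, with Reverse = true, the reverse closure (what must stop along with them). Directory and plugin names are hypothetical:
    AllPlugins = rabbit_plugins:list("/usr/lib/rabbitmq/plugins"),
    Wanted     = rabbit_plugins:dependencies(false, [rabbitmq_shovel], AllPlugins),
    Dependants = rabbit_plugins:dependencies(true,  [rabbitmq_shovel], AllPlugins).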
+ %%---------------------------------------------------------------------------- -prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> +prepare_plugins(Enabled) -> + {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir), + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + AllPlugins = list(PluginsDistDir), - Enabled = read_enabled(EnabledFile), ToUnpack = dependencies(false, Enabled, AllPlugins), ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins), - case Enabled -- plugin_names(ToUnpackPlugins) of - [] -> ok; - Missing -> error_logger:warning_msg( - "The following enabled plugins were not found: ~p~n", - [Missing]) - end, - - %% Eliminate the contents of the destination directory - case delete_recursively(ExpandDir) of - ok -> ok; - {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir, - [ExpandDir, E1]}}) - end, case filelib:ensure_dir(ExpandDir ++ "/") of ok -> ok; {error, E2} -> throw({error, {cannot_create_plugins_expand_dir, @@ -134,6 +179,20 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> [prepare_dir_plugin(PluginAppDescPath) || PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")]. +clean_plugins(Plugins) -> + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + [clean_plugin(Plugin, ExpandDir) || Plugin <- Plugins]. + +clean_plugin(Plugin, ExpandDir) -> + {ok, Mods} = application:get_key(Plugin, modules), + application:unload(Plugin), + [begin + code:soft_purge(Mod), + code:delete(Mod), + false = code:is_loaded(Mod) + end || Mod <- Mods], + delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])). + prepare_dir_plugin(PluginAppDescPath) -> code:add_path(filename:dirname(PluginAppDescPath)), list_to_atom(filename:basename(PluginAppDescPath, ".app")). @@ -172,8 +231,7 @@ plugin_info(Base, {app, App0}) -> mkplugin(Name, Props, Type, Location) -> Version = proplists:get_value(vsn, Props, "0"), Description = proplists:get_value(description, Props, ""), - Dependencies = - filter_applications(proplists:get_value(applications, Props, [])), + Dependencies = proplists:get_value(applications, Props, []), #plugin{name = Name, version = Version, description = Description, dependencies = Dependencies, location = Location, type = Type}. @@ -206,18 +264,6 @@ parse_binary(Bin) -> Err -> {error, {invalid_app, Err}} end. -filter_applications(Applications) -> - [Application || Application <- Applications, - not is_available_app(Application)]. - -is_available_app(Application) -> - case application:load(Application) of - {error, {already_loaded, _}} -> true; - ok -> application:unload(Application), - true; - _ -> false - end. - plugin_names(Plugins) -> [Name || #plugin{name = Name} <- Plugins]. diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 89e16f14..278fcf98 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -18,23 +18,34 @@ -include("rabbit.hrl"). -export([start/0, stop/0]). +-export([action/6]). +-define(NODE_OPT, "-n"). -define(VERBOSE_OPT, "-v"). -define(MINIMAL_OPT, "-m"). -define(ENABLED_OPT, "-E"). -define(ENABLED_ALL_OPT, "-e"). +-define(OFFLINE_OPT, "--offline"). +-define(ONLINE_OPT, "--online"). +-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(VERBOSE_DEF, {?VERBOSE_OPT, flag}). -define(MINIMAL_DEF, {?MINIMAL_OPT, flag}). -define(ENABLED_DEF, {?ENABLED_OPT, flag}). -define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}). +-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}). +-define(ONLINE_DEF, {?ONLINE_OPT, flag}). 
--define(GLOBAL_DEFS, []). +-define(RPC_TIMEOUT, infinity). + +-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]). -define(COMMANDS, [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]}, - enable, - disable]). + {enable, [?OFFLINE_DEF, ?ONLINE_DEF]}, + {disable, [?OFFLINE_DEF, ?ONLINE_DEF]}, + {set, [?OFFLINE_DEF, ?ONLINE_DEF]}, + {sync, []}]). %%---------------------------------------------------------------------------- @@ -51,11 +62,10 @@ start() -> {ok, [[PluginsFile|_]|_]} = init:get_argument(enabled_plugins_file), + {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir), {Command, Opts, Args} = - case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS, - init:get_plain_arguments()) - of + case parse_arguments(init:get_plain_arguments(), NodeStr) of {ok, Res} -> Res; no_command -> print_error("could not recognise command", []), usage() @@ -67,7 +77,8 @@ start() -> [string:join([atom_to_list(Command) | Args], " ")]) end, - case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of + Node = proplists:get_value(?NODE_OPT, Opts), + case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of ok -> rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> @@ -76,12 +87,23 @@ start() -> {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> PrintInvalidCommandError(), usage(); + {error, {missing_dependencies, Missing, Blame}} -> + print_error("dependent plugins ~p not found; used by ~p.", + [Missing, Blame]), + rabbit_misc:quit(2); {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); {error_string, Reason} -> print_error("~s", [Reason]), rabbit_misc:quit(2); + {badrpc, {'EXIT', Reason}} -> + print_error("~p", [Reason]), + rabbit_misc:quit(2); + {badrpc, Reason} -> + print_error("unable to connect to node ~w: ~w", [Node, Reason]), + print_badrpc_diagnostics([Node]), + rabbit_misc:quit(2); Other -> print_error("~p", [Other]), rabbit_misc:quit(2) @@ -92,50 +114,80 @@ stop() -> %%---------------------------------------------------------------------------- -action(list, [], Opts, PluginsFile, PluginsDir) -> - action(list, [".*"], Opts, PluginsFile, PluginsDir); -action(list, [Pat], Opts, PluginsFile, PluginsDir) -> - format_plugins(Pat, Opts, PluginsFile, PluginsDir); +parse_arguments(CmdLine, NodeStr) -> + case rabbit_misc:parse_arguments( + ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of + {ok, {Cmd, Opts0, Args}} -> + Opts = [case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; + _ -> {K, V} + end || {K, V} <- Opts0], + {ok, {Cmd, Opts, Args}}; + E -> + E + end. 
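Roughly, the shape parse_arguments/2 now produces (command line and node name illustrative); the -n value is converted to a node atom via rabbit_nodes:make/1 before the action is dispatched:
    %% parse_arguments(["enable", "rabbitmq_management"], "rabbit@hare")
    %%   => {ok, {enable, Opts, ["rabbitmq_management"]}}
    %% where Opts carries {"-n", 'rabbit@hare'} alongside the
    %% --online/--offline flag values.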
+ +action(list, Node, [], Opts, PluginsFile, PluginsDir) -> + action(list, Node, [".*"], Opts, PluginsFile, PluginsDir); +action(list, Node, [Pat], Opts, PluginsFile, PluginsDir) -> + format_plugins(Node, Pat, Opts, PluginsFile, PluginsDir); -action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> +action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) -> case ToEnable0 of [] -> throw({error_string, "Not enough arguments for 'enable'"}); _ -> ok end, AllPlugins = rabbit_plugins:list(PluginsDir), Enabled = rabbit_plugins:read_enabled(PluginsFile), - ImplicitlyEnabled = rabbit_plugins:dependencies(false, - Enabled, AllPlugins), + ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins), ToEnable = [list_to_atom(Name) || Name <- ToEnable0], Missing = ToEnable -- plugin_names(AllPlugins), + case Missing of + [] -> ok; + _ -> throw({error_string, fmt_missing(Missing)}) + end, NewEnabled = lists:usort(Enabled ++ ToEnable), NewImplicitlyEnabled = rabbit_plugins:dependencies(false, NewEnabled, AllPlugins), - MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing, - case {Missing, MissingDeps} of - {[], []} -> ok; - {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)}); - {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)}); - {_, _} -> throw({error_string, - fmt_missing("plugins", Missing) ++ - fmt_missing("dependencies", MissingDeps)}) - end, write_enabled_plugins(PluginsFile, NewEnabled), case NewEnabled -- ImplicitlyEnabled of [] -> io:format("Plugin configuration unchanged.~n"); _ -> print_list("The following plugins have been enabled:", - NewImplicitlyEnabled -- ImplicitlyEnabled), - report_change() - end; + NewImplicitlyEnabled -- ImplicitlyEnabled) + end, + action_change( + Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile); + +action(set, Node, ToSet0, Opts, PluginsFile, PluginsDir) -> + ToSet = [list_to_atom(Name) || Name <- ToSet0], + AllPlugins = rabbit_plugins:list(PluginsDir), + Enabled = rabbit_plugins:read_enabled(PluginsFile), + ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins), + Missing = ToSet -- plugin_names(AllPlugins), + case Missing of + [] -> ok; + _ -> throw({error_string, fmt_missing(Missing)}) + end, + NewImplicitlyEnabled = rabbit_plugins:dependencies(false, + ToSet, AllPlugins), + write_enabled_plugins(PluginsFile, ToSet), + case NewImplicitlyEnabled of + [] -> io:format("All plugins are now disabled.~n"); + _ -> print_list("The following plugins are now enabled:", + NewImplicitlyEnabled) + end, + action_change( + Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile); -action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> +action(disable, Node, ToDisable0, Opts, PluginsFile, PluginsDir) -> case ToDisable0 of [] -> throw({error_string, "Not enough arguments for 'disable'"}); _ -> ok end, - ToDisable = [list_to_atom(Name) || Name <- ToDisable0], - Enabled = rabbit_plugins:read_enabled(PluginsFile), AllPlugins = rabbit_plugins:list(PluginsDir), + Enabled = rabbit_plugins:read_enabled(PluginsFile), + ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins), + ToDisable = [list_to_atom(Name) || Name <- ToDisable0], Missing = ToDisable -- plugin_names(AllPlugins), case Missing of [] -> ok; @@ -144,30 +196,35 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> end, ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins), NewEnabled = Enabled -- ToDisableDeps, + NewImplicitlyEnabled = 
rabbit_plugins:dependencies(false, + NewEnabled, AllPlugins), case length(Enabled) =:= length(NewEnabled) of true -> io:format("Plugin configuration unchanged.~n"); - false -> ImplicitlyEnabled = - rabbit_plugins:dependencies(false, Enabled, AllPlugins), - NewImplicitlyEnabled = - rabbit_plugins:dependencies(false, - NewEnabled, AllPlugins), - print_list("The following plugins have been disabled:", + false -> print_list("The following plugins have been disabled:", ImplicitlyEnabled -- NewImplicitlyEnabled), - write_enabled_plugins(PluginsFile, NewEnabled), - report_change() - end. + write_enabled_plugins(PluginsFile, NewEnabled) + end, + action_change( + Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile); + +action(sync, Node, [], _Opts, PluginsFile, _PluginsDir) -> + sync(Node, true, PluginsFile). %%---------------------------------------------------------------------------- -print_error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). +fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). + +print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). + +print_badrpc_diagnostics(Nodes) -> + fmt_stderr(rabbit_nodes:diagnostics(Nodes), []). usage() -> io:format("~s", [rabbit_plugins_usage:usage()]), rabbit_misc:quit(1). %% Pretty print a list of plugins. -format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> +format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) -> Verbose = proplists:get_bool(?VERBOSE_OPT, Opts), Minimal = proplists:get_bool(?MINIMAL_OPT, Opts), Format = case {Verbose, Minimal} of @@ -182,41 +239,52 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> AvailablePlugins = rabbit_plugins:list(PluginsDir), EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile), - EnabledImplicitly = - rabbit_plugins:dependencies(false, EnabledExplicitly, - AvailablePlugins) -- EnabledExplicitly, - Missing = [#plugin{name = Name, dependencies = []} || - Name <- ((EnabledExplicitly ++ EnabledImplicitly) -- - plugin_names(AvailablePlugins))], + AllEnabled = rabbit_plugins:dependencies(false, EnabledExplicitly, + AvailablePlugins), + EnabledImplicitly = AllEnabled -- EnabledExplicitly, + {StatusMsg, Running} = + case rpc:call(Node, rabbit_plugins, active, [], ?RPC_TIMEOUT) of + {badrpc, _} -> {"[failed to contact ~s - status not shown]", []}; + Active -> {"* = running on ~s", Active} + end, {ok, RE} = re:compile(Pattern), Plugins = [ Plugin || - Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing, + Plugin = #plugin{name = Name} <- AvailablePlugins, re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, - if OnlyEnabled -> lists:member(Name, EnabledExplicitly); - OnlyEnabledAll -> (lists:member(Name, - EnabledExplicitly) or - lists:member(Name, EnabledImplicitly)); + if OnlyEnabled -> lists:member(Name, EnabledExplicitly); + OnlyEnabledAll -> lists:member(Name, EnabledExplicitly) or + lists:member(Name,EnabledImplicitly); true -> true end], Plugins1 = usort_plugins(Plugins), MaxWidth = lists:max([length(atom_to_list(Name)) || #plugin{name = Name} <- Plugins1] ++ [0]), - [format_plugin(P, EnabledExplicitly, EnabledImplicitly, - plugin_names(Missing), Format, MaxWidth) || P <- Plugins1], + case Format of + minimal -> ok; + _ -> io:format(" Configured: E = explicitly enabled; " + "e = implicitly enabled~n" + " | Status: ~s~n" + " |/~n", [rabbit_misc:format(StatusMsg, [Node])]) + end, + [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Running, + Format, MaxWidth) || P <- 
Plugins1], ok. format_plugin(#plugin{name = Name, version = Version, description = Description, dependencies = Deps}, - EnabledExplicitly, EnabledImplicitly, Missing, - Format, MaxWidth) -> - Glyph = case {lists:member(Name, EnabledExplicitly), - lists:member(Name, EnabledImplicitly), - lists:member(Name, Missing)} of - {true, false, false} -> "[E]"; - {false, true, false} -> "[e]"; - {_, _, true} -> "[!]"; - _ -> "[ ]" - end, + EnabledExplicitly, EnabledImplicitly, Running, Format, + MaxWidth) -> + EnabledGlyph = case {lists:member(Name, EnabledExplicitly), + lists:member(Name, EnabledImplicitly)} of + {true, false} -> "E"; + {false, true} -> "e"; + _ -> " " + end, + RunningGlyph = case lists:member(Name, Running) of + true -> "*"; + false -> " " + end, + Glyph = rabbit_misc:format("[~s~s]", [EnabledGlyph, RunningGlyph]), Opt = fun (_F, A, A) -> ok; ( F, A, _) -> io:format(F, [A]) end, @@ -227,9 +295,9 @@ format_plugin(#plugin{name = Name, version = Version, Opt("~s", Version, undefined), io:format("~n"); verbose -> io:format("~s ~w~n", [Glyph, Name]), - Opt(" Version: \t~s~n", Version, undefined), - Opt(" Dependencies:\t~p~n", Deps, []), - Opt(" Description: \t~s~n", Description, undefined), + Opt(" Version: \t~s~n", Version, undefined), + Opt(" Dependencies:\t~p~n", Deps, []), + Opt(" Description: \t~s~n", Description, undefined), io:format("~n") end. @@ -240,8 +308,8 @@ fmt_list(Header, Plugins) -> lists:flatten( [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]). -fmt_missing(Desc, Missing) -> - fmt_list("The following " ++ Desc ++ " could not be found:", Missing). +fmt_missing(Missing) -> + fmt_list("The following plugins could not be found:", Missing). usort_plugins(Plugins) -> lists:usort(fun plugins_cmp/2, Plugins). @@ -262,6 +330,51 @@ write_enabled_plugins(PluginsFile, Plugins) -> PluginsFile, Reason}}) end. -report_change() -> - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n"). +action_change(Opts, Node, Old, New, PluginsFile) -> + action_change0(proplists:get_bool(?OFFLINE_OPT, Opts), + proplists:get_bool(?ONLINE_OPT, Opts), + Node, Old, New, PluginsFile). + +action_change0(true, _Online, _Node, Same, Same, _PluginsFile) -> + %% Definitely nothing to do + ok; +action_change0(true, _Online, _Node, _Old, _New, _PluginsFile) -> + io:format("Offline change; changes will take effect at broker restart.~n"); +action_change0(false, Online, Node, _Old, _New, PluginsFile) -> + sync(Node, Online, PluginsFile). + +sync(Node, ForceOnline, PluginsFile) -> + rpc_call(Node, ForceOnline, rabbit_plugins, ensure, [PluginsFile]). 
+ +rpc_call(Node, Online, Mod, Fun, Args) -> + io:format("~nApplying plugin configuration to ~s...", [Node]), + case rpc:call(Node, Mod, Fun, Args) of + {ok, [], []} -> + io:format(" nothing to do.~n", []); + {ok, Start, []} -> + io:format(" started ~b plugin~s.~n", [length(Start), plur(Start)]); + {ok, [], Stop} -> + io:format(" stopped ~b plugin~s.~n", [length(Stop), plur(Stop)]); + {ok, Start, Stop} -> + io:format(" stopped ~b plugin~s and started ~b plugin~s.~n", + [length(Stop), plur(Stop), length(Start), plur(Start)]); + {badrpc, nodedown} = Error -> + io:format(" failed.~n", []), + case Online of + true -> Error; + false -> io:format( + " * Could not contact node ~s.~n" + " Changes will take effect at broker restart.~n" + " * Options: --online - fail if broker cannot be " + "contacted.~n" + " --offline - do not try to contact " + "broker.~n", + [Node]) + end; + Error -> + io:format(" failed.~n", []), + Error + end. + +plur([_]) -> ""; +plur(_) -> "s". diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index fe2b766f..cc88765f 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -34,7 +34,8 @@ register() -> {policy_validator, <<"dead-letter-routing-key">>}, {policy_validator, <<"message-ttl">>}, {policy_validator, <<"expires">>}, - {policy_validator, <<"max-length">>}]], + {policy_validator, <<"max-length">>}, + {policy_validator, <<"max-length-bytes">>}]], ok. validate_policy(Terms) -> @@ -61,13 +62,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value) -> {error, "~p is not a valid dead letter routing key", [Value]}; validate_policy0(<<"message-ttl">>, Value) - when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER -> + when is_integer(Value), Value >= 0 -> ok; validate_policy0(<<"message-ttl">>, Value) -> {error, "~p is not a valid message TTL", [Value]}; validate_policy0(<<"expires">>, Value) - when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER -> + when is_integer(Value), Value >= 1 -> ok; validate_policy0(<<"expires">>, Value) -> {error, "~p is not a valid queue expiry", [Value]}; @@ -76,6 +77,10 @@ validate_policy0(<<"max-length">>, Value) when is_integer(Value), Value >= 0 -> ok; validate_policy0(<<"max-length">>, Value) -> - {error, "~p is not a valid maximum length", [Value]}. - + {error, "~p is not a valid maximum length", [Value]}; +validate_policy0(<<"max-length-bytes">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"max-length-bytes">>, Value) -> + {error, "~p is not a valid maximum length in bytes", [Value]}. diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 0a69fb32..f5d03360 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -46,17 +46,11 @@ name(#exchange{policy = Policy}) -> name0(Policy). name0(undefined) -> none; name0(Policy) -> pget(name, Policy). -set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set( - Q#amqqueue{policy = set0(Name)}); -set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set( - X#exchange{policy = set0(Name)}). +set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; +set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). -set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)}; -set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set( - X#exchange{policy = match(Name, Ps)}). 
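A quick in-module sketch against the validators changed above: message-ttl and expires are no longer capped by ?MAX_EXPIRY_TIMER, and the new max-length-bytes accepts any non-negative integer (values arbitrary):
    ok = validate_policy0(<<"message-ttl">>, 0),
    ok = validate_policy0(<<"max-length-bytes">>, 1048576),
    {error, _Fmt, _Args} = validate_policy0(<<"max-length-bytes">>, -1).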
- get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); %% Caution - SLOW. @@ -104,12 +98,18 @@ recover0() -> Policies = list(), [rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:write(rabbit_durable_exchange, set(X, Policies), write) - end) || X <- Xs], + mnesia:write( + rabbit_durable_exchange, + rabbit_exchange_decorator:set( + X#exchange{policy = match(Name, Policies)}), write) + end) || X = #exchange{name = Name} <- Xs], [rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:write(rabbit_durable_queue, set(Q, Policies), write) - end) || Q <- Qs], + mnesia:write( + rabbit_durable_queue, + rabbit_queue_decorator:set( + Q#amqqueue{policy = match(Name, Policies)}), write) + end) || Q = #amqqueue{name = Name} <- Qs], ok. invalid_file() -> diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 6205e2dc..adfe0c7f 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -2,7 +2,7 @@ -include("rabbit.hrl"). --export([select/1, set/1]). +-export([select/1, set/1, register/2, unregister/1]). %%---------------------------------------------------------------------------- @@ -41,3 +41,24 @@ select(Modules) -> set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}. list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)]. + +register(TypeName, ModuleName) -> + rabbit_registry:register(queue_decorator, TypeName, ModuleName), + [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], + ok. + +unregister(TypeName) -> + rabbit_registry:unregister(queue_decorator, TypeName), + [maybe_recover(Q) || Q <- rabbit_amqqueue:list()], + ok. + +maybe_recover(Q = #amqqueue{name = Name, + decorators = Decs}) -> + #amqqueue{decorators = Decs1} = set(Q), + Old = lists:sort(select(Decs)), + New = lists:sort(select(Decs1)), + case New of + Old -> ok; + _ -> [M:startup(Q) || M <- New -- Old], + rabbit_amqqueue:update_decorators(Name) + end. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 56c19d3f..0f572866 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -21,7 +21,7 @@ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). --export([add_queue_ttl/0, avoid_zeroes/0]). +-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -140,8 +140,11 @@ -define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)). -%% 16 bytes for md5sum + 8 for expiry --define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)). +-define(SIZE_BYTES, 4). +-define(SIZE_BITS, (?SIZE_BYTES * 8)). + +%% 16 bytes for md5sum + 8 for expiry + 4 for size +-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)). %% + 2 for seq, bits and prefix -define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)). @@ -168,8 +171,9 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, local, []}). --rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). +-rabbit_upgrade({add_queue_ttl, local, []}). +-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). +-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}). -ifdef(use_specs). @@ -199,7 +203,8 @@ -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). 
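For orientation, the on-disk record sizes implied by the constants changed above (pure arithmetic from the defines):
    %% ?PUB_RECORD_BODY_BYTES = 16 (msg id) + 8 (expiry) + 4 (size) = 28
    %% ?PUB_RECORD_BYTES      = 28 + 2 (seq, bits and prefix)       = 30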
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), contains_predicate(), on_sync_fun()) -> - {'undefined' | non_neg_integer(), qistate()}). + {'undefined' | non_neg_integer(), + 'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/5 :: (rabbit_types:msg_id(), seq_id(), @@ -415,7 +420,7 @@ init_clean(RecoveredCounts, State) -> end, Segments, RecoveredCounts), %% the counts above include transient messages, which would be the %% wrong thing to return - {undefined, State1 # qistate { segments = Segments1 }}. + {undefined, undefined, State1 # qistate { segments = Segments1 }}. init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Recover the journal completely. This will also load segments @@ -424,7 +429,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% and the journal. State1 = #qistate { dir = Dir, segments = Segments } = recover_journal(State), - {Segments1, Count, DirtyCount} = + {Segments1, Count, Bytes, DirtyCount} = %% Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we @@ -433,16 +438,18 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% dirty count here, so we can call maybe_flush_journal below %% and avoid unnecessary file system operations. lists:foldl( - fun (Seg, {Segments2, CountAcc, DirtyCount}) -> - {Segment = #segment { unacked = UnackedCount }, Dirty} = + fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) -> + {{Segment = #segment { unacked = UnackedCount }, Dirty}, + UnackedBytes} = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), {segment_store(Segment, Segments2), - CountAcc + UnackedCount, DirtyCount + Dirty} - end, {Segments, 0, 0}, all_segment_nums(State1)), + CountAcc + UnackedCount, + BytesAcc + UnackedBytes, DirtyCount + Dirty} + end, {Segments, 0, 0, 0}, all_segment_nums(State1)), State2 = maybe_flush_journal(State1 #qistate { segments = Segments1, dirty_count = DirtyCount }), - {Count, State2}. + {Count, Bytes, State2}. terminate(State = #qistate { journal_handle = JournalHdl, segments = Segments }) -> @@ -464,12 +471,16 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack}, - SegmentAndDirtyCount) -> - recover_message(ContainsCheckFun(MsgId), CleanShutdown, - Del, RelSeq, SegmentAndDirtyCount) + fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack}, + {SegmentAndDirtyCount, Bytes}) -> + {recover_message(ContainsCheckFun(MsgId), CleanShutdown, + Del, RelSeq, SegmentAndDirtyCount), + Bytes + case IsPersistent of + true -> MsgProps#message_properties.size; + false -> 0 + end} end, - {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, + {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0}, SegEntries1). recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) -> @@ -549,13 +560,15 @@ scan_segments(Fun, Acc, State) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) -> - [MsgId, expiry_to_binary(Expiry)]. 
+create_pub_record_body(MsgId, #message_properties { expiry = Expiry, + size = Size }) -> + [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>]. expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) -> +parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Size:?SIZE_BITS>>) -> %% work around for binary data fragmentation. See %% rabbit_msg_file:read_next/2 <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>, @@ -563,7 +576,8 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) -> ?NO_EXPIRY -> undefined; X -> X end, - {MsgId, #message_properties { expiry = Exp }}. + {MsgId, #message_properties { expiry = Exp, + size = Size }}. %%---------------------------------------------------------------------------- %% journal manipulation @@ -1064,6 +1078,42 @@ avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS, avoid_zeroes_segment(_) -> stop. +%% At upgrade time we just define every message's size as 0 - that +%% will save us a load of faff with the message store, and means we +%% can actually use the clean recovery terms in VQ. It does mean we +%% don't count message bodies from before the migration, but we can +%% live with that. +store_msg_size() -> + foreach_queue_index({fun store_msg_size_journal/1, + fun store_msg_size_segment/1}). + +store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>) -> + {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest}; +store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, + Rest/binary>>) -> + {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest}; +store_msg_size_journal(_) -> + stop. + +store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest}; +store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +store_msg_size_segment(_) -> + stop. + + %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 69f26d86..ca73006a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -27,7 +27,6 @@ -export([conserve_resources/3, server_properties/1]). --define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). @@ -43,7 +42,7 @@ -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, connected_at}). -record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}). 
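An in-module round-trip sketch of the widened publication record body above (msg id, expiry and size values are illustrative); whatever create_pub_record_body/2 emits, parse_pub_record_body/1 reads back unchanged:
    MsgId = <<0:128>>,
    Props = #message_properties{expiry = 12345, size = 10},
    Body  = iolist_to_binary(create_pub_record_body(MsgId, Props)),
    28    = byte_size(Body),   %% ?PUB_RECORD_BODY_BYTES
    {MsgId, Props} = parse_pub_record_body(Body).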
@@ -55,7 +54,7 @@ peer_host, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, - timeout, frame_max, channel_max, client_properties]). + timeout, frame_max, channel_max, client_properties, connected_at]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -189,10 +188,10 @@ server_capabilities(_) -> log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). socket_error(Reason) when is_atom(Reason) -> - log(error, "error on AMQP connection ~p: ~s~n", + log(error, "Error on AMQP connection ~p: ~s~n", [self(), rabbit_misc:format_inet_error(Reason)]); socket_error(Reason) -> - log(error, "error on AMQP connection ~p:~n~p~n", [self(), Reason]). + log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]). inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -216,8 +215,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> exit(normal) end, log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]), + {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout), ClientSock = socket_op(Sock, SockTransform), - erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), + erlang:send_after(HandshakeTimeout, self(), handshake_timeout), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), ?store_proc_name(list_to_binary(Name)), @@ -231,13 +231,14 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> peer_port = PeerPort, protocol = none, user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, + timeout_sec = (HandshakeTimeout / 1000), frame_max = ?FRAME_MIN_SIZE, vhost = none, client_properties = none, capabilities = [], auth_mechanism = none, - auth_state = none}, + auth_state = none, + connected_at = rabbit_misc:now_to_ms(os:timestamp())}, callback = uninitialized_callback, recv_len = 0, pending_recv = false, @@ -410,7 +411,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) rabbit_event:notify( connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref), - State; + rabbit_event:init_stats_timer(State, #v1.stats_timer); handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> %% Ignore, we will emit a created event once we start running. 
State; @@ -548,21 +549,27 @@ wait_for_channel_termination(0, TimerRef, State) -> end; _ -> State end; -wait_for_channel_termination(N, TimerRef, State) -> +wait_for_channel_termination(N, TimerRef, + State = #v1{connection_state = CS, + connection = #connection{ + name = ConnName, + user = User, + vhost = VHost}}) -> receive {'DOWN', _MRef, process, ChPid, Reason} -> {Channel, State1} = channel_cleanup(ChPid, State), case {Channel, termination_kind(Reason)} of - {undefined, _} -> exit({abnormal_dependent_exit, - ChPid, Reason}); - {_, controlled} -> wait_for_channel_termination( - N-1, TimerRef, State1); - {_, uncontrolled} -> log(error, - "AMQP connection ~p, channel ~p - " - "error while terminating:~n~p~n", - [self(), Channel, Reason]), - wait_for_channel_termination( - N-1, TimerRef, State1) + {undefined, _} -> + exit({abnormal_dependent_exit, ChPid, Reason}); + {_, controlled} -> + wait_for_channel_termination(N-1, TimerRef, State1); + {_, uncontrolled} -> + log(error, "Error on AMQP connection ~p (~s, vhost: '~s'," + " user: '~s', state: ~p), channel ~p:" + "error while terminating:~n~p~n", + [self(), ConnName, VHost, User#user.username, + CS, Channel, Reason]), + wait_for_channel_termination(N-1, TimerRef, State1) end; cancel_wait -> exit(channel_termination_timeout) @@ -581,16 +588,24 @@ maybe_close(State) -> termination_kind(normal) -> controlled; termination_kind(_) -> uncontrolled. +log_hard_error(#v1{connection_state = CS, + connection = #connection{ + name = ConnName, + user = User, + vhost = VHost}}, Channel, Reason) -> + log(error, + "Error on AMQP connection ~p (~s, vhost: '~s'," + " user: '~s', state: ~p), channel ~p:~n~p~n", + [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]). + handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> - log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", - [self(), closed, Channel, Reason]), + log_hard_error(State, Channel, Reason), State; handle_exception(State = #v1{connection = #connection{protocol = Protocol}, connection_state = CS}, Channel, Reason) when ?IS_RUNNING(State) orelse CS =:= closing -> - log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n", - [self(), CS, Channel, Reason]), + log_hard_error(State, Channel, Reason), {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), State1 = close_connection(terminate_channels(State)), @@ -1130,6 +1145,7 @@ ic(channel_max, #connection{channel_max = ChMax}) -> ChMax; ic(client_properties, #connection{client_properties = CP}) -> CP; ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; +ic(connected_at, #connection{connected_at = T}) -> T; ic(Item, #connection{}) -> throw({bad_argument, Item}). socket_info(Get, Select, #v1{sock = Sock}) -> diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index e23e3e6b..9f837cce 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -23,11 +23,14 @@ -export([start/0, stop/0, store/2, read/1, clear/0]). --export([upgrade_recovery_terms/0, start_link/0]). +-export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([upgrade_recovery_terms/0, persistent_bytes/0]). + -rabbit_upgrade({upgrade_recovery_terms, local, []}). +-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}). 
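The connected_at item added above is reported in milliseconds since the Unix epoch, derived with rabbit_misc:now_to_ms/1 from os:timestamp(); for example (timestamp illustrative):
    1407939730123 = rabbit_misc:now_to_ms({1407, 939730, 123456}).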
%%---------------------------------------------------------------------------- @@ -61,6 +64,8 @@ clear() -> ok = dets:delete_all_objects(?MODULE), flush(). +start_link() -> gen_server:start_link(?MODULE, [], []). + %%---------------------------------------------------------------------------- upgrade_recovery_terms() -> @@ -84,7 +89,20 @@ upgrade_recovery_terms() -> close_table() end. -start_link() -> gen_server:start_link(?MODULE, [], []). +persistent_bytes() -> dets_upgrade(fun persistent_bytes/1). +persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}]. + +dets_upgrade(Fun)-> + open_table(), + try + ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) -> + store(DirBaseName, Fun(Terms)), + Acc + end, ok, ?MODULE), + ok + after + close_table() + end. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index da75932d..fe2c3b58 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -70,7 +70,13 @@ wait_for_replicated() -> not lists:member({local_content, true}, TabDef)]). wait(TableNames) -> - case mnesia:wait_for_tables(TableNames, 30000) of + %% We might be in ctl here for offline ops, in which case we can't + %% get_env() for the rabbit app. + Timeout = case application:get_env(rabbit, mnesia_table_loading_timeout) of + {ok, T} -> T; + undefined -> 30000 + end, + case mnesia:wait_for_tables(TableNames, Timeout) of ok -> ok; {timeout, BadTabs} -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index da6938bd..a186fb7a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -37,7 +37,7 @@ all_tests() -> ok = supervisor2_tests:test_all(), passed = gm_tests:all_tests(), passed = mirrored_supervisor_tests:all_tests(), - application:set_env(rabbit, file_handles_high_watermark, 10, infinity), + application:set_env(rabbit, file_handles_high_watermark, 10), ok = file_handle_cache:set_limit(10), passed = test_version_equivalance(), passed = test_file_handle_cache(), @@ -1870,22 +1870,20 @@ test_backing_queue() -> {ok, rabbit_variable_queue} -> {ok, FileSizeLimit} = application:get_env(rabbit, msg_store_file_size_limit), - application:set_env(rabbit, msg_store_file_size_limit, 512, - infinity), + application:set_env(rabbit, msg_store_file_size_limit, 512), {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), - application:set_env(rabbit, queue_index_max_journal_entries, 128, - infinity), + application:set_env(rabbit, queue_index_max_journal_entries, 128), passed = test_msg_store(), application:set_env(rabbit, msg_store_file_size_limit, - FileSizeLimit, infinity), + FileSizeLimit), passed = test_queue_index(), passed = test_queue_index_props(), passed = test_variable_queue(), passed = test_variable_queue_delete_msg_store_files_callback(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, - MaxJournal, infinity), + MaxJournal), %% We will have restarted the message store, and thus changed %% the order of the children of rabbit_sup. This will cause %% problems if there are subsequent failures - see bug 24262. @@ -2210,13 +2208,13 @@ restart_test_queue(Qi) -> empty_test_queue() -> ok = rabbit_variable_queue:stop(), {ok, _} = rabbit_variable_queue:start([]), - {0, Qi} = init_test_queue(), + {0, 0, Qi} = init_test_queue(), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. 
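An illustrative rabbitmq.config entry for the new lookup in rabbit_table:wait/1 above; when the rabbit application environment is not available (for example during offline ctl operations) the code falls back to the previous hard-coded 30000 ms:
    [{rabbit, [{mnesia_table_loading_timeout, 30000}]}].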
with_empty_test_queue(Fun) -> ok = empty_test_queue(), - {0, Qi} = init_test_queue(), + {0, 0, Qi} = init_test_queue(), rabbit_queue_index:delete_and_terminate(Fun(Qi)). restart_app() -> @@ -2235,7 +2233,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> MsgId = rabbit_guid:gen(), QiM = rabbit_queue_index:publish( - MsgId, SeqId, #message_properties{}, Persistent, QiN), + MsgId, SeqId, #message_properties{size = 10}, + Persistent, QiN), ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]} end, {Qi, []}, SeqIds), @@ -2257,7 +2256,7 @@ test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> MsgId = rabbit_guid:gen(), - Props = #message_properties{expiry=12345}, + Props = #message_properties{expiry=12345, size = 10}, Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0), {[{MsgId, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), @@ -2287,7 +2286,7 @@ test_queue_index() -> ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsMsgIdsA)), %% should get length back as 0, as all the msgs were transient - {0, Qi6} = restart_test_queue(Qi4), + {0, 0, Qi6} = restart_test_queue(Qi4), {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), @@ -2296,7 +2295,8 @@ test_queue_index() -> lists:reverse(SeqIdsMsgIdsB)), %% should get length back as MostOfASegment LenB = length(SeqIdsB), - {LenB, Qi12} = restart_test_queue(Qi10), + BytesB = LenB * 10, + {LenB, BytesB, Qi12} = restart_test_queue(Qi10), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), @@ -2308,7 +2308,7 @@ test_queue_index() -> {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), %% should get length back as 0 because all persistent %% msgs have been acked - {0, Qi19} = restart_test_queue(Qi18), + {0, 0, Qi19} = restart_test_queue(Qi18), Qi19 end), @@ -2380,11 +2380,11 @@ test_queue_index() -> true, Qi0), Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), Qi3 = rabbit_queue_index:ack([0], Qi2), - {5, Qi4} = restart_test_queue(Qi3), + {5, 50, Qi4} = restart_test_queue(Qi3), {Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4), Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), - {5, Qi8} = restart_test_queue(Qi7), + {5, 50, Qi8} = restart_test_queue(Qi7), Qi8 end), @@ -2419,7 +2419,8 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> false -> 1 end}, PayloadFun(N)), - PropFun(N, #message_properties{}), false, self(), VQN) + PropFun(N, #message_properties{size = 10}), + false, self(), VQN) end, VQ, lists:seq(Start, Start + Count - 1))). 
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2454,7 +2455,7 @@ with_fresh_variable_queue(Fun) -> spawn_link(fun() -> ok = empty_test_queue(), VQ = variable_queue_init(test_amqqueue(true), false), - S0 = rabbit_variable_queue:status(VQ), + S0 = variable_queue_status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -2579,7 +2580,7 @@ variable_queue_with_holes(VQ0) -> {delta, _, 0, _} -> true; 0 -> true; _ -> false - end || {K, V} <- rabbit_variable_queue:status(VQ8), + end || {K, V} <- variable_queue_status(VQ8), lists:member(K, [q1, delta, q3])], Depth = Count + Interval, Depth = rabbit_variable_queue:depth(VQ8), @@ -2649,17 +2650,20 @@ test_variable_queue_ack_limiting(VQ0) -> %% fetch half the messages {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), - VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2}, - {ram_ack_count, Len div 2}, - {ram_msg_count, Len div 2}]), + VQ5 = check_variable_queue_status( + VQ4, [{len, Len div 2}, + {messages_unacknowledged_ram, Len div 2}, + {messages_ready_ram, Len div 2}, + {messages_ram, Len}]), %% ensure all acks go to disk on 0 duration target VQ6 = check_variable_queue_status( variable_queue_set_ram_duration_target(0, VQ5), - [{len, Len div 2}, - {target_ram_count, 0}, - {ram_msg_count, 0}, - {ram_ack_count, 0}]), + [{len, Len div 2}, + {target_ram_count, 0}, + {messages_unacknowledged_ram, 0}, + {messages_ready_ram, 0}, + {messages_ram, 0}]), VQ6. @@ -2760,7 +2764,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> fun (Duration1, VQ4) -> {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), io:format("~p:~n~p~n", - [Duration1, rabbit_variable_queue:status(VQ5)]), + [Duration1, variable_queue_status(VQ5)]), VQ6 = variable_queue_set_ram_duration_target( Duration1, VQ5), publish_fetch_and_ack(Churn, Len, VQ6) @@ -2820,11 +2824,16 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> check_variable_queue_status(VQ0, Props) -> VQ1 = variable_queue_wait_for_shuffling_end(VQ0), - S = rabbit_variable_queue:status(VQ1), + S = variable_queue_status(VQ1), io:format("~p~n", [S]), assert_props(S, Props), VQ1. +variable_queue_status(VQ) -> + Keys = rabbit_backing_queue:info_keys() -- [backing_queue_status], + [{K, rabbit_variable_queue:info(K, VQ)} || K <- Keys] ++ + rabbit_variable_queue:info(backing_queue_status, VQ). + variable_queue_wait_for_shuffling_end(VQ) -> case credit_flow:blocked() of false -> VQ; diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index aafd81df..dbc2856d 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -16,7 +16,7 @@ -module(rabbit_trace). --export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]). +-export([init/1, enabled/1, tap_in/5, tap_out/5, start/1, stop/1]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -32,8 +32,12 @@ -spec(init/1 :: (rabbit_types:vhost()) -> state()). -spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()). --spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok'). --spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok'). +-spec(tap_in/5 :: (rabbit_types:basic_message(), binary(), + rabbit_channel:channel_number(), + rabbit_types:username(), state()) -> 'ok'). +-spec(tap_out/5 :: (rabbit_amqqueue:qmsg(), binary(), + rabbit_channel:channel_number(), + rabbit_types:username(), state()) -> 'ok'). -spec(start/1 :: (rabbit_types:vhost()) -> 'ok'). -spec(stop/1 :: (rabbit_types:vhost()) -> 'ok'). 
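%% A sketch, not part of the patch: caller-side shape of the widened
%% tracing API, following the tap_in/5 and tap_out/5 specs above. The
%% actual call sites (in the channel code) are not shown in this diff,
%% so the variable names below are assumptions; only the argument order
%% comes from the specs.
%%
%%   TraceState = rabbit_trace:init(VHost),
%%   ok = rabbit_trace:tap_in(Msg, ConnName, ChannelNum, Username,
%%                            TraceState),
%%   ok = rabbit_trace:tap_out(QMsg, ConnName, ChannelNum, Username,
%%                             TraceState).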
@@ -54,15 +58,27 @@ enabled(VHost) -> {ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS), lists:member(VHost, VHosts). -tap_in(_Msg, none) -> ok; -tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) -> - trace(TraceX, Msg, <<"publish">>, XName, []). - -tap_out(_Msg, none) -> ok; -tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) -> +tap_in(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok; +tap_in(Msg = #basic_message{exchange_name = #resource{name = XName, + virtual_host = VHost}}, + ConnName, ChannelNum, Username, TraceX) -> + trace(TraceX, Msg, <<"publish">>, XName, + [{<<"vhost">>, longstr, VHost}, + {<<"connection">>, longstr, ConnName}, + {<<"channel">>, signedint, ChannelNum}, + {<<"user">>, longstr, Username}]). + +tap_out(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok; +tap_out({#resource{name = QName, virtual_host = VHost}, + _QPid, _QMsgId, Redelivered, Msg}, + ConnName, ChannelNum, Username, TraceX) -> RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, trace(TraceX, Msg, <<"deliver">>, QName, - [{<<"redelivered">>, signedint, RedeliveredNum}]). + [{<<"redelivered">>, signedint, RedeliveredNum}, + {<<"vhost">>, longstr, VHost}, + {<<"connection">>, longstr, ConnName}, + {<<"channel">>, signedint, ChannelNum}, + {<<"user">>, longstr, Username}]). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 8ab35a89..72bf7855 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -190,7 +190,7 @@ die(Msg, Args) -> %% We don't throw or exit here since that gets thrown %% straight out into do_boot, generating an erl_crash.dump %% and displaying any error message in a confusing way. - error_logger:error_msg(Msg, Args), + rabbit_log:error(Msg, Args), Str = rabbit_misc:format( "~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), io:format(Str), @@ -281,6 +281,4 @@ node_type_legacy() -> false -> ram end. -%% NB: we cannot use rabbit_log here since it may not have been -%% started yet -info(Msg, Args) -> error_logger:info_msg(Msg, Args). +info(Msg, Args) -> rabbit_log:info(Msg, Args). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b6d37852..1104f373 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -48,6 +48,7 @@ -rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}). -rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}). -rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}). +-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). %% ------------------------------------------------------------------- @@ -77,6 +78,8 @@ -spec(policy_apply_to/0 :: () -> 'ok'). -spec(queue_decorators/0 :: () -> 'ok'). -spec(internal_system_x/0 :: () -> 'ok'). +-spec(cluster_name/0 :: () -> 'ok'). +-spec(down_slave_nodes/0 :: () -> 'ok'). -endif. @@ -382,6 +385,21 @@ cluster_name_tx() -> [mnesia:delete(T, K, write) || K <- Ks], ok. +down_slave_nodes() -> + ok = down_slave_nodes(rabbit_queue), + ok = down_slave_nodes(rabbit_durable_queue). 
+ +down_slave_nodes(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, Policy, GmPids, Decorators}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, [], Policy, GmPids, Decorators} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ede69748..e97ed491 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). + info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -254,13 +254,17 @@ durable, transient_threshold, - len, - persistent_count, + len, %% w/o unacked + bytes, %% w/o unacked + unacked_bytes, + persistent_count, %% w unacked + persistent_bytes, %% w unacked target_ram_count, - ram_msg_count, + ram_msg_count, %% w/o unacked ram_msg_count_prev, ram_ack_count_prev, + ram_bytes, %% w unacked out_counter, in_counter, rates, @@ -343,11 +347,17 @@ transient_threshold :: non_neg_integer(), len :: non_neg_integer(), + bytes :: non_neg_integer(), + unacked_bytes :: non_neg_integer(), + persistent_count :: non_neg_integer(), + persistent_bytes :: non_neg_integer(), target_ram_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), + ram_ack_count_prev :: non_neg_integer(), + ram_bytes :: non_neg_integer(), out_counter :: non_neg_integer(), in_counter :: non_neg_integer(), rates :: rates(), @@ -425,7 +435,7 @@ init(Queue, Recover, AsyncCallback) -> init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], + init(IsDurable, IndexState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -440,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms, MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), - {DeltaCount, IndexState} = + {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), @@ -448,7 +458,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, RecoveryTerms, + init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). 
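%% A sketch, not part of the patch: the down_slave_nodes/1 upgrade
%% earlier in this diff rewrites every amqqueue row through the
%% transform/3 helper in rabbit_upgrade_functions; underneath, adding a
%% field to a record-backed table comes down to
%% mnesia:transform_table/3 with a tuple-rewriting fun and the new
%% attribute list. The table and record used here are hypothetical.

-module(add_field_example).
-export([add_counter/0]).

add_counter() ->
    %% rewrite {my_rec, Name, Value} rows as {my_rec, Name, Value, 0}
    {atomic, ok} = mnesia:transform_table(
                     my_table,
                     fun ({my_rec, Name, Value}) ->
                             {my_rec, Name, Value, 0}
                     end,
                     [name, value, counter]),
    ok.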
process_recovery_terms(Terms=non_clean_shutdown) -> @@ -461,6 +471,7 @@ process_recovery_terms(Terms) -> terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, + persistent_bytes = PBytes, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = purge_pending_ack(true, State), @@ -470,7 +481,9 @@ terminate(_Reason, State) -> rabbit_msg_store:client_ref(MSCStateP) end, ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT), - Terms = [{persistent_ref, PRef}, {persistent_count, PCount}], + Terms = [{persistent_ref, PRef}, + {persistent_count, PCount}, + {persistent_bytes, PBytes}], a(State1 #vqstate { index_state = rabbit_queue_index:terminate( Terms, IndexState), msg_store_clients = undefined }). @@ -498,28 +511,35 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, len = Len, - persistent_count = PCount }) -> + ram_bytes = RamBytes, + persistent_count = PCount, + persistent_bytes = PBytes }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. - {LensByStore, IndexState1} = remove_queue_entries( - fun ?QUEUE:foldl/3, Q4, - orddict:new(), IndexState, MSCState), - {LensByStore1, State1 = #vqstate { q1 = Q1, - index_state = IndexState2, - msg_store_clients = MSCState1 }} = - purge_betas_and_deltas(LensByStore, - State #vqstate { q4 = ?QUEUE:new(), - index_state = IndexState1 }), - {LensByStore2, IndexState3} = remove_queue_entries( - fun ?QUEUE:foldl/3, Q1, - LensByStore1, IndexState2, MSCState1), - PCount1 = PCount - find_persistent_count(LensByStore2), + Stats = {RamBytes, PCount, PBytes}, + {Stats1, IndexState1} = + remove_queue_entries(Q4, Stats, IndexState, MSCState), + + {Stats2, State1 = #vqstate { q1 = Q1, + index_state = IndexState2, + msg_store_clients = MSCState1 }} = + + purge_betas_and_deltas( + Stats1, State #vqstate { q4 = ?QUEUE:new(), + index_state = IndexState1 }), + + {{RamBytes3, PCount3, PBytes3}, IndexState3} = + remove_queue_entries(Q1, Stats2, IndexState2, MSCState1), + {Len, a(State1 #vqstate { q1 = ?QUEUE:new(), index_state = IndexState3, len = 0, + bytes = 0, ram_msg_count = 0, - persistent_count = PCount1 })}. + ram_bytes = RamBytes3, + persistent_count = PCount3, + persistent_bytes = PBytes3 })}. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -542,11 +562,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, InCount1 = InCount + 1, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount1, - persistent_count = PCount1, - unconfirmed = UC1 }), + State3 = upd_bytes( + 1, 0, MsgStatus1, + inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount1, + persistent_count = PCount1, + unconfirmed = UC1 })), a(reduce_memory_use(maybe_update_rates(State3))). 
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, @@ -565,11 +587,12 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - unconfirmed = UC1 }, + State3 = upd_bytes(0, 1, MsgStatus, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = UC1 }), {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. discard(_MsgId, _ChPid, State) -> State. @@ -638,9 +661,8 @@ ack([SeqId], State) -> index_on_disk = IndexOnDisk }, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, ack_out_counter = AckOutCount }} = - remove_pending_ack(SeqId, State), + remove_pending_ack(true, SeqId, State), IndexState1 = case IndexOnDisk of true -> rabbit_queue_index:ack([SeqId], IndexState); false -> IndexState @@ -649,30 +671,24 @@ ack([SeqId], State) -> true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); false -> ok end, - PCount1 = PCount - one_if(IsPersistent), {[MsgId], a(State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, ack_out_counter = AckOutCount + 1 })}; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, ack_out_counter = AckOutCount }} = lists:foldl( fun (SeqId, {Acc, State2}) -> - {MsgStatus, State3} = remove_pending_ack(SeqId, State2), + {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2), {accumulate_ack(MsgStatus, Acc), State3} end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], - PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( - orddict:new(), MsgIdsByStore)), {lists:reverse(AllMsgIds), a(State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. requeue(AckTags, #vqstate { delta = Delta, @@ -821,15 +837,30 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, out = AvgEgressRate } }) -> {AvgIngressRate, AvgEgressRate}. 
-status(#vqstate { +info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> + RamMsgCount; +info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) -> + gb_trees:size(RPA); +info(messages_ram, State) -> + info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); +info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> + PersistentCount; +info(message_bytes, #vqstate{bytes = Bytes, + unacked_bytes = UBytes}) -> + Bytes + UBytes; +info(message_bytes_ready, #vqstate{bytes = Bytes}) -> + Bytes; +info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) -> + UBytes; +info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> + RamBytes; +info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> + PersistentBytes; +info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, - ram_pending_ack = RPA, - disk_pending_ack = DPA, target_ram_count = TargetRamCount, - ram_msg_count = RamMsgCount, next_seq_id = NextSeqId, - persistent_count = PersistentCount, rates = #rates { in = AvgIngressRate, out = AvgEgressRate, ack_in = AvgAckIngressRate, @@ -841,16 +872,14 @@ status(#vqstate { {q3 , ?QUEUE:len(Q3)}, {q4 , ?QUEUE:len(Q4)}, {len , Len}, - {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)}, {target_ram_count , TargetRamCount}, - {ram_msg_count , RamMsgCount}, - {ram_ack_count , gb_trees:size(RPA)}, {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate} ]. + {avg_ack_egress_rate , AvgAckEgressRate} ]; +info(Item, _) -> + throw({bad_argument, Item}). invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); invoke( _, _, State) -> State. @@ -863,8 +892,12 @@ is_duplicate(_Msg, State) -> {false, State}. a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, len = Len, + bytes = Bytes, + unacked_bytes = UnackedBytes, persistent_count = PersistentCount, - ram_msg_count = RamMsgCount }) -> + persistent_bytes = PersistentBytes, + ram_msg_count = RamMsgCount, + ram_bytes = RamBytes}) -> E1 = ?QUEUE:is_empty(Q1), E2 = ?QUEUE:is_empty(Q2), ED = Delta#delta.count == 0, @@ -878,9 +911,14 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = LZ == (E3 and E4), true = Len >= 0, + true = Bytes >= 0, + true = UnackedBytes >= 0, true = PersistentCount >= 0, + true = PersistentBytes >= 0, true = RamMsgCount >= 0, true = RamMsgCount =< Len, + true = RamBytes >= 0, + true = RamBytes =< Bytes + UnackedBytes, State. 
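%% A sketch, not part of the patch: reading the new counters back
%% through info/2 as defined above. VQ stands for a variable queue
%% state (e.g. as returned by init) and is assumed to be in scope.
%%
%%   Ready   = rabbit_variable_queue:info(message_bytes_ready, VQ),
%%   Unacked = rabbit_variable_queue:info(message_bytes_unacknowledged, VQ),
%%   Total   = rabbit_variable_queue:info(message_bytes, VQ),
%%   InRam   = rabbit_variable_queue:info(message_bytes_ram, VQ),
%%   Status  = rabbit_variable_queue:info(backing_queue_status, VQ),
%%   Total   = Ready + Unacked.  %% holds by the message_bytes clause above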
@@ -1028,15 +1066,17 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, +init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), - DeltaCount1 = + {DeltaCount1, DeltaBytes1} = case Terms of - non_clean_shutdown -> DeltaCount; - _ -> proplists:get_value(persistent_count, - Terms, DeltaCount) + non_clean_shutdown -> {DeltaCount, DeltaBytes}; + _ -> {proplists:get_value(persistent_count, + Terms, DeltaCount), + proplists:get_value(persistent_bytes, + Terms, DeltaBytes)} end, Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of true -> ?BLANK_DELTA; @@ -1061,11 +1101,15 @@ init(IsDurable, IndexState, DeltaCount, Terms, len = DeltaCount1, persistent_count = DeltaCount1, + bytes = DeltaBytes1, + persistent_bytes = DeltaBytes1, target_ram_count = infinity, ram_msg_count = 0, ram_msg_count_prev = 0, ram_ack_count_prev = 0, + ram_bytes = 0, + unacked_bytes = 0, out_counter = 0, in_counter = 0, rates = blank_rates(Now), @@ -1090,9 +1134,11 @@ in_r(MsgStatus = #msg_status { msg = undefined }, true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = read_msg(MsgStatus, State), - inc_ram_msg_count( - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) }) + upd_ram_bytes( + 1, MsgStatus, + inc_ram_msg_count( + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { + msg = Msg }, Q4a) })) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1123,6 +1169,28 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> State#vqstate{ram_msg_count = RamMsgCount + 1}. +upd_bytes(SignReady, SignUnacked, + MsgStatus = #msg_status{msg = undefined}, State) -> + upd_bytes0(SignReady, SignUnacked, MsgStatus, State); +upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) -> + upd_ram_bytes(SignReady + SignUnacked, MsgStatus, + upd_bytes0(SignReady, SignUnacked, MsgStatus, State)). + +upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP}, + State = #vqstate{bytes = Bytes, + unacked_bytes = UBytes, + persistent_bytes = PBytes}) -> + S = msg_size(MsgStatus), + SignTotal = SignReady + SignUnacked, + State#vqstate{bytes = Bytes + SignReady * S, + unacked_bytes = UBytes + SignUnacked * S, + persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}. + +upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> + State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}. + +msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. 
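%% A sketch, not part of the patch: the sign convention behind
%% upd_bytes/4 above, traced for one 10-byte message. The first
%% argument adjusts the "ready" bytes, the second the "unacked" bytes,
%% and persistent_bytes follows their sum for persistent messages.
%%
%%   publish                 upd_bytes( 1,  0, ...)  bytes=10  unacked_bytes=0
%%   fetch, ack required     upd_bytes(-1,  1, ...)  bytes=0   unacked_bytes=10
%%   then ack                upd_bytes( 0, -1, ...)  bytes=0   unacked_bytes=0
%%   ...or requeue instead   upd_bytes( 1, -1, ...)  bytes=10  unacked_bytes=0
%%
%% The fetch, ack and requeue rows correspond to the remove,
%% remove_pending_ack and queue_merge/delta_merge changes further down
%% in this diff.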
+ remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, @@ -1164,58 +1232,64 @@ remove(AckRequired, MsgStatus = #msg_status { PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - + State2 = case AckRequired of + false -> upd_bytes(-1, 0, MsgStatus, State1); + true -> upd_bytes(-1, 1, MsgStatus, State1) + end, {AckTag, maybe_update_rates( - State1 #vqstate {ram_msg_count = RamMsgCount1, + State2 #vqstate {ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, index_state = IndexState2, len = Len - 1, persistent_count = PCount1})}. -purge_betas_and_deltas(LensByStore, +purge_betas_and_deltas(Stats, State = #vqstate { q3 = Q3, index_state = IndexState, msg_store_clients = MSCState }) -> case ?QUEUE:is_empty(Q3) of - true -> {LensByStore, State}; - false -> {LensByStore1, IndexState1} = - remove_queue_entries(fun ?QUEUE:foldl/3, Q3, - LensByStore, IndexState, MSCState), - purge_betas_and_deltas(LensByStore1, + true -> {Stats, State}; + false -> {Stats1, IndexState1} = remove_queue_entries( + Q3, Stats, IndexState, MSCState), + purge_betas_and_deltas(Stats1, maybe_deltas_to_betas( State #vqstate { q3 = ?QUEUE:new(), index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> - {MsgIdsByStore, Delivers, Acks} = - Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), +remove_queue_entries(Q, {RamBytes, PCount, PBytes}, + IndexState, MSCState) -> + {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} = + ?QUEUE:foldl(fun remove_queue_entries1/2, + {orddict:new(), RamBytes, PBytes, [], []}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore), + {{RamBytes1, + PCount - case orddict:find(true, MsgIdsByStore) of + error -> 0; + {ok, Ids} -> length(Ids) + end, + PBytes1}, rabbit_queue_index:ack(Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, + #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {MsgIdsByStore, Delivers, Acks}) -> + index_on_disk = IndexOnDisk, is_persistent = IsPersistent, + msg_props = #message_properties { size = Size } }, + {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) -> {case MsgOnDisk of true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, + RamBytes - Size * one_if(Msg =/= undefined), + PBytes - Size * one_if(IsPersistent), cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), cons_if(IndexOnDisk, SeqId, Acks)}. -sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> - orddict:fold( - fun (IsPersistent, MsgIds, LensByStore1) -> - orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1) - end, LensByStore, MsgIdsByStore). - %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1287,8 +1361,15 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, none -> gb_trees:get(SeqId, DPA) end. 
-remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> +%% First parameter = UpdatePersistentCount +remove_pending_ack(true, SeqId, State) -> + {MsgStatus, State1 = #vqstate { persistent_count = PCount }} = + remove_pending_ack(false, SeqId, State), + PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent), + {MsgStatus, upd_bytes(0, -1, MsgStatus, + State1 # vqstate{ persistent_count = PCount1 })}; +remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA }) -> case gb_trees:lookup(SeqId, RPA) of {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), {V, State #vqstate { ram_pending_ack = RPA1 }}; @@ -1338,12 +1419,6 @@ accumulate_ack(#msg_status { seq_id = SeqId, end, [MsgId | AllMsgIds]}. -find_persistent_count(LensByStore) -> - case orddict:find(true, LensByStore) of - error -> 0; - {ok, Len} -> Len - end. - %%---------------------------------------------------------------------------- %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- @@ -1391,9 +1466,15 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> {Msg, State1} = read_msg(MsgStatus, State), - {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)}; + {MsgStatus#msg_status { msg = Msg }, + upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1] publish_alpha(MsgStatus, State) -> {MsgStatus, inc_ram_msg_count(State)}. +%% [1] We increase the ram_bytes here because we paged the message in +%% to requeue it, not purely because we requeued it. Hence in the +%% second head it's already accounted for as already in memory. OTOH +%% ram_msg_count does not include unacked messages, so it needs +%% incrementing in both heads. publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), @@ -1419,7 +1500,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = PubFun(MsgStatus, State1), queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) + Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2)) end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -1433,13 +1514,14 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2} + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], + upd_bytes(1, -1, MsgStatus, State2)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, State) -> {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = - remove_pending_ack(SeqId, State), + remove_pending_ack(false, SeqId, State), {MsgStatus #msg_status { msg_props = MsgProps #message_properties { needs_confirming = false } }, State1}. @@ -1591,8 +1673,10 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, maybe_write_to_disk(true, false, MsgStatus, State), DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA), limit_ram_acks(Quota - 1, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 }) + upd_ram_bytes( + -1, MsgStatus1, + State1 #vqstate { ram_pending_ack = RPA1, + disk_pending_ack = DPA1 })) end. 
permitted_beta_count(#vqstate { len = 0 }) -> @@ -1728,9 +1812,12 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> State1 = #vqstate { ram_msg_count = RamMsgCount }} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = Consumer(MsgStatus2, Qa, - State1 #vqstate { - ram_msg_count = RamMsgCount - 1 }), + State2 = Consumer( + MsgStatus2, Qa, + upd_ram_bytes( + -1, MsgStatus2, + State1 #vqstate { + ram_msg_count = RamMsgCount - 1})), push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, State2) end diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index d943b599..3a041508 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -114,8 +114,8 @@ upgrades_required(Scope) -> with_upgrade_graph(Fun, Scope) -> case rabbit_misc:build_acyclic_graph( - fun (Module, Steps) -> vertices(Module, Steps, Scope) end, - fun (Module, Steps) -> edges(Module, Steps, Scope) end, + fun ({_App, Module, Steps}) -> vertices(Module, Steps, Scope) end, + fun ({_App, Module, Steps}) -> edges(Module, Steps, Scope) end, rabbit_misc:all_module_attributes(rabbit_upgrade)) of {ok, G} -> try Fun(G) @@ -161,7 +161,7 @@ heads(G) -> categorise_by_scope(Version) when is_list(Version) -> Categorised = - [{Scope, Name} || {_Module, Attributes} <- + [{Scope, Name} || {_App, _Module, Attributes} <- rabbit_misc:all_module_attributes(rabbit_upgrade), {Name, Scope, _Requires} <- Attributes, lists:member(Name, Version)], |