summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile12
-rw-r--r--docs/rabbitmq-plugins.1.xml87
-rw-r--r--docs/rabbitmq.config.example30
-rw-r--r--docs/rabbitmqctl.1.xml72
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit.hrl27
-rw-r--r--packaging/common/README2
-rwxr-xr-xquickcheck12
-rwxr-xr-xscripts/rabbitmq-env44
-rwxr-xr-xscripts/rabbitmq-plugins8
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rwxr-xr-xscripts/rabbitmq-server43
-rwxr-xr-xscripts/rabbitmq-server.bat1
-rwxr-xr-xscripts/rabbitmq-service.bat1
-rwxr-xr-xscripts/rabbitmqctl15
-rwxr-xr-xscripts/rabbitmqctl.bat27
-rw-r--r--src/app_utils.erl19
-rw-r--r--src/gm.erl30
-rw-r--r--src/gm_qc.erl384
-rw-r--r--src/rabbit.erl159
-rw-r--r--src/rabbit_amqqueue.erl91
-rw-r--r--src/rabbit_amqqueue_process.erl28
-rw-r--r--src/rabbit_backing_queue.erl18
-rw-r--r--src/rabbit_channel.erl43
-rw-r--r--src/rabbit_channel_interceptor.erl25
-rw-r--r--src/rabbit_control_main.erl61
-rw-r--r--src/rabbit_dead_letter.erl3
-rw-r--r--src/rabbit_event.erl21
-rw-r--r--src/rabbit_exchange.erl89
-rw-r--r--src/rabbit_exchange_decorator.erl24
-rw-r--r--src/rabbit_mirror_queue_master.erl31
-rw-r--r--src/rabbit_mirror_queue_misc.erl81
-rw-r--r--src/rabbit_mirror_queue_slave.erl5
-rw-r--r--src/rabbit_misc.erl70
-rw-r--r--src/rabbit_mnesia.erl3
-rw-r--r--src/rabbit_networking.erl11
-rw-r--r--src/rabbit_node_monitor.erl1
-rw-r--r--src/rabbit_plugins.erl132
-rw-r--r--src/rabbit_plugins_main.erl257
-rw-r--r--src/rabbit_policies.erl4
-rw-r--r--src/rabbit_policy.erl24
-rw-r--r--src/rabbit_queue_decorator.erl23
-rw-r--r--src/rabbit_reader.erl64
-rw-r--r--src/rabbit_table.erl8
-rw-r--r--src/rabbit_tests.erl30
-rw-r--r--src/rabbit_trace.erl36
-rw-r--r--src/rabbit_upgrade_functions.erl18
-rw-r--r--src/rabbit_variable_queue.erl24
-rw-r--r--src/rabbit_version.erl6
49 files changed, 1681 insertions, 532 deletions
diff --git a/Makefile b/Makefile
index c54b44e5..6dbb650e 100644
--- a/Makefile
+++ b/Makefile
@@ -20,8 +20,6 @@ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml $(DOCS_DIR)/rabbitmq-echopid.xml)
USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-plugins.1.xml
USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML)))
-QC_MODULES := rabbit_backing_queue_qc
-QC_TRIALS ?= 100
ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes)
PYTHON=python
@@ -56,6 +54,12 @@ endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc)
+ifdef INSTRUMENT_FOR_QC
+ERLC_OPTS += -DINSTR_MOD=gm_qc
+else
+ERLC_OPTS += -DINSTR_MOD=gm
+endif
+
include version.mk
PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo )
@@ -217,7 +221,8 @@ run-tests: all
echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null
run-qc: all
- $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS))
+ ./quickcheck $(RABBITMQ_NODENAME) rabbit_backing_queue_qc 100 40
+ ./quickcheck $(RABBITMQ_NODENAME) gm_qc 1000 200
start-background-node: all
-rm -f $(RABBITMQ_MNESIA_DIR).pid
@@ -381,3 +386,4 @@ include $(DEPS_FILE)
endif
.PHONY: run-qc
+
diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml
index 8ecb4fc8..f7be2d29 100644
--- a/docs/rabbitmq-plugins.1.xml
+++ b/docs/rabbitmq-plugins.1.xml
@@ -40,6 +40,7 @@
<refsynopsisdiv>
<cmdsynopsis>
<command>rabbitmq-plugins</command>
+ <arg choice="opt">-n <replaceable>node</replaceable></arg>
<arg choice="req"><replaceable>command</replaceable></arg>
<arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
</cmdsynopsis>
@@ -62,6 +63,16 @@
enabled. Implicitly enabled plugins are automatically disabled again
when they are no longer required.
</para>
+
+ <para>
+ The <command>enable</command>, <command>disable</command> and
+ <command>set</command> commands will update the plugins file and
+ then attempt to connect to the broker and ensure it is running
+ all enabled plugins. By default if it is not possible to connect
+ to the running broker (for example if it is stopped) then a
+ warning is displayed. Specify <command>--online</command> or
+ <command>--offline</command> to change this behaviour.
+ </para>
</refsect1>
<refsect1>
@@ -97,12 +108,14 @@
</variablelist>
<para>
Lists all plugins, their versions, dependencies and
- descriptions. Each plugin is prefixed with a status
- indicator - [ ] to indicate that the plugin is not
- enabled, [E] to indicate that it is explicitly enabled,
- [e] to indicate that it is implicitly enabled, and [!] to
- indicate that it is enabled but missing and thus not
- operational.
+ descriptions. Each plugin is prefixed with two status
+ indicator characters inside [ ]. The first indicator can
+ be " " to indicate that the plugin is not enabled, "E" to
+ indicate that it is explicitly enabled, "e" to indicate
+ that it is implicitly enabled, or "!" to indicate that it
+ is enabled but missing and thus not operational. The
+ second indicator can be " " to show that the plugin is not
+ running, or "*" to show that it is.
</para>
<para>
If the optional pattern is given, only plugins whose
@@ -130,17 +143,24 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>enable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>enable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>plugin</term>
<listitem><para>One or more plugins to enable.</para></listitem>
</varlistentry>
</variablelist>
<para>
- Enables the specified plugins and all their
- dependencies.
+ Enables the specified plugins and all their dependencies.
</para>
<para role="example-prefix">For example:</para>
@@ -154,17 +174,24 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>disable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>disable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>plugin</term>
<listitem><para>One or more plugins to disable.</para></listitem>
</varlistentry>
</variablelist>
<para>
- Disables the specified plugins and all plugins that
- depend on them.
+ Disables the specified plugins and all their dependencies.
</para>
<para role="example-prefix">For example:</para>
@@ -175,6 +202,42 @@
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>set</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>plugin</term>
+ <listitem><para>Zero or more plugins to enable.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Enables the specified plugins and all their
+ dependencies. Unlike <command>rabbitmq-plugins
+ enable</command> this command ignores and overwrites any
+ existing enabled plugins. <command>rabbitmq-plugins
+ set</command> with no plugin arguments is a legal command
+ meaning "disable all plugins".
+ </para>
+
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmq-plugins set rabbitmq_management</screen>
+ <para role="example">
+ This command enables the <command>management</command>
+ plugin and its dependencies and disables everything else.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</refsect1>
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index a128bfbc..63540568 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -27,6 +27,11 @@
%%
%% {ssl_listeners, [5671]},
+ %% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection
+ %% and SSL handshake), in milliseconds.
+ %%
+ %% {handshake_timeout, 10000},
+
%% Log levels (currently just used for connection logging).
%% One of 'info', 'warning', 'error' or 'none', in decreasing order
%% of verbosity. Defaults to 'info'.
@@ -111,6 +116,10 @@
%%
%% {ssl_cert_login_from, common_name},
+ %% SSL handshake timeout, in milliseconds.
+ %%
+ %% {ssl_handshake_timeout, 5000},
+
%%
%% Default User / VHost
%% ====================
@@ -221,7 +230,12 @@
%% Explicitly enable/disable hipe compilation.
%%
- %% {hipe_compile, true}
+ %% {hipe_compile, true},
+
+ %% Timeout used when waiting for Mnesia tables in a cluster to
+ %% become available.
+ %%
+ %% {mnesia_table_loading_timeout, 30000}
]},
@@ -265,9 +279,13 @@
%% {certfile, "/path/to/cert.pem"},
%% {keyfile, "/path/to/key.pem"}]}]},
+ %% One of 'basic', 'detailed' or 'none'. See
+ %% http://www.rabbitmq.com/management.html#fine-stats for more details.
+ %% {rates_mode, basic},
+
%% Configure how long aggregated data (such as message rates and queue
%% lengths) is retained. Please read the plugin's documentation in
- %% https://www.rabbitmq.com/management.html#configuration for more
+ %% http://www.rabbitmq.com/management.html#configuration for more
%% details.
%%
%% {sample_retention_policies,
@@ -276,14 +294,6 @@
%% {detailed, [{10, 5}]}]}
]},
- {rabbitmq_management_agent,
- [%% Misc/Advanced Options
- %%
- %% NB: Change these only if you understand what you are doing!
- %%
- %% {force_fine_statistics, true}
- ]},
-
%% ----------------------------------------------------------------------------
%% RabbitMQ Shovel Plugin
%%
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 01b024a2..908dad03 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -406,11 +406,15 @@
online, except when using the <command>--offline</command> flag.
</para>
<para>
- When using the <command>--offline</command> flag the node you
- connect to will become the canonical source for cluster metadata
- (e.g. which queues exist), even if it was not before. Therefore
- you should use this command on the latest node to shut down if
- at all possible.
+ When using the <command>--offline</command> flag
+ rabbitmqctl will not attempt to connect to a node as
+ normal; instead it will temporarily become the node in
+ order to make the change. This is useful if the node
+ cannot be started normally. In this case the node will
+ become the canonical source for cluster metadata
+ (e.g. which queues exist), even if it was not
+ before. Therefore you should use this command on the
+ latest node to shut down if at all possible.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen>
@@ -454,6 +458,44 @@
</listitem>
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>force_boot</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Ensure that the node will start next time, even if it
+ was not the last to shut down.
+ </para>
+ <para>
+ Normally when you shut down a RabbitMQ cluster
+ altogether, the first node you restart should be the
+ last one to go down, since it may have seen things
+ happen that other nodes did not. But sometimes
+ that's not possible: for instance if the entire cluster
+ loses power then all nodes may think they were not the
+ last to shut down.
+ </para>
+ <para>
+ In such a case you can invoke <command>rabbitmqctl
+ force_boot</command> while the node is down. This will
+ tell the node to unconditionally start next time you ask
+ it to. If any changes happened to the cluster after this
+ node shut down, they will be lost.
+ </para>
+ <para>
+ If the last node to go down is permanently lost then you
+ should use <command>rabbitmqctl forget_cluster_node
+ --offline</command> in preference to this command, as it
+ will ensure that mirrored queues which were mastered on
+ the lost node get promoted.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl force_boot</screen>
+ <para role="example">
+ This will force the node not to wait for other nodes
+ next time it is started.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
<term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
</term>
<listitem>
@@ -1148,6 +1190,22 @@
(queue depth).</para></listitem>
</varlistentry>
<varlistentry>
+ <term>messages_ready_ram</term>
+ <listitem><para>Number of messages from messages_ready which are resident in ram.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_unacknowledged_ram</term>
+ <listitem><para>Number of messages from messages_unacknowledged which are resident in ram.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_ram</term>
+ <listitem><para>Total number of messages which are resident in ram.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_persistent</term>
+ <listitem><para>Total number of persistent messages in the queue (will always be 0 for transient queues).</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>consumers</term>
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
@@ -1475,6 +1533,10 @@
<term>send_pend</term>
<listitem><para>Send queue size.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>connected_at</term>
+ <listitem><para>Date and time this connection was established, as timestamp.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>connectioninfoitem</command>s are
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 7360208a..f26e0f77 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -39,12 +39,15 @@
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
+ {mnesia_table_loading_timeout, 30000},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
{trace_vhosts, []},
{log_levels, [{connection, info}]},
{ssl_cert_login_from, distinguished_name},
+ {ssl_handshake_timeout, 5000},
+ {handshake_timeout, 10000},
{reverse_dns_lookups, false},
{cluster_partition_handling, ignore},
{tcp_listen_options, [binary,
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5ac3197e..7a40f9eb 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -39,13 +39,25 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratches, policy, decorators}).
--record(exchange_serial, {name, next}).
+%% fields described as 'transient' here are cleared when writing to
+%% rabbit_durable_<thing>
+-record(exchange, {
+ name, type, durable, auto_delete, internal, arguments, %% immutable
+ scratches, %% durable, explicitly updated via update_scratch/3
+ policy, %% durable, implicitly updated when policy changes
+ decorators}). %% transient, recalculated in store/1 (i.e. recovery)
+
+-record(amqqueue, {
+ name, durable, auto_delete, exclusive_owner = none, %% immutable
+ arguments, %% immutable
+ pid, %% durable (just so we know home node)
+ slave_pids, sync_slave_pids, %% transient
+ down_slave_nodes, %% durable
+ policy, %% durable, implicit update as above
+ gm_pids, %% transient
+ decorators}). %% transient, recalculated as above
--record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, sync_slave_pids, policy,
- gm_pids, decorators}).
+-record(exchange_serial, {name, next}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -105,9 +117,6 @@
-define(DESIRED_HIBERNATE, 10000).
-define(CREDIT_DISC_BOUND, {2000, 500}).
-%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
--define(MAX_EXPIRY_TIMER, 4294967295).
-
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/packaging/common/README b/packaging/common/README
index 0a29ee27..35a1523a 100644
--- a/packaging/common/README
+++ b/packaging/common/README
@@ -17,4 +17,4 @@ run as the superuser.
An example configuration file is provided in the same directory as
this README. Copy it to /etc/rabbitmq/rabbitmq.config to use it. The
RabbitMQ server must be restarted after changing the configuration
-file or enabling or disabling plugins.
+file.
diff --git a/quickcheck b/quickcheck
index b5382d75..59da3719 100755
--- a/quickcheck
+++ b/quickcheck
@@ -7,17 +7,21 @@
%% NodeStr is a local broker node name
%% ModStr is the module containing quickcheck properties
%% TrialsStr is the number of trials
-main([NodeStr, ModStr, TrialsStr]) ->
+main([NodeStr, ModStr, NumTestsStr, MaxSizeStr]) ->
{ok, Hostname} = inet:gethostname(),
Node = list_to_atom(NodeStr ++ "@" ++ Hostname),
Mod = list_to_atom(ModStr),
- Trials = erlang:list_to_integer(TrialsStr),
+ NumTests = erlang:list_to_integer(NumTestsStr),
+ MaxSize = erlang:list_to_integer(MaxSizeStr),
case rpc:call(Node, code, ensure_loaded, [proper]) of
{module, proper} ->
case rpc:call(Node, proper, module,
- [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of
+ [Mod] ++ [[{numtests, NumTests},
+ {max_size, MaxSize},
+ {constraint_tries, 200}]]) of
[] -> ok;
- _ -> quit(1)
+ R -> io:format("~p.~n", [R]),
+ quit(1)
end;
{badrpc, Reason} ->
io:format("Could not contact node ~p: ~p.~n", [Node, Reason]),
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 69d5a9c9..74298440 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -58,3 +58,47 @@ fi
## Get configuration variables from the configure environment file
[ -f ${CONF_ENV_FILE} ] && . ${CONF_ENV_FILE} || true
+
+##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
+
+DEFAULT_NODE_IP_ADDRESS=auto
+DEFAULT_NODE_PORT=5672
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
+
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
+[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
+
+[ "x" = "x$RABBITMQ_DIST_PORT" ] && RABBITMQ_DIST_PORT=${DIST_PORT}
+[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${DEFAULT_NODE_PORT} + 20000))
+[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${RABBITMQ_NODE_PORT} + 20000))
+
+[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
+[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
+[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
+[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE}
+[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE}
+[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
+[ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS}
+[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR}
+[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
+
+[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE}
+[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid
+
+[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
+[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
+
+[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
+
+[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
+
+## Log rotation
+[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
+[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log"
+[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS}
+[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log"
+
+[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
+
+##--- End of overridden <var_name> variables
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index bd7d0b6a..8b5b30b7 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -19,13 +19,6 @@
# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
-##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
-
-[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
-[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
-
-##--- End of overridden <var_name> variables
-
exec ${ERL_DIR}erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
@@ -35,4 +28,5 @@ exec ${ERL_DIR}erl \
-s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
-plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
+ -nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index a535ebad..61e39e38 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -31,6 +31,10 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!RABBITMQ_NODENAME!"=="" (
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -51,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index bd397441..686c90cf 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -19,48 +19,6 @@
# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
-##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
-
-DEFAULT_NODE_IP_ADDRESS=auto
-DEFAULT_NODE_PORT=5672
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
-
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
-[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
-
-[ "x" = "x$RABBITMQ_DIST_PORT" ] && RABBITMQ_DIST_PORT=${DIST_PORT}
-[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${DEFAULT_NODE_PORT} + 20000))
-[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${RABBITMQ_NODE_PORT} + 20000))
-
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
-[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
-[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE}
-[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE}
-[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
-
-[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR}
-[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
-
-[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE}
-[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid
-
-[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
-[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
-
-[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
-
-[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
-
-## Log rotation
-[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
-[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log"
-[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS}
-[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log"
-
-##--- End of overridden <var_name> variables
-
RABBITMQ_START_RABBIT=
[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput"
[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="$RABBITMQ_START_RABBIT -s rabbit boot "
@@ -131,6 +89,7 @@ exec ${ERL_DIR}erl \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
+ ${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \
${RABBITMQ_LISTEN_ARG} \
-sasl errlog_type error \
-sasl sasl_error_logger false \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 043204fa..e2312406 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -147,6 +147,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 895561d4..fb2703f2 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -235,6 +235,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
!RABBITMQ_DIST_ARG! ^
!STARVAR!
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 309abf2a..31fe0afc 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -19,20 +19,21 @@
# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
-##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
-
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
-
-##--- End of overridden <var_name> variables
+# rabbitmqctl starts distribution itself, so we need to make sure epmd
+# is running.
+${ERL_DIR}erl -sname rabbitmqctl-prelaunch-$$ -noinput -eval 'erlang:halt().'
+# We specify Mnesia dir and sasl error logger since some actions
+# (e.g. forget_cluster_node --offline) require us to impersonate the
+# real node.
exec ${ERL_DIR}erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
${RABBITMQ_CTL_ERL_ARGS} \
- -sname rabbitmqctl$$ \
-boot "${CLEAN_BOOT_FILE}" \
+ -sasl errlog_type error \
+ -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
-s rabbit_control_main \
-nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 8e8ba1bd..7a2c454c 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -23,6 +23,10 @@ set TDP0=%~dp0
set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+)
+
if "!COMPUTERNAME!"=="" (
set COMPUTERNAME=localhost
)
@@ -31,6 +35,14 @@ if "!RABBITMQ_NODENAME!"=="" (
set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
+if "!RABBITMQ_MNESIA_BASE!"=="" (
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
+)
+
+if "!RABBITMQ_MNESIA_DIR!"=="" (
+ set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -43,7 +55,20 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM!!TIME:~9! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
+rem rabbitmqctl starts distribution itself, so we need to make sure epmd
+rem is running.
+"!ERLANG_HOME!\bin\erl.exe" -sname rabbitmqctl-prelaunch-!RANDOM!!TIME:~9! -noinput -eval "erlang:halt()."
+
+"!ERLANG_HOME!\bin\erl.exe" ^
+-pa "!TDP0!..\ebin" ^
+-noinput ^
+-hidden ^
+!RABBITMQ_CTL_ERL_ARGS! ^
+-sasl errlog_type error ^
+-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
+-s rabbit_control_main ^
+-nodename !RABBITMQ_NODENAME! ^
+-extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl
index 0479ce66..87e6fa0b 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -17,7 +17,7 @@
-export([load_applications/1, start_applications/1, start_applications/2,
stop_applications/1, stop_applications/2, app_dependency_order/2,
- wait_for_applications/1]).
+ app_dependencies/1]).
-ifdef(use_specs).
@@ -28,8 +28,8 @@
-spec stop_applications([atom()]) -> 'ok'.
-spec start_applications([atom()], error_handler()) -> 'ok'.
-spec stop_applications([atom()], error_handler()) -> 'ok'.
--spec wait_for_applications([atom()]) -> 'ok'.
-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
+-spec app_dependencies(atom()) -> [atom()].
-endif.
@@ -68,14 +68,10 @@ stop_applications(Apps, ErrorHandler) ->
ErrorHandler,
Apps).
-
-wait_for_applications(Apps) ->
- [wait_for_application(App) || App <- Apps], ok.
-
app_dependency_order(RootApps, StripUnreachable) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
+ fun ({App, _Deps}) -> [{App, App}] end,
+ fun ({App, Deps}) -> [{Dep, App} || Dep <- Deps] end,
[{App, app_dependencies(App)} ||
{App, _Desc, _Vsn} <- application:loaded_applications()]),
try
@@ -92,13 +88,6 @@ app_dependency_order(RootApps, StripUnreachable) ->
%%---------------------------------------------------------------------------
%% Private API
-wait_for_application(Application) ->
- case lists:keymember(Application, 1, rabbit_misc:which_applications()) of
- true -> ok;
- false -> timer:sleep(1000),
- wait_for_application(Application)
- end.
-
load_applications(Worklist, Loaded) ->
case queue:out(Worklist) of
{empty, _WorkList} ->
diff --git a/src/gm.erl b/src/gm.erl
index 2235da33..3113f449 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -388,6 +388,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/3]).
+%% For INSTR_MOD callbacks
+-export([call/3, cast/2, monitor/1, demonitor/1]).
+
-ifndef(use_specs).
-export([behaviour_info/1]).
-endif.
@@ -616,6 +619,16 @@ handle_call({add_on_right, NewMember}, _From,
members_state = MembersState1 }),
handle_callback_result({Result, {ok, Group}, State1}).
+%% add_on_right causes a catchup to be sent immediately from the left,
+%% so we can never see this from the left neighbour. However, it's
+%% possible for the right neighbour to send us a check_neighbours
+%% immediately before that. We can't possibly handle it, but if we're
+%% in this state we know a catchup is coming imminently anyway. So
+%% just ignore it.
+handle_cast({?TAG, _ReqVer, check_neighbours},
+ State = #state { members_state = undefined }) ->
+ noreply(State);
+
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
members_state = MembersState,
@@ -1177,8 +1190,8 @@ can_erase_view_member(Self, Self, _LA, _LP) -> false;
can_erase_view_member(_Self, _Id, N, N) -> true;
can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
-neighbour_cast(N, Msg) -> gen_server2:cast(get_pid(N), Msg).
-neighbour_call(N, Msg) -> gen_server2:call(get_pid(N), Msg, infinity).
+neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg).
+neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity).
%% ---------------------------------------------------------------------------
%% View monitoring and maintanence
@@ -1192,7 +1205,7 @@ ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
- true = erlang:demonitor(MRef),
+ true = ?INSTR_MOD:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
ok = neighbour_cast(RealNeighbour, Msg),
ok = case Neighbour of
@@ -1202,7 +1215,7 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
{Neighbour, maybe_monitor(Neighbour, Self)}.
maybe_monitor( Self, Self) -> undefined;
-maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
+maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1461,3 +1474,12 @@ last_pub( [], LP) -> LP;
last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
true = PubNum > LP, %% ASSERTION
PubNum.
+
+%% ---------------------------------------------------------------------------
+
+%% Uninstrumented versions
+
+call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
+cast(Pid, Msg) -> gen_server2:cast(Pid, Msg).
+monitor(Pid) -> erlang:monitor(process, Pid).
+demonitor(MRef) -> erlang:demonitor(MRef).
diff --git a/src/gm_qc.erl b/src/gm_qc.erl
new file mode 100644
index 00000000..394cbcbd
--- /dev/null
+++ b/src/gm_qc.erl
@@ -0,0 +1,384 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(gm_qc).
+-ifdef(use_proper_qc).
+
+-include_lib("proper/include/proper.hrl").
+
+-define(GROUP, test_group).
+-define(MAX_SIZE, 5).
+-define(MSG_TIMEOUT, 1000000). %% micros
+
+-export([prop_gm_test/0]).
+
+-behaviour(proper_statem).
+-export([initial_state/0, command/1, precondition/2, postcondition/3,
+ next_state/3]).
+
+-behaviour(gm).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+
+%% Helpers
+-export([do_join/0, do_leave/1, do_send/1, do_proceed1/1, do_proceed2/2]).
+
+%% For insertion into gm
+-export([call/3, cast/2, monitor/1, demonitor/1, execute_mnesia_transaction/1]).
+
+-record(state, {seq, %% symbolic and dynamic
+ instrumented, %% dynamic only
+ outstanding, %% dynamic only
+ monitors, %% dynamic only
+ all_join, %% for symbolic
+ to_join, %% dynamic only
+ to_leave %% for symbolic
+ }).
+
+prop_gm_test() ->
+ case ?INSTR_MOD of
+ ?MODULE -> ok;
+ _ -> exit(compile_with_INSTRUMENT_FOR_QC)
+ end,
+ process_flag(trap_exit, true),
+ erlang:register(?MODULE, self()),
+ ?FORALL(Cmds, commands(?MODULE), gm_test(Cmds)).
+
+gm_test(Cmds) ->
+ {_H, State, Res} = run_commands(?MODULE, Cmds),
+ cleanup(State),
+ ?WHENFAIL(
+ io:format("Result: ~p~n", [Res]),
+ aggregate(command_names(Cmds), Res =:= ok)).
+
+cleanup(S) ->
+ S2 = ensure_joiners_joined_and_msgs_received(S),
+ All = gms_joined(S2),
+ All = gms(S2), %% assertion - none to join
+ check_stale_members(All),
+ [gm:leave(GM) || GM <- All],
+ drain_and_proceed_gms(S2),
+ [await_death(GM) || GM <- All],
+ gm:forget_group(?GROUP),
+ ok.
+
+check_stale_members(All) ->
+ GMs = [P || P <- processes(), is_gm_process(?GROUP, P)],
+ case GMs -- All of
+ [] -> ok;
+ Rest -> exit({forgot, Rest})
+ end.
+
+is_gm_process(Group, P) ->
+ case process_info(P, dictionary) of
+ undefined -> false;
+ {dictionary, D} -> {gm, Group} =:= proplists:get_value(process_name, D)
+ end.
+
+await_death(P) ->
+ MRef = erlang:monitor(process, P),
+ await_death(MRef, P).
+
+await_death(MRef, P) ->
+ receive
+ {'DOWN', MRef, process, P, _} -> ok;
+ {'DOWN', _, _, _, _} -> await_death(MRef, P);
+ {'EXIT', _, normal} -> await_death(MRef, P);
+ {'EXIT', _, Reason} -> exit(Reason);
+ {joined, _GM} -> await_death(MRef, P);
+ {left, _GM} -> await_death(MRef, P);
+ Anything -> exit({stray_msg, Anything})
+ end.
+
+%% ---------------------------------------------------------------------------
+%% proper_statem
+%% ---------------------------------------------------------------------------
+
+initial_state() -> #state{seq = 1,
+ outstanding = dict:new(),
+ instrumented = dict:new(),
+ monitors = dict:new(),
+ all_join = sets:new(),
+ to_join = sets:new(),
+ to_leave = sets:new()}.
+
+command(S) ->
+ case {length(gms_symb_not_left(S)), length(gms_symb(S))} of
+ {0, 0} -> qc_join(S);
+ {0, _} -> frequency([{1, qc_join(S)},
+ {3, qc_proceed1(S)},
+ {5, qc_proceed2(S)}]);
+ _ -> frequency([{1, qc_join(S)},
+ {1, qc_leave(S)},
+ {10, qc_send(S)},
+ {5, qc_proceed1(S)},
+ {15, qc_proceed2(S)}])
+ end.
+
+qc_join(_S) -> {call,?MODULE,do_join, []}.
+qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}.
+qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}.
+qc_proceed1(S) -> {call,?MODULE,do_proceed1, [oneof(gms_symb(S))]}.
+qc_proceed2(S) -> {call,?MODULE,do_proceed2, [oneof(gms_symb(S)),
+ oneof(gms_symb(S))]}.
+
+precondition(S, {call, ?MODULE, do_join, []}) ->
+ length(gms_symb(S)) < ?MAX_SIZE;
+
+precondition(_S, {call, ?MODULE, do_leave, [_GM]}) ->
+ true;
+
+precondition(_S, {call, ?MODULE, do_send, [_GM]}) ->
+ true;
+
+precondition(_S, {call, ?MODULE, do_proceed1, [_GM]}) ->
+ true;
+
+precondition(_S, {call, ?MODULE, do_proceed2, [GM1, GM2]}) ->
+ GM1 =/= GM2.
+
+postcondition(_S, {call, _M, _F, _A}, _Res) ->
+ true.
+
+next_state(S = #state{to_join = ToSet,
+ all_join = AllSet}, GM, {call, ?MODULE, do_join, []}) ->
+ S#state{to_join = sets:add_element(GM, ToSet),
+ all_join = sets:add_element(GM, AllSet)};
+
+next_state(S = #state{to_leave = Set}, _Res, {call, ?MODULE, do_leave, [GM]}) ->
+ S#state{to_leave = sets:add_element(GM, Set)};
+
+next_state(S = #state{seq = Seq,
+ outstanding = Outstanding}, _Res,
+ {call, ?MODULE, do_send, [GM]}) ->
+ case is_pid(GM) andalso lists:member(GM, gms_joined(S)) of
+ true ->
+ %% Dynamic state, i.e. runtime
+ Msg = [{sequence, Seq},
+ {sent_to, GM},
+ {dests, gms_joined(S)}],
+ gm:broadcast(GM, Msg),
+ Outstanding1 = dict:map(
+ fun (_GM, Set) ->
+ gb_sets:add_element(Msg, Set)
+ end, Outstanding),
+ drain(S#state{seq = Seq + 1,
+ outstanding = Outstanding1});
+ false ->
+ S
+ end;
+
+next_state(S, _Res, {call, ?MODULE, do_proceed1, [Pid]}) ->
+ proceed(Pid, S);
+
+next_state(S, _Res, {call, ?MODULE, do_proceed2, [From, To]}) ->
+ proceed({From, To}, S).
+
+proceed(K, S = #state{instrumented = Msgs}) ->
+ case dict:find(K, Msgs) of
+ {ok, Q} -> case queue:out(Q) of
+ {{value, Thing}, Q2} ->
+ S2 = proceed(K, Thing, S),
+ S2#state{instrumented = dict:store(K, Q2, Msgs)};
+ {empty, _} ->
+ S
+ end;
+ error -> S
+ end.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined(Pid, _Members) -> Pid ! {joined, self()},
+ ok.
+members_changed(_Pid, _Bs, _Ds) -> ok.
+handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok.
+terminate(Pid, _Reason) -> Pid ! {left, self()}.
+
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+do_join() ->
+ {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(),
+ fun execute_mnesia_transaction/1),
+ GM.
+
+do_leave(GM) ->
+ gm:leave(GM),
+ GM.
+
+%% We need to update the state, so do the work in next_state
+do_send( _GM) -> ok.
+do_proceed1(_Pid) -> ok.
+do_proceed2(_From, _To) -> ok.
+
+%% All GMs, joined and to join
+gms(#state{outstanding = Outstanding,
+ to_join = ToJoin}) ->
+ dict:fetch_keys(Outstanding) ++ sets:to_list(ToJoin).
+
+%% All GMs, joined and to join
+gms_joined(#state{outstanding = Outstanding}) ->
+ dict:fetch_keys(Outstanding).
+
+%% All GMs including those that have left (symbolic)
+gms_symb(#state{all_join = AllJoin}) ->
+ sets:to_list(AllJoin).
+
+%% All GMs not including those that have left (symbolic)
+gms_symb_not_left(#state{all_join = AllJoin,
+ to_leave = ToLeave}) ->
+ sets:to_list(sets:subtract(AllJoin, ToLeave)).
+
+drain(S) ->
+ receive
+ Msg -> drain(handle_msg(Msg, S))
+ after 10 -> S
+ end.
+
+drain_and_proceed_gms(S0) ->
+ S = #state{instrumented = Msgs} = drain(S0),
+ case dict:size(Msgs) of
+ 0 -> S;
+ _ -> S1 = dict:fold(
+ fun (Key, Q, Si) ->
+ lists:foldl(
+ fun (Msg, Sij) ->
+ proceed(Key, Msg, Sij)
+ end, Si, queue:to_list(Q))
+ end, S, Msgs),
+ drain_and_proceed_gms(S1#state{instrumented = dict:new()})
+ end.
+
+handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) ->
+ case dict:find(GM, Outstanding) of
+ {ok, Set} ->
+ Set2 = gb_sets:del_element(Msg, Set),
+ S#state{outstanding = dict:store(GM, Set2, Outstanding)};
+ error ->
+ %% Message from GM that has already died. OK.
+ S
+ end;
+handle_msg({instrumented, Key, Thing}, S = #state{instrumented = Msgs}) ->
+ Q1 = case dict:find(Key, Msgs) of
+ {ok, Q} -> queue:in(Thing, Q);
+ error -> queue:from_list([Thing])
+ end,
+ S#state{instrumented = dict:store(Key, Q1, Msgs)};
+handle_msg({joined, GM}, S = #state{outstanding = Outstanding,
+ to_join = ToJoin}) ->
+ S#state{outstanding = dict:store(GM, gb_sets:empty(), Outstanding),
+ to_join = sets:del_element(GM, ToJoin)};
+handle_msg({left, GM}, S = #state{outstanding = Outstanding,
+ to_join = ToJoin}) ->
+ true = dict:is_key(GM, Outstanding) orelse sets:is_element(GM, ToJoin),
+ S#state{outstanding = dict:erase(GM, Outstanding),
+ to_join = sets:del_element(GM, ToJoin)};
+handle_msg({'DOWN', MRef, _, From, _} = Msg, S = #state{monitors = Mons}) ->
+ To = dict:fetch(MRef, Mons),
+ handle_msg({instrumented, {From, To}, {info, Msg}},
+ S#state{monitors = dict:erase(MRef, Mons)});
+handle_msg({'EXIT', _From, normal}, S) ->
+ S;
+handle_msg({'EXIT', _From, Reason}, _S) ->
+ %% We just trapped exits to get nicer SASL logging.
+ exit(Reason).
+
+proceed({_From, To}, {cast, Msg}, S) -> gen_server2:cast(To, Msg), S;
+proceed({_From, To}, {info, Msg}, S) -> To ! Msg, S;
+proceed({From, _To}, {wait, Ref}, S) -> From ! {proceed, Ref}, S;
+proceed({From, To}, {mon, Ref}, S) -> add_monitor(From, To, Ref, S);
+proceed(_Pid, {demon, MRef}, S) -> erlang:demonitor(MRef), S;
+proceed(Pid, {wait, Ref}, S) -> Pid ! {proceed, Ref}, S.
+
+%% NB From here is To in handle_msg/DOWN above, since the msg is going
+%% the other way
+add_monitor(From, To, Ref, S = #state{monitors = Mons}) ->
+ MRef = erlang:monitor(process, To),
+ From ! {mref, Ref, MRef},
+ S#state{monitors = dict:store(MRef, From, Mons)}.
+
+%% ----------------------------------------------------------------------------
+%% Assertions
+%% ----------------------------------------------------------------------------
+
+ensure_joiners_joined_and_msgs_received(S0) ->
+ S = drain_and_proceed_gms(S0),
+ case outstanding_joiners(S) of
+ true -> ensure_joiners_joined_and_msgs_received(S);
+ false -> case outstanding_msgs(S) of
+ [] -> S;
+ Out -> exit({outstanding_msgs, Out})
+ end
+ end.
+
+outstanding_joiners(#state{to_join = ToJoin}) ->
+ sets:size(ToJoin) > 0.
+
+outstanding_msgs(#state{outstanding = Outstanding}) ->
+ dict:fold(fun (GM, Set, OS) ->
+ case gb_sets:is_empty(Set) of
+ true -> OS;
+ false -> [{GM, gb_sets:to_list(Set)} | OS]
+ end
+ end, [], Outstanding).
+
+%% ---------------------------------------------------------------------------
+%% For insertion into GM
+%% ---------------------------------------------------------------------------
+
+call(Pid, Msg, infinity) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {wait, Ref}},
+ receive
+ {proceed, Ref} -> ok
+ end,
+ gen_server2:call(Pid, Msg, infinity).
+
+cast(Pid, Msg) ->
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {cast, Msg}},
+ ok.
+
+monitor(Pid) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {mon, Ref}},
+ receive
+ {mref, Ref, MRef} -> MRef
+ end.
+
+demonitor(MRef) ->
+ whereis(?MODULE) ! {instrumented, self(), {demon, MRef}},
+ true.
+
+execute_mnesia_transaction(Fun) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, self(), {wait, Ref}},
+ receive
+ {proceed, Ref} -> ok
+ end,
+ rabbit_misc:execute_mnesia_transaction(Fun).
+
+-else.
+
+-export([prop_disabled/0]).
+
+prop_disabled() ->
+ exit({compiled_without_proper,
+ "PropEr was not present during compilation of the test module. "
+ "Hence all tests are disabled."}).
+
+-endif.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 29e38c1f..191f04a4 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -22,10 +22,9 @@
stop_and_halt/0, await_startup/0, status/0, is_running/0,
is_running/1, environment/0, rotate_logs/1, force_event_refresh/1,
start_fhc/0]).
-
-export([start/2, stop/1]).
-
--export([log_location/1]). %% for testing
+-export([start_apps/1, stop_apps/1]).
+-export([log_location/1, config_files/0]). %% for testing and mgmt-agent
%%---------------------------------------------------------------------------
%% Boot steps.
@@ -202,6 +201,7 @@
%% practice 2 processes seems just as fast as any other number > 1,
%% and keeps the progress bar realistic-ish.
-define(HIPE_PROCESSES, 2).
+-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).
%%----------------------------------------------------------------------------
@@ -211,6 +211,7 @@
%% this really should be an abstract type
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
-type(param() :: atom()).
+-type(app_name() :: atom()).
-spec(start/0 :: () -> 'ok').
-spec(boot/0 :: () -> 'ok').
@@ -242,6 +243,8 @@
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
+-spec(start_apps/1 :: ([app_name()]) -> 'ok').
+-spec(stop_apps/1 :: ([app_name()]) -> 'ok').
-endif.
@@ -312,9 +315,7 @@ start() ->
ok = ensure_working_log_handlers(),
rabbit_node_monitor:prepare_cluster_status_files(),
rabbit_mnesia:check_cluster_consistency(),
- ok = app_utils:start_applications(
- app_startup_order(), fun handle_app_error/2),
- ok = log_broker_started(rabbit_plugins:active())
+ broker_start()
end).
boot() ->
@@ -329,21 +330,14 @@ boot() ->
%% the upgrade, since if we are a secondary node the
%% primary node will have forgotten us
rabbit_mnesia:check_cluster_consistency(),
- Plugins = rabbit_plugins:setup(),
- ToBeLoaded = Plugins ++ ?APPS,
- ok = app_utils:load_applications(ToBeLoaded),
- StartupApps = app_utils:app_dependency_order(ToBeLoaded,
- false),
- ok = app_utils:start_applications(
- StartupApps, fun handle_app_error/2),
- ok = log_broker_started(Plugins)
+ broker_start()
end).
-handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
- throw({could_not_start, App, Reason});
-
-handle_app_error(App, Reason) ->
- throw({could_not_start, App, Reason}).
+broker_start() ->
+ Plugins = rabbit_plugins:setup(),
+ ToBeLoaded = Plugins ++ ?APPS,
+ start_apps(ToBeLoaded),
+ ok = log_broker_started(rabbit_plugins:active()).
start_it(StartFun) ->
Marker = spawn_link(fun() -> receive stop -> ok end end),
@@ -371,10 +365,11 @@ start_it(StartFun) ->
stop() ->
case whereis(rabbit_boot) of
undefined -> ok;
- _ -> await_startup()
+ _ -> await_startup(true)
end,
rabbit_log:info("Stopping RabbitMQ~n"),
- ok = app_utils:stop_applications(app_shutdown_order()).
+ Apps = ?APPS ++ rabbit_plugins:active(),
+ stop_apps(app_utils:app_dependency_order(Apps, true)).
stop_and_halt() ->
try
@@ -385,8 +380,51 @@ stop_and_halt() ->
end,
ok.
+start_apps(Apps) ->
+ app_utils:load_applications(Apps),
+ OrderedApps = app_utils:app_dependency_order(Apps, false),
+ case lists:member(rabbit, Apps) of
+ false -> run_boot_steps(Apps); %% plugin activation
+ true -> ok %% will run during start of rabbit app
+ end,
+ ok = app_utils:start_applications(OrderedApps,
+ handle_app_error(could_not_start)).
+
+stop_apps(Apps) ->
+ ok = app_utils:stop_applications(
+ Apps, handle_app_error(error_during_shutdown)),
+ case lists:member(rabbit, Apps) of
+ false -> run_cleanup_steps(Apps); %% plugin deactivation
+ true -> ok %% it's all going anyway
+ end,
+ ok.
+
+handle_app_error(Term) ->
+ fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) ->
+ throw({Term, App, ExitReason});
+ (App, Reason) ->
+ throw({Term, App, Reason})
+ end.
+
+run_cleanup_steps(Apps) ->
+ [run_step(Name, Attrs, cleanup) || {_, Name, Attrs} <- find_steps(Apps)],
+ ok.
+
await_startup() ->
- app_utils:wait_for_applications(app_startup_order()).
+ await_startup(false).
+
+await_startup(HaveSeenRabbitBoot) ->
+ %% We don't take absence of rabbit_boot as evidence we've started,
+ %% since there's a small window before it is registered.
+ case whereis(rabbit_boot) of
+ undefined -> case HaveSeenRabbitBoot orelse is_running() of
+ true -> ok;
+ false -> timer:sleep(100),
+ await_startup(false)
+ end;
+ _ -> timer:sleep(100),
+ await_startup(true)
+ end.
status() ->
S1 = [{pid, list_to_integer(os:getpid())},
@@ -437,6 +475,9 @@ listeners() ->
ip_address = IP,
port = Port} <- Listeners, Node =:= node()].
+%% TODO this only determines if the rabbit application has started,
+%% not if it is running, never mind plugins. It would be nice to have
+%% more nuance here.
is_running() -> is_running(node()).
is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
@@ -468,7 +509,8 @@ start(normal, []) ->
true = register(rabbit, self()),
print_banner(),
log_banner(),
- [ok = run_boot_step(Step) || Step <- boot_steps()],
+ warn_if_kernel_config_dubious(),
+ run_boot_steps(),
{ok, SupPid};
Error ->
Error
@@ -483,42 +525,42 @@ stop(_State) ->
ok.
%%---------------------------------------------------------------------------
-%% application life cycle
+%% boot step logic
-app_startup_order() ->
- ok = app_utils:load_applications(?APPS),
- app_utils:app_dependency_order(?APPS, false).
+run_boot_steps() ->
+ run_boot_steps([App || {App, _, _} <- application:loaded_applications()]).
-app_shutdown_order() ->
- Apps = ?APPS ++ rabbit_plugins:active(),
- app_utils:app_dependency_order(Apps, true).
+run_boot_steps(Apps) ->
+ [ok = run_step(Step, Attrs, mfa) || {_, Step, Attrs} <- find_steps(Apps)],
+ ok.
-%%---------------------------------------------------------------------------
-%% boot step logic
+find_steps(Apps) ->
+ All = sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)),
+ [Step || {App, _, _} = Step <- All, lists:member(App, Apps)].
-run_boot_step({_StepName, Attributes}) ->
- case [MFA || {mfa, MFA} <- Attributes] of
+run_step(StepName, Attributes, AttributeName) ->
+ case [MFA || {Key, MFA} <- Attributes,
+ Key =:= AttributeName] of
[] ->
ok;
MFAs ->
[try
apply(M,F,A)
of
- ok -> ok;
- {error, Reason} -> boot_error(Reason, not_available)
+ ok -> ok;
+ {error, Reason} -> boot_error({boot_step, StepName, Reason},
+ not_available)
catch
- _:Reason -> boot_error(Reason, erlang:get_stacktrace())
+ _:Reason -> boot_error({boot_step, StepName, Reason},
+ erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
ok
end.
-boot_steps() ->
- sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)).
+vertices({AppName, _Module, Steps}) ->
+ [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps].
-vertices(_Module, Steps) ->
- [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
-
-edges(_Module, Steps) ->
+edges({_AppName, _Module, Steps}) ->
[case Key of
requires -> {StepName, OtherStep};
enables -> {OtherStep, StepName}
@@ -527,7 +569,7 @@ edges(_Module, Steps) ->
Key =:= requires orelse Key =:= enables].
sort_boot_steps(UnsortedSteps) ->
- case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
+ case rabbit_misc:build_acyclic_graph(fun vertices/1, fun edges/1,
UnsortedSteps) of
{ok, G} ->
%% Use topological sort to find a consistent ordering (if
@@ -541,8 +583,8 @@ sort_boot_steps(UnsortedSteps) ->
digraph:delete(G),
%% Check that all mentioned {M,F,A} triples are exported.
case [{StepName, {M,F,A}} ||
- {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
+ {_App, StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
not erlang:function_exported(M, F, length(A))] of
[] -> SortedSteps;
MissingFunctions -> basic_boot_error(
@@ -782,6 +824,31 @@ log_banner() ->
end || S <- Settings]),
error_logger:info_msg("~s", [Banner]).
+warn_if_kernel_config_dubious() ->
+ case erlang:system_info(kernel_poll) of
+ true -> ok;
+ false -> error_logger:warning_msg(
+ "Kernel poll (epoll, kqueue, etc) is disabled. Throughput "
+ "and CPU utilization may worsen.~n")
+ end,
+ AsyncThreads = erlang:system_info(thread_pool_size),
+ case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of
+ true -> error_logger:warning_msg(
+ "Erlang VM is running with ~b I/O threads, "
+ "file I/O performance may worsen~n", [AsyncThreads]);
+ false -> ok
+ end,
+ IDCOpts = case application:get_env(kernel, inet_default_connect_options) of
+ undefined -> [];
+ {ok, Val} -> Val
+ end,
+ case proplists:get_value(nodelay, IDCOpts, false) of
+ false -> error_logger:warning_msg(
+ "Nagle's algorithm is enabled for sockets, "
+ "network I/O latency will be higher~n");
+ true -> ok
+ end.
+
home_dir() ->
case init:get_argument(home) of
{ok, [[Home]]} -> Home;
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1aba7ecb..4e23dbd2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,7 +18,7 @@
-export([recover/0, stop/0, start/1, declare/5, declare/6,
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
--export([pseudo_queue/2]).
+-export([pseudo_queue/2, immutable/1]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
@@ -29,8 +29,8 @@
-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
--export([on_node_down/1]).
--export([update/2, store_queue/1, policy_changed/2]).
+-export([on_node_up/1, on_node_down/1]).
+-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
cancel_sync_mirrors/1]).
@@ -174,9 +174,12 @@
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
+-spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()).
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(update_decorators/1 :: (name()) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
@@ -254,15 +257,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
%% effect) this might not be possible to satisfy.
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
ok = check_declare_arguments(QueueName, Args),
- Q = rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- gm_pids = []}),
+ Q = rabbit_queue_decorator:set(
+ rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ down_slave_nodes = [],
+ gm_pids = []})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
@@ -308,12 +313,24 @@ store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue,
Q#amqqueue{slave_pids = [],
sync_slave_pids = [],
- gm_pids = []}, write),
- ok = mnesia:write(rabbit_queue, Q, write),
- ok;
+ gm_pids = [],
+ decorators = undefined}, write),
+ store_queue_ram(Q);
store_queue(Q = #amqqueue{durable = false}) ->
- ok = mnesia:write(rabbit_queue, Q, write),
- ok.
+ store_queue_ram(Q).
+
+store_queue_ram(Q) ->
+ ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write).
+
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q] -> store_queue_ram(Q),
+ ok;
+ [] -> ok
+ end
+ end).
policy_changed(Q1 = #amqqueue{decorators = Decorators1},
Q2 = #amqqueue{decorators = Decorators2}) ->
@@ -650,15 +667,23 @@ forget_all_durable(Node) ->
fun () ->
Qs = mnesia:match_object(rabbit_durable_queue,
#amqqueue{_ = '_'}, write),
- [rabbit_binding:process_deletions(
- internal_delete1(Name)) ||
- #amqqueue{name = Name, pid = Pid} = Q <- Qs,
- node(Pid) =:= Node,
- rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined],
+ [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs,
+ node(Pid) =:= Node],
ok
end),
ok.
+forget_node_for_queue(#amqqueue{name = Name,
+ down_slave_nodes = []}) ->
+ %% No slaves to recover from, queue is gone
+ rabbit_binding:process_deletions(internal_delete1(Name));
+
+forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) ->
+ %% Promote a slave while down - it'll happily recover as a master
+ Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H),
+ down_slave_nodes = T},
+ ok = mnesia:write(rabbit_durable_queue, Q1, write).
+
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
@@ -674,6 +699,20 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
+on_node_up(Node) ->
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Qs = mnesia:match_object(rabbit_queue,
+ #amqqueue{_ = '_'}, write),
+ [case lists:member(Node, DSNs) of
+ true -> DSNs1 = DSNs -- [Node],
+ store_queue(
+ Q#amqqueue{down_slave_nodes = DSNs1});
+ false -> ok
+ end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs],
+ ok
+ end).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
@@ -709,6 +748,14 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
+immutable(Q) -> Q#amqqueue{pid = none,
+ slave_pids = none,
+ sync_slave_pids = none,
+ down_slave_nodes = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none}.
+
deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
[];
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9b785303..63b18655 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -84,6 +84,7 @@
memory,
slave_pids,
synchronised_slave_pids,
+ down_slave_nodes,
backing_queue_status,
state
]).
@@ -102,7 +103,8 @@
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
-info_keys() -> ?INFO_KEYS.
+info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys().
+statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
%%----------------------------------------------------------------------------
@@ -385,12 +387,12 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
- TRef = erlang:send_after(After, self(), {drop_expired, Version}),
+ TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}),
State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ttl_timer_expiry = TExpiry})
when Expiry + 1000 < TExpiry ->
- case erlang:cancel_timer(TRef) of
+ case rabbit_misc:cancel_timer(TRef) of
false -> State;
_ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
end;
@@ -810,19 +812,25 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(down_slave_nodes, #q{q = #amqqueue{name = Name,
+ durable = Durable}}) ->
+ {ok, Q = #amqqueue{down_slave_nodes = Nodes}} =
+ rabbit_amqqueue:lookup(Name),
+ case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> '';
+ true -> Nodes
+ end;
i(state, #q{status = running}) -> credit_flow:state();
i(state, #q{status = State}) -> State;
-i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- BQ:status(BQS);
-i(Item, _) ->
- throw({bad_argument, Item}).
+i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:info(Item, BQS).
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
ExtraKs = [K || {K, _} <- Extra],
- Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State),
+ Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State),
not lists:member(K, ExtraKs)],
rabbit_event:notify(queue_stats, Extra ++ Infos).
@@ -922,7 +930,7 @@ handle_call({init, Recover}, From,
end;
handle_call(info, _From, State) ->
- reply(infos(?INFO_KEYS, State), State);
+ reply(infos(info_keys(), State), State);
handle_call({info, Items}, _From, State) ->
try
@@ -1165,7 +1173,7 @@ handle_cast({force_event_refresh, Ref},
emit_consumer_created(
Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
end,
- noreply(State);
+ noreply(rabbit_event:init_stats_timer(State, #q.stats_timer));
handle_cast(notify_decorators, State) ->
notify_decorators(State),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 8f37bf60..9e5f0813 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -16,6 +16,12 @@
-module(rabbit_backing_queue).
+-export([info_keys/0]).
+
+-define(INFO_KEYS, [messages_ram, messages_ready_ram,
+ messages_unacknowledged_ram, messages_persistent,
+ backing_queue_status]).
+
-ifdef(use_specs).
%% We can't specify a per-queue ack/state with callback signatures
@@ -37,6 +43,8 @@
-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+
%% Called on startup with a list of durable queue names. The queues
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
@@ -216,9 +224,7 @@
%% inbound messages and outbound messages at the moment.
-callback msg_rates(state()) -> {float(), float()}.
-%% Exists for debugging purposes, to be able to expose state via
-%% rabbitmqctl list_queues backing_queue_status
--callback status(state()) -> [{atom(), any()}].
+-callback info(atom(), state()) -> any().
%% Passed a function to be invoked with the relevant backing queue's
%% state. Useful for when the backing queue or other components need
@@ -243,9 +249,11 @@ behaviour_info(callbacks) ->
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {status, 1},
- {invoke, 3}, {is_duplicate, 2}] ;
+ {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {info_keys, 0},
+ {infos, 2}, {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
-endif.
+
+info_keys() -> ?INFO_KEYS.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 043ec7e3..d2f6719c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -341,7 +341,7 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({force_event_refresh, Ref}, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State),
Ref),
- noreply(State);
+ noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer));
handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
@@ -433,17 +433,22 @@ send(_Command, #ch{state = closing}) ->
send(Command, #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Command).
-handle_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid}) ->
+handle_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid,
+ conn_name = ConnName,
+ virtual_host = VHost,
+ user = User}) ->
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
{Channel, CloseMethod} ->
- rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n",
- [ConnPid, Channel, Reason]),
+ rabbit_log:error("Channel error on connection ~p (~s, vhost: '~s',"
+ " user: '~s'), channel ~p:~n~p~n",
+ [ConnPid, ConnName, VHost, User#user.username,
+ Channel, Reason]),
ok = rabbit_writer:send_command(WriterPid, CloseMethod),
{noreply, State1};
{0, _} ->
@@ -668,8 +673,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
tx = Tx,
+ channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
- trace_state = TraceState}) ->
+ trace_state = TraceState,
+ user = #user{username = Username},
+ conn_name = ConnName}) ->
check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
@@ -690,7 +698,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_in(Message, TraceState),
+ rabbit_trace:tap_in(Message, ConnName, ChannelNum,
+ Username, TraceState),
Delivery = rabbit_basic:delivery(
Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
@@ -992,7 +1001,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
QueueName,
fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
Q, Durable, AutoDelete, Args, Owner),
- rabbit_amqqueue:stat(Q)
+ maybe_stat(NoWait, Q)
end) of
{ok, MessageCount, ConsumerCount} ->
return_queue_declare_ok(QueueName, NoWait, MessageCount,
@@ -1048,7 +1057,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
- QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -1204,6 +1213,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
E
end.
+maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q);
+maybe_stat(true, _Q) -> {ok, 0, 0}.
+
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
@@ -1365,7 +1377,10 @@ record_sent(ConsumerTag, AckRequired,
Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
- trace_state = TraceState}) ->
+ trace_state = TraceState,
+ user = #user{username = Username},
+ conn_name = ConnName,
+ channel = ChannelNum}) ->
?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
{none, true} -> get;
{none, false} -> get_no_ack;
@@ -1376,7 +1391,7 @@ record_sent(ConsumerTag, AckRequired,
true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
- rabbit_trace:tap_out(Msg, TraceState),
+ rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
UAMQ);
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 81c17fbf..db9349ac 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -33,7 +33,7 @@
-callback description() -> [proplists:property()].
-callback intercept(original_method(), rabbit_types:vhost()) ->
- rabbit_types:ok_or_error2(processed_method(), any()).
+ processed_method() | rabbit_misc:channel_or_connection_exit().
%% Whether the interceptor wishes to intercept the amqp method
-callback applies_to(intercept_method()) -> boolean().
@@ -62,20 +62,15 @@ intercept_method(M, VHost) ->
intercept_method(M, _VHost, []) ->
M;
intercept_method(M, VHost, [I]) ->
- case I:intercept(M, VHost) of
- {ok, M2} ->
- case validate_method(M, M2) of
- true ->
- M2;
- _ ->
- internal_error("Interceptor: ~p expected "
- "to return method: ~p but returned: ~p",
- [I, rabbit_misc:method_record_type(M),
- rabbit_misc:method_record_type(M2)])
- end;
- {error, Reason} ->
- internal_error("Interceptor: ~p failed with reason: ~p",
- [I, Reason])
+ M2 = I:intercept(M, VHost),
+ case validate_method(M, M2) of
+ true ->
+ M2;
+ _ ->
+ internal_error("Interceptor: ~p expected "
+ "to return method: ~p but returned: ~p",
+ [I, rabbit_misc:method_record_type(M),
+ rabbit_misc:method_record_type(M2)])
end;
intercept_method(M, _VHost, Is) ->
internal_error("More than one interceptor for method: ~p -- ~p",
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 451f4d70..70fe10e6 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -54,6 +54,7 @@
change_cluster_node_type,
update_cluster_nodes,
{forget_cluster_node, [?OFFLINE_DEF]},
+ force_boot,
cluster_status,
{sync_queue, [?VHOST_DEF]},
{cancel_sync_queue, [?VHOST_DEF]},
@@ -114,6 +115,12 @@
{"Policies", rabbit_policy, list_formatted, info_keys},
{"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]).
+-define(COMMANDS_NOT_REQUIRING_APP,
+ [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs,
+ join_cluster, change_cluster_node_type, update_cluster_nodes,
+ forget_cluster_node, cluster_status, status, environment, eval,
+ force_boot]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -131,6 +138,7 @@
%%----------------------------------------------------------------------------
start() ->
+ start_distribution(),
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{Command, Opts, Args} =
case parse_arguments(init:get_plain_arguments(), NodeStr) of
@@ -154,7 +162,7 @@ start() ->
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
- case catch action(Command, Node, Args, Opts, Inform) of
+ case catch do_action(Command, Node, Args, Opts, Inform) of
ok ->
case Quiet of
true -> ok;
@@ -249,6 +257,15 @@ parse_arguments(CmdLine, NodeStr) ->
%%----------------------------------------------------------------------------
+do_action(Command, Node, Args, Opts, Inform) ->
+ case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of
+ false -> case ensure_app_running(Node) of
+ ok -> action(Command, Node, Args, Opts, Inform);
+ E -> E
+ end;
+ true -> action(Command, Node, Args, Opts, Inform)
+ end.
+
action(stop, Node, Args, _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
Res = call(Node, {rabbit, stop_and_halt, []}),
@@ -302,8 +319,19 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
ClusterNode = list_to_atom(ClusterNodeS),
RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts),
Inform("Removing node ~p from cluster", [ClusterNode]),
- rpc_call(Node, rabbit_mnesia, forget_cluster_node,
- [ClusterNode, RemoveWhenOffline]);
+ case RemoveWhenOffline of
+ true -> become(Node),
+ rabbit_mnesia:forget_cluster_node(ClusterNode, true);
+ false -> rpc_call(Node, rabbit_mnesia, forget_cluster_node,
+ [ClusterNode, false])
+ end;
+
+action(force_boot, Node, [], _Opts, Inform) ->
+ Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]),
+ case rabbit:is_running(Node) of
+ false -> rabbit_mnesia:force_load_next_boot();
+ true -> {error, rabbit_running}
+ end;
action(sync_queue, Node, [Q], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
@@ -650,6 +678,22 @@ exit_loop(Port) ->
{Port, _} -> exit_loop(Port)
end.
+start_distribution() ->
+ CtlNodeName = rabbit_misc:format("rabbitmqctl-~s", [os:getpid()]),
+ {ok, _} = net_kernel:start([list_to_atom(CtlNodeName), shortnames]).
+
+become(BecomeNode) ->
+ case net_adm:ping(BecomeNode) of
+ pong -> exit({node_running, BecomeNode});
+ pang -> io:format(" * Impersonating node: ~s...", [BecomeNode]),
+ error_logger:tty(false),
+ ok = net_kernel:stop(),
+ {ok, _} = net_kernel:start([BecomeNode, shortnames]),
+ io:format(" done~n", []),
+ Dir = mnesia:system_info(directory),
+ io:format(" * Mnesia directory : ~s~n", [Dir])
+ end.
+
%%----------------------------------------------------------------------------
default_if_empty(List, Default) when is_list(List) ->
@@ -715,6 +759,17 @@ unsafe_rpc(Node, Mod, Fun, Args) ->
Normal -> Normal
end.
+ensure_app_running(Node) ->
+ case call(Node, {rabbit, is_running, []}) of
+ true -> ok;
+ false -> {error_string,
+ rabbit_misc:format(
+ "rabbit application is not running on node ~s.~n"
+ " * Suggestion: start it with \"rabbitmqctl start_app\" "
+ "and try again", [Node])};
+ Other -> Other
+ end.
+
call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index ec32e687..728bc431 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) ->
{longstr, <<"rejected">>} =/=
rabbit_misc:table_lookup(D, <<"reason">>);
(_) ->
+ %% There was something we didn't expect, therefore
+ %% a client must have put it there, therefore the
+ %% cycle was not "fully automatic".
false
end, Cycle ++ [H])
end.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index b867223b..a33103fd 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -23,6 +23,7 @@
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify/3, notify_if/3]).
+-export([sync_notify/2, sync_notify/3]).
%%----------------------------------------------------------------------------
@@ -61,6 +62,9 @@
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-spec(notify/3 :: (event_type(), event_props(), reference() | 'none') -> 'ok').
-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok').
+-spec(sync_notify/2 :: (event_type(), event_props()) -> 'ok').
+-spec(sync_notify/3 :: (event_type(), event_props(),
+ reference() | 'none') -> 'ok').
-endif.
@@ -145,7 +149,16 @@ notify_if(false, _Type, _Props) -> ok.
notify(Type, Props) -> notify(Type, Props, none).
notify(Type, Props, Ref) ->
- gen_event:notify(?MODULE, #event{type = Type,
- props = Props,
- reference = Ref,
- timestamp = os:timestamp()}).
+ gen_event:notify(?MODULE, event_cons(Type, Props, Ref)).
+
+sync_notify(Type, Props) -> sync_notify(Type, Props, none).
+
+sync_notify(Type, Props, Ref) ->
+ gen_event:sync_notify(?MODULE, event_cons(Type, Props, Ref)).
+
+event_cons(Type, Props, Ref) ->
+ #event{type = Type,
+ props = Props,
+ reference = Ref,
+ timestamp = os:timestamp()}.
+
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4d4a2a58..a1772f0a 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -20,7 +20,8 @@
-export([recover/0, policy_changed/2, callback/4, declare/6,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
- lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
+ lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
+ update_scratch/3, update_decorators/1, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
@@ -61,6 +62,7 @@
-spec(lookup_or_die/1 ::
(name()) -> rabbit_types:exchange() |
rabbit_types:channel_exit()).
+-spec(list/0 :: () -> [rabbit_types:exchange()]).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
-spec(lookup_scratch/2 :: (name(), atom()) ->
rabbit_types:ok(term()) |
@@ -70,6 +72,8 @@
(name(),
fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
-> not_found | rabbit_types:exchange()).
+-spec(update_decorators/1 :: (name()) -> 'ok').
+-spec(immutable/1 :: (rabbit_types:exchange()) -> rabbit_types:exchange()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -106,24 +110,15 @@ recover() ->
mnesia:read({rabbit_exchange, XName}) =:= []
end,
fun (X, Tx) ->
- case Tx of
- true -> store(X);
- false -> ok
- end,
- callback(X, create, map_create_tx(Tx), [X])
+ X1 = case Tx of
+ true -> store_ram(X);
+ false -> rabbit_exchange_decorator:set(X)
+ end,
+ callback(X1, create, map_create_tx(Tx), [X1])
end,
rabbit_durable_exchange),
- report_missing_decorators(Xs),
[XName || #exchange{name = XName} <- Xs].
-report_missing_decorators(Xs) ->
- Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) ||
- #exchange{decorators = D} <- Xs])),
- case [M || M <- Mods, code:which(M) =:= non_existing] of
- [] -> ok;
- M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M])
- end.
-
callback(X = #exchange{type = XType,
decorators = Decorators}, Fun, Serial0, Args) ->
Serial = if is_function(Serial0) -> Serial0;
@@ -158,12 +153,13 @@ serial(#exchange{name = XName} = X) ->
end.
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
- X = rabbit_policy:set(#exchange{name = XName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- arguments = Args}),
+ X = rabbit_exchange_decorator:set(
+ rabbit_policy:set(#exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args})),
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
@@ -171,13 +167,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
[] ->
- store(X),
- ok = case Durable of
- true -> mnesia:write(rabbit_durable_exchange,
- X, write);
- false -> ok
- end,
- {new, X};
+ {new, store(X)};
[ExistingX] ->
{existing, ExistingX}
end
@@ -195,7 +185,19 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
map_create_tx(true) -> transaction;
map_create_tx(false) -> none.
-store(X) -> ok = mnesia:write(rabbit_exchange, X, write).
+
+store(X = #exchange{durable = true}) ->
+ mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined},
+ write),
+ store_ram(X);
+store(X = #exchange{durable = false}) ->
+ store_ram(X).
+
+store_ram(X) ->
+ X1 = rabbit_exchange_decorator:set(X),
+ ok = mnesia:write(rabbit_exchange, rabbit_exchange_decorator:set(X1),
+ write),
+ X1.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
@@ -243,6 +245,8 @@ lookup_or_die(Name) ->
{error, not_found} -> rabbit_misc:not_found(Name)
end.
+list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}).
+
%% Not dirty_match_object since that would not be transactional when used in a
%% tx context
list(VHostPath) ->
@@ -287,20 +291,27 @@ update_scratch(Name, App, Fun) ->
ok
end).
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_exchange, Name}) of
+ [X] -> store_ram(X),
+ ok;
+ [] -> ok
+ end
+ end).
+
update(Name, Fun) ->
case mnesia:wread({rabbit_exchange, Name}) of
- [X = #exchange{durable = Durable}] ->
- X1 = Fun(X),
- ok = mnesia:write(rabbit_exchange, X1, write),
- case Durable of
- true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
- _ -> ok
- end,
- X1;
- [] ->
- not_found
+ [X] -> X1 = Fun(X),
+ store(X1);
+ [] -> not_found
end.
+immutable(X) -> X#exchange{scratches = none,
+ policy = none,
+ decorators = none}.
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 2f056b1b..900f9c32 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([select/2, set/1]).
+-export([select/2, set/1, register/2, unregister/1]).
%% This is like an exchange type except that:
%%
@@ -104,3 +104,25 @@ list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
cons_if_eq(Select, Select, Item, List) -> [Item | List];
cons_if_eq(_Select, _Other, _Item, List) -> List.
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(exchange_decorator, TypeName, ModuleName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(exchange_decorator, TypeName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+maybe_recover(X = #exchange{name = Name,
+ decorators = Decs}) ->
+ #exchange{decorators = Decs1} = set(X),
+ Old = lists:sort(select(all, Decs)),
+ New = lists:sort(select(all, Decs1)),
+ case New of
+ Old -> ok;
+ _ -> %% TODO create a tx here for non-federation decorators
+ [M:create(none, X) || M <- New -- Old],
+ rabbit_exchange:update_decorators(Name)
+ end.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 2b16b911..9bccf5dd 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
- msg_rates/1, status/1, invoke/3, is_duplicate/2]).
+ msg_rates/1, info/2, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason,
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
terminate(Reason,
- State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State = #state { name = QName,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
%% shouldn't be deleted. Most likely safe shutdown of this
- %% node. Thus just let some other slave take over.
+ %% node.
+ {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(QName),
+ case SSPids =:= [] andalso
+ rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of
+ true -> %% Remove the whole queue to avoid data loss
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Stopping all nodes on master shutdown since no "
+ "synchronised slave is available~n", []),
+ stop_all_slaves(Reason, State);
+ false -> %% Just let some other slave take over.
+ ok
+ end,
State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
@@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, State),
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
-stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
@@ -360,10 +374,13 @@ resume(State = #state { backing_queue = BQ,
msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:msg_rates(BQS).
-status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
- BQ:status(BQS) ++
+info(backing_queue_status,
+ State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:info(backing_queue_status, BQS) ++
[ {mirror_seen, dict:size(State #state.seen_status)},
- {mirror_senders, sets:size(State #state.known_senders)} ].
+ {mirror_senders, sets:size(State #state.known_senders)} ];
+info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:info(Item, BQS).
invoke(?MODULE, Fun, State) ->
Fun(?MODULE, State);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b0f092a9..9e8c4a18 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -29,16 +29,19 @@
-include("rabbit.hrl").
--rabbit_boot_step({?MODULE,
- [{description, "HA policy validation"},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-mode">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-params">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, recovery}]}).
+-rabbit_boot_step(
+ {?MODULE,
+ [{description, "HA policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
%%----------------------------------------------------------------------------
@@ -75,9 +78,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids,
- gm_pids = GMPids }] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids,
+ gm_pids = GMPids,
+ down_slave_nodes = DSNs}] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
lists:member(GM, DeadGMPids)
@@ -86,6 +90,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
AlivePids = [Pid || {_GM, Pid} <- AliveGM],
Alive = [Pid || Pid <- [QPid | SPids],
lists:member(Pid, AlivePids)],
+ DSNs1 = [node(Pid) ||
+ Pid <- SPids,
+ not lists:member(Pid, AlivePids)] ++ DSNs,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
@@ -94,9 +101,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM},
+ Q1 = Q#amqqueue{pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = AliveGM,
+ down_slave_nodes = DSNs1},
store_updated_slaves(Q1),
%% If we add and remove nodes at the same time we
%% might tell the old master we need to sync and
@@ -106,8 +114,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
_ ->
%% Master has changed, and we're not it.
%% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM},
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM,
+ down_slave_nodes = DSNs1},
store_updated_slaves(Q1)
end,
{ok, QPid1, DeadPids}
@@ -236,12 +245,16 @@ log(Level, QName, Fmt, Args) ->
rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt,
[rabbit_misc:rs(QName) | Args]).
-store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
- sync_slave_pids = SSPids}) ->
+store_updated_slaves(Q = #amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids,
+ down_slave_nodes = DSNs}) ->
%% TODO now that we clear sync_slave_pids in rabbit_durable_queue,
%% do we still need this filtering?
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
- Q1 = Q#amqqueue{sync_slave_pids = SSPids1},
+ DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]],
+ Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
+ down_slave_nodes = DSNs1},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q1),
@@ -374,16 +387,21 @@ validate_policy(KeyList) ->
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
- case {Mode, Params, SyncMode} of
- {none, none, none} ->
+ PromoteOnShutdown = proplists:get_value(
+ <<"ha-promote-on-shutdown">>, KeyList, none),
+ case {Mode, Params, SyncMode, PromoteOnShutdown} of
+ {none, none, none, none} ->
ok;
- {none, _, _} ->
- {error, "ha-mode must be specified to specify ha-params or "
- "ha-sync-mode", []};
+ {none, _, _, _} ->
+ {error, "ha-mode must be specified to specify ha-params, "
+ "ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
case module(Mode) of
{ok, M} -> case M:validate_policy(Params) of
- ok -> validate_sync_mode(SyncMode);
+ ok -> case validate_sync_mode(SyncMode) of
+ ok -> validate_pos(PromoteOnShutdown);
+ E -> E
+ end;
E -> E
end;
_ -> {error, "~p is not a valid ha-mode value", [Mode]}
@@ -398,3 +416,12 @@ validate_sync_mode(SyncMode) ->
Mode -> {error, "ha-sync-mode must be \"manual\" "
"or \"automatic\", got ~p", [Mode]}
end.
+
+validate_pos(PromoteOnShutdown) ->
+ case PromoteOnShutdown of
+ <<"always">> -> ok;
+ <<"when-synced">> -> ok;
+ none -> ok;
+ Mode -> {error, "ha-promote-on-shutdown must be "
+ "\"always\" or \"when-synced\", got ~p", [Mode]}
+ end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 11d6a79c..cc06ae44 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -653,8 +653,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
-backing_queue_timeout(State = #state { backing_queue = BQ }) ->
- run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
+backing_queue_timeout(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State#state{backing_queue_state = BQ:timeout(BQS)}.
ensure_sync_timer(State) ->
rabbit_misc:ensure_timer(State, #state.sync_timer_ref,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 006bbadf..09355f3f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -45,7 +45,7 @@
-export([with_local_io/1, local_info_msg/2]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
--export([pid_to_string/1, string_to_pid/1]).
+-export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]).
-export([version_compare/2, version_compare/3]).
-export([version_minor_equivalent/2]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
@@ -67,10 +67,11 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
--export([ensure_timer/4, stop_timer/2]).
+-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([moving_average/4]).
+-export([now_to_ms/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -81,7 +82,7 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1]).
+-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -94,6 +95,7 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -192,6 +194,7 @@
(rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(node_to_fake_pid/1 :: (atom()) -> pid()).
-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
-spec(version_compare/3 ::
(string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
@@ -209,7 +212,8 @@
[string()])
-> {'ok', {atom(), [{string(), string()}], [string()]}} |
'no_command').
--spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
+-spec(all_module_attributes/1 ::
+ (atom()) -> [{atom(), atom(), [term()]}]).
-spec(build_acyclic_graph/3 ::
(graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
-> rabbit_types:ok_or_error2(digraph(),
@@ -245,11 +249,16 @@
-> {any(), non_neg_integer()}).
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
+-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()).
+-spec(cancel_timer/1 :: (tref()) -> 'ok').
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined')
-> float()).
+-spec(now_to_ms/1 :: ({non_neg_integer(),
+ non_neg_integer(),
+ non_neg_integer()}) -> pos_integer()).
-endif.
%%----------------------------------------------------------------------------
@@ -705,6 +714,10 @@ string_to_pid(Str) ->
throw(Err)
end.
+%% node(node_to_fake_pid(Node)) =:= Node.
+node_to_fake_pid(Node) ->
+ string_to_pid(format("<~s.0.0.0>", [Node])).
+
version_compare(A, B, lte) ->
case version_compare(A, B) of
eq -> true;
@@ -849,20 +862,20 @@ module_attributes(Module) ->
end.
all_module_attributes(Name) ->
- Modules =
+ Targets =
lists:usort(
lists:append(
- [Modules || {App, _, _} <- application:loaded_applications(),
- {ok, Modules} <- [application:get_key(App, modules)]])),
+ [[{App, Module} || Module <- Modules] ||
+ {App, _, _} <- application:loaded_applications(),
+ {ok, Modules} <- [application:get_key(App, modules)]])),
lists:foldl(
- fun (Module, Acc) ->
+ fun ({App, Module}, Acc) ->
case lists:append([Atts || {N, Atts} <- module_attributes(Module),
N =:= Name]) of
[] -> Acc;
- Atts -> [{Module, Atts} | Acc]
+ Atts -> [{App, Module, Atts} | Acc]
end
- end, [], Modules).
-
+ end, [], Targets).
build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
G = digraph:new([acyclic]),
@@ -870,13 +883,13 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
[case digraph:vertex(G, Vertex) of
false -> digraph:add_vertex(G, Vertex, Label);
_ -> ok = throw({graph_error, {vertex, duplicate, Vertex}})
- end || {Module, Atts} <- Graph,
- {Vertex, Label} <- VertexFun(Module, Atts)],
+ end || GraphElem <- Graph,
+ {Vertex, Label} <- VertexFun(GraphElem)],
[case digraph:add_edge(G, From, To) of
{error, E} -> throw({graph_error, {edge, E, From, To}});
_ -> ok
- end || {Module, Atts} <- Graph,
- {From, To} <- EdgeFun(Module, Atts)],
+ end || GraphElem <- Graph,
+ {From, To} <- EdgeFun(GraphElem)],
{ok, G}
catch {graph_error, Reason} ->
true = digraph:delete(G),
@@ -1017,7 +1030,9 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
-check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}};
+now_to_ms({Mega, Sec, Micro}) ->
+ (Mega * 1000000 * 1000000 + Sec * 1000000 + Micro) div 1000.
+
check_expiry(N) when N < 0 -> {error, {value_negative, N}};
check_expiry(_N) -> ok.
@@ -1045,7 +1060,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
ensure_timer(State, Idx, After, Msg) ->
case element(Idx, State) of
- undefined -> TRef = erlang:send_after(After, self(), Msg),
+ undefined -> TRef = send_after(After, self(), Msg),
setelement(Idx, State, TRef);
_ -> State
end.
@@ -1053,12 +1068,25 @@ ensure_timer(State, Idx, After, Msg) ->
stop_timer(State, Idx) ->
case element(Idx, State) of
undefined -> State;
- TRef -> case erlang:cancel_timer(TRef) of
- false -> State;
- _ -> setelement(Idx, State, undefined)
- end
+ TRef -> cancel_timer(TRef),
+ setelement(Idx, State, undefined)
end.
+%% timer:send_after/3 goes through a single timer process but allows
+%% long delays. erlang:send_after/3 does not have a bottleneck but
+%% only allows max 2^32-1 millis.
+-define(MAX_ERLANG_SEND_AFTER, 4294967295).
+send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER ->
+ {ok, Ref} = timer:send_after(Millis, Pid, Msg),
+ {timer, Ref};
+send_after(Millis, Pid, Msg) ->
+ {erlang, erlang:send_after(Millis, Pid, Msg)}.
+
+cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref),
+ ok;
+cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref),
+ ok.
+
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c6c2c8eb..630d9853 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -23,6 +23,7 @@
update_cluster_nodes/1,
change_cluster_node_type/1,
forget_cluster_node/2,
+ force_load_next_boot/0,
status/0,
is_clustered/0,
@@ -63,6 +64,7 @@
-spec(update_cluster_nodes/1 :: (node()) -> 'ok').
-spec(change_cluster_node_type/1 :: (node_type()) -> 'ok').
-spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok').
+-spec(force_load_next_boot/0 :: () -> 'ok').
%% Various queries to get the status of the db
-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
@@ -303,7 +305,6 @@ remove_node_offline_node(Node) ->
e(removing_node_from_offline_node)
end.
-
%%----------------------------------------------------------------------------
%% Queries
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9082dbd3..96448f32 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -37,8 +37,6 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--define(SSL_TIMEOUT, 5). %% seconds
-
-define(FIRST_TEST_BIND_PORT, 10000).
%%----------------------------------------------------------------------------
@@ -168,9 +166,14 @@ ensure_ssl() ->
end
end.
+ssl_timeout() ->
+ {ok, Val} = application:get_env(rabbit, ssl_handshake_timeout),
+ Val.
+
ssl_transform_fun(SslOpts) ->
fun (Sock) ->
- case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
+ Timeout = ssl_timeout(),
+ case catch ssl:ssl_accept(Sock, SslOpts, Timeout) of
{ok, SslSock} ->
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
{error, timeout} ->
@@ -185,7 +188,7 @@ ssl_transform_fun(SslOpts) ->
%% form, according to the TLS spec). So we give
%% the ssl_connection a little bit of time to send
%% such alerts.
- timer:sleep(?SSL_TIMEOUT * 1000),
+ timer:sleep(Timeout),
{error, {ssl_upgrade_error, Reason}};
{'EXIT', Reason} ->
{error, {ssl_upgrade_failure, Reason}}
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index ca843e14..7eaeaf50 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -458,6 +458,7 @@ ensure_ping_timer(State) ->
State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes).
handle_live_rabbit(Node) ->
+ ok = rabbit_amqqueue:on_node_up(Node),
ok = rabbit_alarm:on_node_up(Node),
ok = rabbit_mnesia:on_node_up(Node).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index c0fb05e2..9acaa1d4 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -18,6 +18,7 @@
-include("rabbit.hrl").
-export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]).
+-export([ensure/1]).
%%----------------------------------------------------------------------------
@@ -31,22 +32,54 @@
-spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]).
-spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) ->
[plugin_name()]).
-
+-spec(ensure/1 :: (string()) -> {'ok', [atom()], [atom()]} | {error, any()}).
-endif.
%%----------------------------------------------------------------------------
+ensure(FileJustChanged) ->
+ {ok, OurFile} = application:get_env(rabbit, enabled_plugins_file),
+ case OurFile of
+ FileJustChanged ->
+ {ok, Dir} = application:get_env(rabbit, plugins_dir),
+ Enabled = read_enabled(OurFile),
+ Wanted = dependencies(false, Enabled, list(Dir)),
+ prepare_plugins(Enabled),
+ Current = active(),
+ Start = Wanted -- Current,
+ Stop = Current -- Wanted,
+ rabbit:start_apps(Start),
+ %% We need sync_notify here since mgmt will attempt to look at all
+ %% the modules for the disabled plugins - if they are unloaded
+ %% that won't work.
+ ok = rabbit_event:sync_notify(plugins_changed, [{enabled, Start},
+ {disabled, Stop}]),
+ rabbit:stop_apps(Stop),
+ clean_plugins(Stop),
+ {ok, Start, Stop};
+ _ ->
+ {error, {enabled_plugins_mismatch, FileJustChanged, OurFile}}
+ end.
+
%% @doc Prepares the file system and installs all enabled plugins.
setup() ->
- {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(ExpandDir) of
+ ok -> ok;
+ {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
+ [ExpandDir, E1]}})
+ end,
+
{ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file),
- prepare_plugins(EnabledFile, PluginDir, ExpandDir).
+ Enabled = read_enabled(EnabledFile),
+ prepare_plugins(Enabled).
%% @doc Lists the plugins which are currently running.
active() ->
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
- InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ],
+ InstalledPlugins = plugin_names(list(ExpandDir)),
[App || {App, _, _} <- rabbit_misc:which_applications(),
lists:member(App, InstalledPlugins)].
@@ -67,7 +100,7 @@ list(PluginsDir) ->
_ -> error_logger:warning_msg(
"Problem reading some plugins: ~p~n", [Problems])
end,
- Plugins.
+ ensure_dependencies(Plugins).
%% @doc Read the list of enabled plugins from the supplied term file.
read_enabled(PluginsFile) ->
@@ -86,15 +119,10 @@ read_enabled(PluginsFile) ->
%% the resulting list, otherwise they're skipped.
dependencies(Reverse, Sources, AllPlugins) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
- lists:ukeysort(
- 1, [{Name, Deps} ||
- #plugin{name = Name,
- dependencies = Deps} <- AllPlugins] ++
- [{Dep, []} ||
- #plugin{dependencies = Deps} <- AllPlugins,
- Dep <- Deps])),
+ fun ({App, _Deps}) -> [{App, App}] end,
+ fun ({App, Deps}) -> [{App, Dep} || Dep <- Deps] end,
+ [{Name, Deps} || #plugin{name = Name,
+ dependencies = Deps} <- AllPlugins]),
Dests = case Reverse of
false -> digraph_utils:reachable(Sources, G);
true -> digraph_utils:reaching(Sources, G)
@@ -102,27 +130,44 @@ dependencies(Reverse, Sources, AllPlugins) ->
true = digraph:delete(G),
Dests.
+%% Make sure we don't list OTP apps in here, and also that we detect
+%% missing dependencies.
+ensure_dependencies(Plugins) ->
+ Names = plugin_names(Plugins),
+ NotThere = [Dep || #plugin{dependencies = Deps} <- Plugins,
+ Dep <- Deps,
+ not lists:member(Dep, Names)],
+ {OTP, Missing} = lists:partition(fun is_loadable/1, lists:usort(NotThere)),
+ case Missing of
+ [] -> ok;
+ _ -> Blame = [Name || #plugin{name = Name,
+ dependencies = Deps} <- Plugins,
+ lists:any(fun (Dep) ->
+ lists:member(Dep, Missing)
+ end, Deps)],
+ throw({error, {missing_dependencies, Missing, Blame}})
+ end,
+ [P#plugin{dependencies = Deps -- OTP}
+ || P = #plugin{dependencies = Deps} <- Plugins].
+
+is_loadable(App) ->
+ case application:load(App) of
+ {error, {already_loaded, _}} -> true;
+ ok -> application:unload(App),
+ true;
+ _ -> false
+ end.
+
%%----------------------------------------------------------------------------
-prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
+prepare_plugins(Enabled) ->
+ {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
AllPlugins = list(PluginsDistDir),
- Enabled = read_enabled(EnabledFile),
ToUnpack = dependencies(false, Enabled, AllPlugins),
ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins),
- case Enabled -- plugin_names(ToUnpackPlugins) of
- [] -> ok;
- Missing -> error_logger:warning_msg(
- "The following enabled plugins were not found: ~p~n",
- [Missing])
- end,
-
- %% Eliminate the contents of the destination directory
- case delete_recursively(ExpandDir) of
- ok -> ok;
- {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
- [ExpandDir, E1]}})
- end,
case filelib:ensure_dir(ExpandDir ++ "/") of
ok -> ok;
{error, E2} -> throw({error, {cannot_create_plugins_expand_dir,
@@ -134,6 +179,20 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
[prepare_dir_plugin(PluginAppDescPath) ||
PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")].
+clean_plugins(Plugins) ->
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ [clean_plugin(Plugin, ExpandDir) || Plugin <- Plugins].
+
+clean_plugin(Plugin, ExpandDir) ->
+ {ok, Mods} = application:get_key(Plugin, modules),
+ application:unload(Plugin),
+ [begin
+ code:soft_purge(Mod),
+ code:delete(Mod),
+ false = code:is_loaded(Mod)
+ end || Mod <- Mods],
+ delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])).
+
prepare_dir_plugin(PluginAppDescPath) ->
code:add_path(filename:dirname(PluginAppDescPath)),
list_to_atom(filename:basename(PluginAppDescPath, ".app")).
@@ -172,8 +231,7 @@ plugin_info(Base, {app, App0}) ->
mkplugin(Name, Props, Type, Location) ->
Version = proplists:get_value(vsn, Props, "0"),
Description = proplists:get_value(description, Props, ""),
- Dependencies =
- filter_applications(proplists:get_value(applications, Props, [])),
+ Dependencies = proplists:get_value(applications, Props, []),
#plugin{name = Name, version = Version, description = Description,
dependencies = Dependencies, location = Location, type = Type}.
@@ -206,18 +264,6 @@ parse_binary(Bin) ->
Err -> {error, {invalid_app, Err}}
end.
-filter_applications(Applications) ->
- [Application || Application <- Applications,
- not is_available_app(Application)].
-
-is_available_app(Application) ->
- case application:load(Application) of
- {error, {already_loaded, _}} -> true;
- ok -> application:unload(Application),
- true;
- _ -> false
- end.
-
plugin_names(Plugins) ->
[Name || #plugin{name = Name} <- Plugins].
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 89e16f14..278fcf98 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -18,23 +18,34 @@
-include("rabbit.hrl").
-export([start/0, stop/0]).
+-export([action/6]).
+-define(NODE_OPT, "-n").
-define(VERBOSE_OPT, "-v").
-define(MINIMAL_OPT, "-m").
-define(ENABLED_OPT, "-E").
-define(ENABLED_ALL_OPT, "-e").
+-define(OFFLINE_OPT, "--offline").
+-define(ONLINE_OPT, "--online").
+-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
+-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
+-define(ONLINE_DEF, {?ONLINE_OPT, flag}).
--define(GLOBAL_DEFS, []).
+-define(RPC_TIMEOUT, infinity).
+
+-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]).
-define(COMMANDS,
[{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
- enable,
- disable]).
+ {enable, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {disable, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {set, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {sync, []}]).
%%----------------------------------------------------------------------------
@@ -51,11 +62,10 @@
start() ->
{ok, [[PluginsFile|_]|_]} =
init:get_argument(enabled_plugins_file),
+ {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
{Command, Opts, Args} =
- case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS,
- init:get_plain_arguments())
- of
+ case parse_arguments(init:get_plain_arguments(), NodeStr) of
{ok, Res} -> Res;
no_command -> print_error("could not recognise command", []),
usage()
@@ -67,7 +77,8 @@ start() ->
[string:join([atom_to_list(Command) | Args], " ")])
end,
- case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
+ Node = proplists:get_value(?NODE_OPT, Opts),
+ case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of
ok ->
rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
@@ -76,12 +87,23 @@ start() ->
{'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
PrintInvalidCommandError(),
usage();
+ {error, {missing_dependencies, Missing, Blame}} ->
+ print_error("dependent plugins ~p not found; used by ~p.",
+ [Missing, Blame]),
+ rabbit_misc:quit(2);
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
{error_string, Reason} ->
print_error("~s", [Reason]),
rabbit_misc:quit(2);
+ {badrpc, {'EXIT', Reason}} ->
+ print_error("~p", [Reason]),
+ rabbit_misc:quit(2);
+ {badrpc, Reason} ->
+ print_error("unable to connect to node ~w: ~w", [Node, Reason]),
+ print_badrpc_diagnostics([Node]),
+ rabbit_misc:quit(2);
Other ->
print_error("~p", [Other]),
rabbit_misc:quit(2)
@@ -92,50 +114,80 @@ stop() ->
%%----------------------------------------------------------------------------
-action(list, [], Opts, PluginsFile, PluginsDir) ->
- action(list, [".*"], Opts, PluginsFile, PluginsDir);
-action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
- format_plugins(Pat, Opts, PluginsFile, PluginsDir);
+parse_arguments(CmdLine, NodeStr) ->
+ case rabbit_misc:parse_arguments(
+ ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of
+ {ok, {Cmd, Opts0, Args}} ->
+ Opts = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts0],
+ {ok, {Cmd, Opts, Args}};
+ E ->
+ E
+ end.
+
+action(list, Node, [], Opts, PluginsFile, PluginsDir) ->
+ action(list, Node, [".*"], Opts, PluginsFile, PluginsDir);
+action(list, Node, [Pat], Opts, PluginsFile, PluginsDir) ->
+ format_plugins(Node, Pat, Opts, PluginsFile, PluginsDir);
-action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
+action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) ->
case ToEnable0 of
[] -> throw({error_string, "Not enough arguments for 'enable'"});
_ -> ok
end,
AllPlugins = rabbit_plugins:list(PluginsDir),
Enabled = rabbit_plugins:read_enabled(PluginsFile),
- ImplicitlyEnabled = rabbit_plugins:dependencies(false,
- Enabled, AllPlugins),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
Missing = ToEnable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string, fmt_missing(Missing)})
+ end,
NewEnabled = lists:usort(Enabled ++ ToEnable),
NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
NewEnabled, AllPlugins),
- MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing,
- case {Missing, MissingDeps} of
- {[], []} -> ok;
- {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)});
- {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)});
- {_, _} -> throw({error_string,
- fmt_missing("plugins", Missing) ++
- fmt_missing("dependencies", MissingDeps)})
- end,
write_enabled_plugins(PluginsFile, NewEnabled),
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
- NewImplicitlyEnabled -- ImplicitlyEnabled),
- report_change()
- end;
+ NewImplicitlyEnabled -- ImplicitlyEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
+
+action(set, Node, ToSet0, Opts, PluginsFile, PluginsDir) ->
+ ToSet = [list_to_atom(Name) || Name <- ToSet0],
+ AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ Missing = ToSet -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string, fmt_missing(Missing)})
+ end,
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ ToSet, AllPlugins),
+ write_enabled_plugins(PluginsFile, ToSet),
+ case NewImplicitlyEnabled of
+ [] -> io:format("All plugins are now disabled.~n");
+ _ -> print_list("The following plugins are now enabled:",
+ NewImplicitlyEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
-action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
+action(disable, Node, ToDisable0, Opts, PluginsFile, PluginsDir) ->
case ToDisable0 of
[] -> throw({error_string, "Not enough arguments for 'disable'"});
_ -> ok
end,
- ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
- Enabled = rabbit_plugins:read_enabled(PluginsFile),
AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
Missing = ToDisable -- plugin_names(AllPlugins),
case Missing of
[] -> ok;
@@ -144,30 +196,35 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
end,
ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins),
NewEnabled = Enabled -- ToDisableDeps,
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ NewEnabled, AllPlugins),
case length(Enabled) =:= length(NewEnabled) of
true -> io:format("Plugin configuration unchanged.~n");
- false -> ImplicitlyEnabled =
- rabbit_plugins:dependencies(false, Enabled, AllPlugins),
- NewImplicitlyEnabled =
- rabbit_plugins:dependencies(false,
- NewEnabled, AllPlugins),
- print_list("The following plugins have been disabled:",
+ false -> print_list("The following plugins have been disabled:",
ImplicitlyEnabled -- NewImplicitlyEnabled),
- write_enabled_plugins(PluginsFile, NewEnabled),
- report_change()
- end.
+ write_enabled_plugins(PluginsFile, NewEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
+
+action(sync, Node, [], _Opts, PluginsFile, _PluginsDir) ->
+ sync(Node, true, PluginsFile).
%%----------------------------------------------------------------------------
-print_error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
+
+print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
+
+print_badrpc_diagnostics(Nodes) ->
+ fmt_stderr(rabbit_nodes:diagnostics(Nodes), []).
usage() ->
io:format("~s", [rabbit_plugins_usage:usage()]),
rabbit_misc:quit(1).
%% Pretty print a list of plugins.
-format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
+format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) ->
Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
Format = case {Verbose, Minimal} of
@@ -182,41 +239,52 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
AvailablePlugins = rabbit_plugins:list(PluginsDir),
EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile),
- EnabledImplicitly =
- rabbit_plugins:dependencies(false, EnabledExplicitly,
- AvailablePlugins) -- EnabledExplicitly,
- Missing = [#plugin{name = Name, dependencies = []} ||
- Name <- ((EnabledExplicitly ++ EnabledImplicitly) --
- plugin_names(AvailablePlugins))],
+ AllEnabled = rabbit_plugins:dependencies(false, EnabledExplicitly,
+ AvailablePlugins),
+ EnabledImplicitly = AllEnabled -- EnabledExplicitly,
+ {StatusMsg, Running} =
+ case rpc:call(Node, rabbit_plugins, active, [], ?RPC_TIMEOUT) of
+ {badrpc, _} -> {"[failed to contact ~s - status not shown]", []};
+ Active -> {"* = running on ~s", Active}
+ end,
{ok, RE} = re:compile(Pattern),
Plugins = [ Plugin ||
- Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing,
+ Plugin = #plugin{name = Name} <- AvailablePlugins,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
- if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
- OnlyEnabledAll -> (lists:member(Name,
- EnabledExplicitly) or
- lists:member(Name, EnabledImplicitly));
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ OnlyEnabledAll -> lists:member(Name, EnabledExplicitly) or
+ lists:member(Name,EnabledImplicitly);
true -> true
end],
Plugins1 = usort_plugins(Plugins),
MaxWidth = lists:max([length(atom_to_list(Name)) ||
#plugin{name = Name} <- Plugins1] ++ [0]),
- [format_plugin(P, EnabledExplicitly, EnabledImplicitly,
- plugin_names(Missing), Format, MaxWidth) || P <- Plugins1],
+ case Format of
+ minimal -> ok;
+ _ -> io:format(" Configured: E = explicitly enabled; "
+ "e = implicitly enabled~n"
+ " | Status: ~s~n"
+ " |/~n", [rabbit_misc:format(StatusMsg, [Node])])
+ end,
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Running,
+ Format, MaxWidth) || P <- Plugins1],
ok.
format_plugin(#plugin{name = Name, version = Version,
description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Missing,
- Format, MaxWidth) ->
- Glyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly),
- lists:member(Name, Missing)} of
- {true, false, false} -> "[E]";
- {false, true, false} -> "[e]";
- {_, _, true} -> "[!]";
- _ -> "[ ]"
- end,
+ EnabledExplicitly, EnabledImplicitly, Running, Format,
+ MaxWidth) ->
+ EnabledGlyph = case {lists:member(Name, EnabledExplicitly),
+ lists:member(Name, EnabledImplicitly)} of
+ {true, false} -> "E";
+ {false, true} -> "e";
+ _ -> " "
+ end,
+ RunningGlyph = case lists:member(Name, Running) of
+ true -> "*";
+ false -> " "
+ end,
+ Glyph = rabbit_misc:format("[~s~s]", [EnabledGlyph, RunningGlyph]),
Opt = fun (_F, A, A) -> ok;
( F, A, _) -> io:format(F, [A])
end,
@@ -227,9 +295,9 @@ format_plugin(#plugin{name = Name, version = Version,
Opt("~s", Version, undefined),
io:format("~n");
verbose -> io:format("~s ~w~n", [Glyph, Name]),
- Opt(" Version: \t~s~n", Version, undefined),
- Opt(" Dependencies:\t~p~n", Deps, []),
- Opt(" Description: \t~s~n", Description, undefined),
+ Opt(" Version: \t~s~n", Version, undefined),
+ Opt(" Dependencies:\t~p~n", Deps, []),
+ Opt(" Description: \t~s~n", Description, undefined),
io:format("~n")
end.
@@ -240,8 +308,8 @@ fmt_list(Header, Plugins) ->
lists:flatten(
[Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
-fmt_missing(Desc, Missing) ->
- fmt_list("The following " ++ Desc ++ " could not be found:", Missing).
+fmt_missing(Missing) ->
+ fmt_list("The following plugins could not be found:", Missing).
usort_plugins(Plugins) ->
lists:usort(fun plugins_cmp/2, Plugins).
@@ -262,6 +330,51 @@ write_enabled_plugins(PluginsFile, Plugins) ->
PluginsFile, Reason}})
end.
-report_change() ->
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n").
+action_change(Opts, Node, Old, New, PluginsFile) ->
+ action_change0(proplists:get_bool(?OFFLINE_OPT, Opts),
+ proplists:get_bool(?ONLINE_OPT, Opts),
+ Node, Old, New, PluginsFile).
+
+action_change0(true, _Online, _Node, Same, Same, _PluginsFile) ->
+ %% Definitely nothing to do
+ ok;
+action_change0(true, _Online, _Node, _Old, _New, _PluginsFile) ->
+ io:format("Offline change; changes will take effect at broker restart.~n");
+action_change0(false, Online, Node, _Old, _New, PluginsFile) ->
+ sync(Node, Online, PluginsFile).
+
+sync(Node, ForceOnline, PluginsFile) ->
+ rpc_call(Node, ForceOnline, rabbit_plugins, ensure, [PluginsFile]).
+
+rpc_call(Node, Online, Mod, Fun, Args) ->
+ io:format("~nApplying plugin configuration to ~s...", [Node]),
+ case rpc:call(Node, Mod, Fun, Args) of
+ {ok, [], []} ->
+ io:format(" nothing to do.~n", []);
+ {ok, Start, []} ->
+ io:format(" started ~b plugin~s.~n", [length(Start), plur(Start)]);
+ {ok, [], Stop} ->
+ io:format(" stopped ~b plugin~s.~n", [length(Stop), plur(Stop)]);
+ {ok, Start, Stop} ->
+ io:format(" stopped ~b plugin~s and started ~b plugin~s.~n",
+ [length(Stop), plur(Stop), length(Start), plur(Start)]);
+ {badrpc, nodedown} = Error ->
+ io:format(" failed.~n", []),
+ case Online of
+ true -> Error;
+ false -> io:format(
+ " * Could not contact node ~s.~n"
+ " Changes will take effect at broker restart.~n"
+ " * Options: --online - fail if broker cannot be "
+ "contacted.~n"
+ " --offline - do not try to contact "
+ "broker.~n",
+ [Node])
+ end;
+ Error ->
+ io:format(" failed.~n", []),
+ Error
+ end.
+
+plur([_]) -> "";
+plur(_) -> "s".
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index fe2b766f..3558cf98 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -61,13 +61,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value) ->
{error, "~p is not a valid dead letter routing key", [Value]};
validate_policy0(<<"message-ttl">>, Value)
- when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"message-ttl">>, Value) ->
{error, "~p is not a valid message TTL", [Value]};
validate_policy0(<<"expires">>, Value)
- when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 1 ->
ok;
validate_policy0(<<"expires">>, Value) ->
{error, "~p is not a valid queue expiry", [Value]};
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 0a69fb32..f5d03360 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -46,17 +46,11 @@ name(#exchange{policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set(
- Q#amqqueue{policy = set0(Name)});
-set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
- X#exchange{policy = set0(Name)}).
+set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
-set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)};
-set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set(
- X#exchange{policy = match(Name, Ps)}).
-
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
@@ -104,12 +98,18 @@ recover0() ->
Policies = list(),
[rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:write(rabbit_durable_exchange, set(X, Policies), write)
- end) || X <- Xs],
+ mnesia:write(
+ rabbit_durable_exchange,
+ rabbit_exchange_decorator:set(
+ X#exchange{policy = match(Name, Policies)}), write)
+ end) || X = #exchange{name = Name} <- Xs],
[rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:write(rabbit_durable_queue, set(Q, Policies), write)
- end) || Q <- Qs],
+ mnesia:write(
+ rabbit_durable_queue,
+ rabbit_queue_decorator:set(
+ Q#amqqueue{policy = match(Name, Policies)}), write)
+ end) || Q = #amqqueue{name = Name} <- Qs],
ok.
invalid_file() ->
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 6205e2dc..adfe0c7f 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -2,7 +2,7 @@
-include("rabbit.hrl").
--export([select/1, set/1]).
+-export([select/1, set/1, register/2, unregister/1]).
%%----------------------------------------------------------------------------
@@ -41,3 +41,24 @@ select(Modules) ->
set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(queue_decorator, TypeName, ModuleName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(queue_decorator, TypeName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+maybe_recover(Q = #amqqueue{name = Name,
+ decorators = Decs}) ->
+ #amqqueue{decorators = Decs1} = set(Q),
+ Old = lists:sort(select(Decs)),
+ New = lists:sort(select(Decs1)),
+ case New of
+ Old -> ok;
+ _ -> [M:startup(Q) || M <- New -- Old],
+ rabbit_amqqueue:update_decorators(Name)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e6e94e28..2ac24f97 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -27,7 +27,6 @@
-export([conserve_resources/3, server_properties/1]).
--define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
@@ -43,7 +42,7 @@
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, connected_at}).
-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}).
@@ -55,7 +54,7 @@
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, channel_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties, connected_at]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -189,10 +188,10 @@ server_capabilities(_) ->
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
socket_error(Reason) when is_atom(Reason) ->
- log(error, "error on AMQP connection ~p: ~s~n",
+ log(error, "Error on AMQP connection ~p: ~s~n",
[self(), rabbit_misc:format_inet_error(Reason)]);
socket_error(Reason) ->
- log(error, "error on AMQP connection ~p:~n~p~n", [self(), Reason]).
+ log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -216,8 +215,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
exit(normal)
end,
log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
+ {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
ClientSock = socket_op(Sock, SockTransform),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
+ erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
?store_proc_name(list_to_binary(Name)),
@@ -231,13 +231,14 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
peer_port = PeerPort,
protocol = none,
user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
+ timeout_sec = (HandshakeTimeout / 1000),
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
capabilities = [],
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ connected_at = rabbit_misc:now_to_ms(os:timestamp())},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
@@ -410,7 +411,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
rabbit_event:notify(
connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref),
- State;
+ rabbit_event:init_stats_timer(State, #v1.stats_timer);
handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
State;
@@ -548,21 +549,27 @@ wait_for_channel_termination(0, TimerRef, State) ->
end;
_ -> State
end;
-wait_for_channel_termination(N, TimerRef, State) ->
+wait_for_channel_termination(N, TimerRef,
+ State = #v1{connection_state = CS,
+ connection = #connection{
+ name = ConnName,
+ user = User,
+ vhost = VHost}}) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
{Channel, State1} = channel_cleanup(ChPid, State),
case {Channel, termination_kind(Reason)} of
- {undefined, _} -> exit({abnormal_dependent_exit,
- ChPid, Reason});
- {_, controlled} -> wait_for_channel_termination(
- N-1, TimerRef, State1);
- {_, uncontrolled} -> log(error,
- "AMQP connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
- wait_for_channel_termination(
- N-1, TimerRef, State1)
+ {undefined, _} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef, State1);
+ {_, uncontrolled} ->
+ log(error, "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:"
+ "error while terminating:~n~p~n",
+ [self(), ConnName, VHost, User#user.username,
+ CS, Channel, Reason]),
+ wait_for_channel_termination(N-1, TimerRef, State1)
end;
cancel_wait ->
exit(channel_termination_timeout)
@@ -581,16 +588,24 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+log_hard_error(#v1{connection_state = CS,
+ connection = #connection{
+ name = ConnName,
+ user = User,
+ vhost = VHost}}, Channel, Reason) ->
+ log(error,
+ "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:~n~p~n",
+ [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]).
+
handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
- log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), closed, Channel, Reason]),
+ log_hard_error(State, Channel, Reason),
State;
handle_exception(State = #v1{connection = #connection{protocol = Protocol},
connection_state = CS},
Channel, Reason)
when ?IS_RUNNING(State) orelse CS =:= closing ->
- log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), CS, Channel, Reason]),
+ log_hard_error(State, Channel, Reason),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
State1 = close_connection(terminate_channels(State)),
@@ -1130,6 +1145,7 @@ ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
+ic(connected_at, #connection{connected_at = T}) -> T;
ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index da75932d..fe2c3b58 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -70,7 +70,13 @@ wait_for_replicated() ->
not lists:member({local_content, true}, TabDef)]).
wait(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
+ %% We might be in ctl here for offline ops, in which case we can't
+ %% get_env() for the rabbit app.
+ Timeout = case application:get_env(rabbit, mnesia_table_loading_timeout) of
+ {ok, T} -> T;
+ undefined -> 30000
+ end,
+ case mnesia:wait_for_tables(TableNames, Timeout) of
ok ->
ok;
{timeout, BadTabs} ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index da6938bd..34a8cc5c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2454,7 +2454,7 @@ with_fresh_variable_queue(Fun) ->
spawn_link(fun() ->
ok = empty_test_queue(),
VQ = variable_queue_init(test_amqqueue(true), false),
- S0 = rabbit_variable_queue:status(VQ),
+ S0 = variable_queue_status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta,
{delta, undefined, 0, undefined}},
@@ -2579,7 +2579,7 @@ variable_queue_with_holes(VQ0) ->
{delta, _, 0, _} -> true;
0 -> true;
_ -> false
- end || {K, V} <- rabbit_variable_queue:status(VQ8),
+ end || {K, V} <- variable_queue_status(VQ8),
lists:member(K, [q1, delta, q3])],
Depth = Count + Interval,
Depth = rabbit_variable_queue:depth(VQ8),
@@ -2649,17 +2649,20 @@ test_variable_queue_ack_limiting(VQ0) ->
%% fetch half the messages
{VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3),
- VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2},
- {ram_ack_count, Len div 2},
- {ram_msg_count, Len div 2}]),
+ VQ5 = check_variable_queue_status(
+ VQ4, [{len, Len div 2},
+ {messages_unacknowledged_ram, Len div 2},
+ {messages_ready_ram, Len div 2},
+ {messages_ram, Len}]),
%% ensure all acks go to disk on 0 duration target
VQ6 = check_variable_queue_status(
variable_queue_set_ram_duration_target(0, VQ5),
- [{len, Len div 2},
- {target_ram_count, 0},
- {ram_msg_count, 0},
- {ram_ack_count, 0}]),
+ [{len, Len div 2},
+ {target_ram_count, 0},
+ {messages_unacknowledged_ram, 0},
+ {messages_ready_ram, 0},
+ {messages_ram, 0}]),
VQ6.
@@ -2760,7 +2763,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
fun (Duration1, VQ4) ->
{_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4),
io:format("~p:~n~p~n",
- [Duration1, rabbit_variable_queue:status(VQ5)]),
+ [Duration1, variable_queue_status(VQ5)]),
VQ6 = variable_queue_set_ram_duration_target(
Duration1, VQ5),
publish_fetch_and_ack(Churn, Len, VQ6)
@@ -2820,11 +2823,16 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
check_variable_queue_status(VQ0, Props) ->
VQ1 = variable_queue_wait_for_shuffling_end(VQ0),
- S = rabbit_variable_queue:status(VQ1),
+ S = variable_queue_status(VQ1),
io:format("~p~n", [S]),
assert_props(S, Props),
VQ1.
+variable_queue_status(VQ) ->
+ Keys = rabbit_backing_queue:info_keys() -- [backing_queue_status],
+ [{K, rabbit_variable_queue:info(K, VQ)} || K <- Keys] ++
+ rabbit_variable_queue:info(backing_queue_status, VQ).
+
variable_queue_wait_for_shuffling_end(VQ) ->
case credit_flow:blocked() of
false -> VQ;
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index aafd81df..dbc2856d 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,7 +16,7 @@
-module(rabbit_trace).
--export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]).
+-export([init/1, enabled/1, tap_in/5, tap_out/5, start/1, stop/1]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -32,8 +32,12 @@
-spec(init/1 :: (rabbit_types:vhost()) -> state()).
-spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
--spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
+-spec(tap_in/5 :: (rabbit_types:basic_message(), binary(),
+ rabbit_channel:channel_number(),
+ rabbit_types:username(), state()) -> 'ok').
+-spec(tap_out/5 :: (rabbit_amqqueue:qmsg(), binary(),
+ rabbit_channel:channel_number(),
+ rabbit_types:username(), state()) -> 'ok').
-spec(start/1 :: (rabbit_types:vhost()) -> 'ok').
-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok').
@@ -54,15 +58,27 @@ enabled(VHost) ->
{ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
lists:member(VHost, VHosts).
-tap_in(_Msg, none) -> ok;
-tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) ->
- trace(TraceX, Msg, <<"publish">>, XName, []).
-
-tap_out(_Msg, none) -> ok;
-tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) ->
+tap_in(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok;
+tap_in(Msg = #basic_message{exchange_name = #resource{name = XName,
+ virtual_host = VHost}},
+ ConnName, ChannelNum, Username, TraceX) ->
+ trace(TraceX, Msg, <<"publish">>, XName,
+ [{<<"vhost">>, longstr, VHost},
+ {<<"connection">>, longstr, ConnName},
+ {<<"channel">>, signedint, ChannelNum},
+ {<<"user">>, longstr, Username}]).
+
+tap_out(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok;
+tap_out({#resource{name = QName, virtual_host = VHost},
+ _QPid, _QMsgId, Redelivered, Msg},
+ ConnName, ChannelNum, Username, TraceX) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
trace(TraceX, Msg, <<"deliver">>, QName,
- [{<<"redelivered">>, signedint, RedeliveredNum}]).
+ [{<<"redelivered">>, signedint, RedeliveredNum},
+ {<<"vhost">>, longstr, VHost},
+ {<<"connection">>, longstr, ConnName},
+ {<<"channel">>, signedint, ChannelNum},
+ {<<"user">>, longstr, Username}]).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b6d37852..1104f373 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -48,6 +48,7 @@
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
+-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
%% -------------------------------------------------------------------
@@ -77,6 +78,8 @@
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
-spec(internal_system_x/0 :: () -> 'ok').
+-spec(cluster_name/0 :: () -> 'ok').
+-spec(down_slave_nodes/0 :: () -> 'ok').
-endif.
@@ -382,6 +385,21 @@ cluster_name_tx() ->
[mnesia:delete(T, K, write) || K <- Ks],
ok.
+down_slave_nodes() ->
+ ok = down_slave_nodes(rabbit_queue),
+ ok = down_slave_nodes(rabbit_durable_queue).
+
+down_slave_nodes(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids, Decorators}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, [], Policy, GmPids, Decorators}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ede69748..03b99562 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
- status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
+ info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -821,15 +821,19 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
out = AvgEgressRate } }) ->
{AvgIngressRate, AvgEgressRate}.
-status(#vqstate {
+info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
+ RamMsgCount;
+info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) ->
+ gb_trees:size(RPA);
+info(messages_ram, State) ->
+ info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
+info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
+ PersistentCount;
+info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
- ram_pending_ack = RPA,
- disk_pending_ack = DPA,
target_ram_count = TargetRamCount,
- ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
- persistent_count = PersistentCount,
rates = #rates { in = AvgIngressRate,
out = AvgEgressRate,
ack_in = AvgAckIngressRate,
@@ -841,16 +845,14 @@ status(#vqstate {
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
{len , Len},
- {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)},
{target_ram_count , TargetRamCount},
- {ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RPA)},
{next_seq_id , NextSeqId},
- {persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
{avg_egress_rate , AvgEgressRate},
{avg_ack_ingress_rate, AvgAckIngressRate},
- {avg_ack_egress_rate , AvgAckEgressRate} ].
+ {avg_ack_egress_rate , AvgAckEgressRate} ];
+info(Item, _) ->
+ throw({bad_argument, Item}).
invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
invoke( _, _, State) -> State.
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index d943b599..3a041508 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -114,8 +114,8 @@ upgrades_required(Scope) ->
with_upgrade_graph(Fun, Scope) ->
case rabbit_misc:build_acyclic_graph(
- fun (Module, Steps) -> vertices(Module, Steps, Scope) end,
- fun (Module, Steps) -> edges(Module, Steps, Scope) end,
+ fun ({_App, Module, Steps}) -> vertices(Module, Steps, Scope) end,
+ fun ({_App, Module, Steps}) -> edges(Module, Steps, Scope) end,
rabbit_misc:all_module_attributes(rabbit_upgrade)) of
{ok, G} -> try
Fun(G)
@@ -161,7 +161,7 @@ heads(G) ->
categorise_by_scope(Version) when is_list(Version) ->
Categorised =
- [{Scope, Name} || {_Module, Attributes} <-
+ [{Scope, Name} || {_App, _Module, Attributes} <-
rabbit_misc:all_module_attributes(rabbit_upgrade),
{Name, Scope, _Requires} <- Attributes,
lists:member(Name, Version)],