summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-18 12:56:41 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-18 12:56:41 +0100
commit508e72d273eb223be2f0b36f04eecb88f87054e8 (patch)
tree81cfd3acae89e09e2329785b0e77ebda979e0c62
parent94ecaab70dcd2ce7ceac95e8802d3a54267f4b81 (diff)
parent79e0a773bdfd72ceb82e54fa25407a2620e95914 (diff)
downloadrabbitmq-server-508e72d273eb223be2f0b36f04eecb88f87054e8.tar.gz
stable to default
-rw-r--r--Makefile14
-rw-r--r--docs/rabbitmq-plugins.1.xml87
-rw-r--r--docs/rabbitmq.config.example30
-rw-r--r--docs/rabbitmqctl.1.xml92
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit.hrl29
-rw-r--r--packaging/common/README2
-rwxr-xr-xquickcheck12
-rwxr-xr-xscripts/rabbitmq-env44
-rwxr-xr-xscripts/rabbitmq-plugins8
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rwxr-xr-xscripts/rabbitmq-server43
-rwxr-xr-xscripts/rabbitmq-server.bat1
-rwxr-xr-xscripts/rabbitmq-service.bat1
-rwxr-xr-xscripts/rabbitmqctl15
-rwxr-xr-xscripts/rabbitmqctl.bat27
-rw-r--r--src/app_utils.erl19
-rw-r--r--src/gm.erl30
-rw-r--r--src/gm_qc.erl384
-rw-r--r--src/rabbit.erl187
-rw-r--r--src/rabbit_amqqueue.erl94
-rw-r--r--src/rabbit_amqqueue_process.erl103
-rw-r--r--src/rabbit_backing_queue.erl20
-rw-r--r--src/rabbit_backing_queue_qc.erl5
-rw-r--r--src/rabbit_channel.erl43
-rw-r--r--src/rabbit_channel_interceptor.erl25
-rw-r--r--src/rabbit_control_main.erl61
-rw-r--r--src/rabbit_dead_letter.erl3
-rw-r--r--src/rabbit_event.erl21
-rw-r--r--src/rabbit_exchange.erl89
-rw-r--r--src/rabbit_exchange_decorator.erl24
-rw-r--r--src/rabbit_limiter.erl7
-rw-r--r--src/rabbit_log.erl85
-rw-r--r--src/rabbit_mirror_queue_master.erl31
-rw-r--r--src/rabbit_mirror_queue_misc.erl81
-rw-r--r--src/rabbit_mirror_queue_slave.erl9
-rw-r--r--src/rabbit_misc.erl94
-rw-r--r--src/rabbit_mnesia.erl76
-rw-r--r--src/rabbit_networking.erl15
-rw-r--r--src/rabbit_node_monitor.erl1
-rw-r--r--src/rabbit_plugins.erl134
-rw-r--r--src/rabbit_plugins_main.erl257
-rw-r--r--src/rabbit_policies.erl15
-rw-r--r--src/rabbit_policy.erl24
-rw-r--r--src/rabbit_queue_decorator.erl23
-rw-r--r--src/rabbit_queue_index.erl94
-rw-r--r--src/rabbit_reader.erl64
-rw-r--r--src/rabbit_recovery_terms.erl22
-rw-r--r--src/rabbit_table.erl8
-rw-r--r--src/rabbit_tests.erl65
-rw-r--r--src/rabbit_trace.erl36
-rw-r--r--src/rabbit_upgrade.erl6
-rw-r--r--src/rabbit_upgrade_functions.erl18
-rw-r--r--src/rabbit_variable_queue.erl285
-rw-r--r--src/rabbit_version.erl6
55 files changed, 2145 insertions, 833 deletions
diff --git a/Makefile b/Makefile
index c54b44e5..ffb4cdfe 100644
--- a/Makefile
+++ b/Makefile
@@ -20,8 +20,6 @@ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml $(DOCS_DIR)/rabbitmq-echopid.xml)
USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-plugins.1.xml
USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML)))
-QC_MODULES := rabbit_backing_queue_qc
-QC_TRIALS ?= 100
ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes)
PYTHON=python
@@ -56,6 +54,12 @@ endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc)
+ifdef INSTRUMENT_FOR_QC
+ERLC_OPTS += -DINSTR_MOD=gm_qc
+else
+ERLC_OPTS += -DINSTR_MOD=gm
+endif
+
include version.mk
PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo )
@@ -123,7 +127,7 @@ plugins:
# Not building plugins
check-xref:
- $(info xref checks are disabled)
+ $(info xref checks are disabled as there is no plugins-src directory)
endif
@@ -217,7 +221,8 @@ run-tests: all
echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null
run-qc: all
- $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS))
+ ./quickcheck $(RABBITMQ_NODENAME) rabbit_backing_queue_qc 100 40
+ ./quickcheck $(RABBITMQ_NODENAME) gm_qc 1000 200
start-background-node: all
-rm -f $(RABBITMQ_MNESIA_DIR).pid
@@ -381,3 +386,4 @@ include $(DEPS_FILE)
endif
.PHONY: run-qc
+
diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml
index 8ecb4fc8..f7be2d29 100644
--- a/docs/rabbitmq-plugins.1.xml
+++ b/docs/rabbitmq-plugins.1.xml
@@ -40,6 +40,7 @@
<refsynopsisdiv>
<cmdsynopsis>
<command>rabbitmq-plugins</command>
+ <arg choice="opt">-n <replaceable>node</replaceable></arg>
<arg choice="req"><replaceable>command</replaceable></arg>
<arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
</cmdsynopsis>
@@ -62,6 +63,16 @@
enabled. Implicitly enabled plugins are automatically disabled again
when they are no longer required.
</para>
+
+ <para>
+ The <command>enable</command>, <command>disable</command> and
+ <command>set</command> commands will update the plugins file and
+ then attempt to connect to the broker and ensure it is running
+ all enabled plugins. By default if it is not possible to connect
+ to the running broker (for example if it is stopped) then a
+ warning is displayed. Specify <command>--online</command> or
+ <command>--offline</command> to change this behaviour.
+ </para>
</refsect1>
<refsect1>
@@ -97,12 +108,14 @@
</variablelist>
<para>
Lists all plugins, their versions, dependencies and
- descriptions. Each plugin is prefixed with a status
- indicator - [ ] to indicate that the plugin is not
- enabled, [E] to indicate that it is explicitly enabled,
- [e] to indicate that it is implicitly enabled, and [!] to
- indicate that it is enabled but missing and thus not
- operational.
+ descriptions. Each plugin is prefixed with two status
+ indicator characters inside [ ]. The first indicator can
+ be " " to indicate that the plugin is not enabled, "E" to
+ indicate that it is explicitly enabled, "e" to indicate
+ that it is implicitly enabled, or "!" to indicate that it
+ is enabled but missing and thus not operational. The
+ second indicator can be " " to show that the plugin is not
+ running, or "*" to show that it is.
</para>
<para>
If the optional pattern is given, only plugins whose
@@ -130,17 +143,24 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>enable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>enable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>plugin</term>
<listitem><para>One or more plugins to enable.</para></listitem>
</varlistentry>
</variablelist>
<para>
- Enables the specified plugins and all their
- dependencies.
+ Enables the specified plugins and all their dependencies.
</para>
<para role="example-prefix">For example:</para>
@@ -154,17 +174,24 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>disable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>disable</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>plugin</term>
<listitem><para>One or more plugins to disable.</para></listitem>
</varlistentry>
</variablelist>
<para>
- Disables the specified plugins and all plugins that
- depend on them.
+ Disables the specified plugins and all their dependencies.
</para>
<para role="example-prefix">For example:</para>
@@ -175,6 +202,42 @@
</para>
</listitem>
</varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>set</command> <arg choice="opt">--offline</arg> <arg choice="opt">--online</arg> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>--offline</term>
+ <listitem><para>Just modify the enabled plugins file.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>--online</term>
+ <listitem><para>Treat failure to connect to the running broker as fatal.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>plugin</term>
+ <listitem><para>Zero or more plugins to enable.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Enables the specified plugins and all their
+ dependencies. Unlike <command>rabbitmq-plugins
+ enable</command> this command ignores and overwrites any
+ existing enabled plugins. <command>rabbitmq-plugins
+ set</command> with no plugin arguments is a legal command
+ meaning "disable all plugins".
+ </para>
+
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmq-plugins set rabbitmq_management</screen>
+ <para role="example">
+ This command enables the <command>management</command>
+ plugin and its dependencies and disables everything else.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</refsect1>
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index a128bfbc..63540568 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -27,6 +27,11 @@
%%
%% {ssl_listeners, [5671]},
+ %% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection
+ %% and SSL handshake), in milliseconds.
+ %%
+ %% {handshake_timeout, 10000},
+
%% Log levels (currently just used for connection logging).
%% One of 'info', 'warning', 'error' or 'none', in decreasing order
%% of verbosity. Defaults to 'info'.
@@ -111,6 +116,10 @@
%%
%% {ssl_cert_login_from, common_name},
+ %% SSL handshake timeout, in milliseconds.
+ %%
+ %% {ssl_handshake_timeout, 5000},
+
%%
%% Default User / VHost
%% ====================
@@ -221,7 +230,12 @@
%% Explicitly enable/disable hipe compilation.
%%
- %% {hipe_compile, true}
+ %% {hipe_compile, true},
+
+ %% Timeout used when waiting for Mnesia tables in a cluster to
+ %% become available.
+ %%
+ %% {mnesia_table_loading_timeout, 30000}
]},
@@ -265,9 +279,13 @@
%% {certfile, "/path/to/cert.pem"},
%% {keyfile, "/path/to/key.pem"}]}]},
+ %% One of 'basic', 'detailed' or 'none'. See
+ %% http://www.rabbitmq.com/management.html#fine-stats for more details.
+ %% {rates_mode, basic},
+
%% Configure how long aggregated data (such as message rates and queue
%% lengths) is retained. Please read the plugin's documentation in
- %% https://www.rabbitmq.com/management.html#configuration for more
+ %% http://www.rabbitmq.com/management.html#configuration for more
%% details.
%%
%% {sample_retention_policies,
@@ -276,14 +294,6 @@
%% {detailed, [{10, 5}]}]}
]},
- {rabbitmq_management_agent,
- [%% Misc/Advanced Options
- %%
- %% NB: Change these only if you understand what you are doing!
- %%
- %% {force_fine_statistics, true}
- ]},
-
%% ----------------------------------------------------------------------------
%% RabbitMQ Shovel Plugin
%%
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 01b024a2..eb3c7ef3 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -406,11 +406,15 @@
online, except when using the <command>--offline</command> flag.
</para>
<para>
- When using the <command>--offline</command> flag the node you
- connect to will become the canonical source for cluster metadata
- (e.g. which queues exist), even if it was not before. Therefore
- you should use this command on the latest node to shut down if
- at all possible.
+ When using the <command>--offline</command> flag
+ rabbitmqctl will not attempt to connect to a node as
+ normal; instead it will temporarily become the node in
+ order to make the change. This is useful if the node
+ cannot be started normally. In this case the node will
+ become the canonical source for cluster metadata
+ (e.g. which queues exist), even if it was not
+ before. Therefore you should use this command on the
+ latest node to shut down if at all possible.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen>
@@ -454,6 +458,44 @@
</listitem>
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>force_boot</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Ensure that the node will start next time, even if it
+ was not the last to shut down.
+ </para>
+ <para>
+ Normally when you shut down a RabbitMQ cluster
+ altogether, the first node you restart should be the
+ last one to go down, since it may have seen things
+ happen that other nodes did not. But sometimes
+ that's not possible: for instance if the entire cluster
+ loses power then all nodes may think they were not the
+ last to shut down.
+ </para>
+ <para>
+ In such a case you can invoke <command>rabbitmqctl
+ force_boot</command> while the node is down. This will
+ tell the node to unconditionally start next time you ask
+ it to. If any changes happened to the cluster after this
+ node shut down, they will be lost.
+ </para>
+ <para>
+ If the last node to go down is permanently lost then you
+ should use <command>rabbitmqctl forget_cluster_node
+ --offline</command> in preference to this command, as it
+ will ensure that mirrored queues which were mastered on
+ the lost node get promoted.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl force_boot</screen>
+ <para role="example">
+ This will force the node not to wait for other nodes
+ next time it is started.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
<term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
</term>
<listitem>
@@ -1148,6 +1190,42 @@
(queue depth).</para></listitem>
</varlistentry>
<varlistentry>
+ <term>messages_ready_ram</term>
+ <listitem><para>Number of messages from messages_ready which are resident in ram.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_unacknowledged_ram</term>
+ <listitem><para>Number of messages from messages_unacknowledged which are resident in ram.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_ram</term>
+ <listitem><para>Total number of messages which are resident in ram.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_persistent</term>
+ <listitem><para>Total number of persistent messages in the queue (will always be 0 for transient queues).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes</term>
+ <listitem><para>Sum of the size of all message bodies in the queue. This does not include the message properties (including headers) or any overhead.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_ready</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages ready to be delivered to clients.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_unacknowledged</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages delivered to clients but not yet acknowledged.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_ram</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages which are in RAM.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>message_bytes_persistent</term>
+ <listitem><para>Like <command>message_bytes</command> but counting only those messages which are persistent.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>consumers</term>
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
@@ -1475,6 +1553,10 @@
<term>send_pend</term>
<listitem><para>Send queue size.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>connected_at</term>
+ <listitem><para>Date and time this connection was established, as timestamp.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>connectioninfoitem</command>s are
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 7360208a..f26e0f77 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -39,12 +39,15 @@
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
+ {mnesia_table_loading_timeout, 30000},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
{trace_vhosts, []},
{log_levels, [{connection, info}]},
{ssl_cert_login_from, distinguished_name},
+ {ssl_handshake_timeout, 5000},
+ {handshake_timeout, 10000},
{reverse_dns_lookups, false},
{cluster_partition_handling, ignore},
{tcp_listen_options, [binary,
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5ac3197e..5e41ea93 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -39,13 +39,25 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratches, policy, decorators}).
--record(exchange_serial, {name, next}).
+%% fields described as 'transient' here are cleared when writing to
+%% rabbit_durable_<thing>
+-record(exchange, {
+ name, type, durable, auto_delete, internal, arguments, %% immutable
+ scratches, %% durable, explicitly updated via update_scratch/3
+ policy, %% durable, implicitly updated when policy changes
+ decorators}). %% transient, recalculated in store/1 (i.e. recovery)
+
+-record(amqqueue, {
+ name, durable, auto_delete, exclusive_owner = none, %% immutable
+ arguments, %% immutable
+ pid, %% durable (just so we know home node)
+ slave_pids, sync_slave_pids, %% transient
+ down_slave_nodes, %% durable
+ policy, %% durable, implicit update as above
+ gm_pids, %% transient
+ decorators}). %% transient, recalculated as above
--record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, sync_slave_pids, policy,
- gm_pids, decorators}).
+-record(exchange_serial, {name, next}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -75,7 +87,7 @@
-record(event, {type, props, reference = undefined, timestamp}).
--record(message_properties, {expiry, needs_confirming = false}).
+-record(message_properties, {expiry, needs_confirming = false, size}).
-record(plugin, {name, %% atom()
version, %% string()
@@ -105,9 +117,6 @@
-define(DESIRED_HIBERNATE, 10000).
-define(CREDIT_DISC_BOUND, {2000, 500}).
-%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
--define(MAX_EXPIRY_TIMER, 4294967295).
-
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/packaging/common/README b/packaging/common/README
index 0a29ee27..35a1523a 100644
--- a/packaging/common/README
+++ b/packaging/common/README
@@ -17,4 +17,4 @@ run as the superuser.
An example configuration file is provided in the same directory as
this README. Copy it to /etc/rabbitmq/rabbitmq.config to use it. The
RabbitMQ server must be restarted after changing the configuration
-file or enabling or disabling plugins.
+file.
diff --git a/quickcheck b/quickcheck
index b5382d75..59da3719 100755
--- a/quickcheck
+++ b/quickcheck
@@ -7,17 +7,21 @@
%% NodeStr is a local broker node name
%% ModStr is the module containing quickcheck properties
%% TrialsStr is the number of trials
-main([NodeStr, ModStr, TrialsStr]) ->
+main([NodeStr, ModStr, NumTestsStr, MaxSizeStr]) ->
{ok, Hostname} = inet:gethostname(),
Node = list_to_atom(NodeStr ++ "@" ++ Hostname),
Mod = list_to_atom(ModStr),
- Trials = erlang:list_to_integer(TrialsStr),
+ NumTests = erlang:list_to_integer(NumTestsStr),
+ MaxSize = erlang:list_to_integer(MaxSizeStr),
case rpc:call(Node, code, ensure_loaded, [proper]) of
{module, proper} ->
case rpc:call(Node, proper, module,
- [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of
+ [Mod] ++ [[{numtests, NumTests},
+ {max_size, MaxSize},
+ {constraint_tries, 200}]]) of
[] -> ok;
- _ -> quit(1)
+ R -> io:format("~p.~n", [R]),
+ quit(1)
end;
{badrpc, Reason} ->
io:format("Could not contact node ~p: ~p.~n", [Node, Reason]),
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 69d5a9c9..74298440 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -58,3 +58,47 @@ fi
## Get configuration variables from the configure environment file
[ -f ${CONF_ENV_FILE} ] && . ${CONF_ENV_FILE} || true
+
+##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
+
+DEFAULT_NODE_IP_ADDRESS=auto
+DEFAULT_NODE_PORT=5672
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
+
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
+[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
+
+[ "x" = "x$RABBITMQ_DIST_PORT" ] && RABBITMQ_DIST_PORT=${DIST_PORT}
+[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${DEFAULT_NODE_PORT} + 20000))
+[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${RABBITMQ_NODE_PORT} + 20000))
+
+[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
+[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
+[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
+[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE}
+[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE}
+[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
+[ "x" = "x$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS" ] && RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=${SERVER_ADDITIONAL_ERL_ARGS}
+[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR}
+[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
+
+[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE}
+[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid
+
+[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
+[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
+
+[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
+
+[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
+
+## Log rotation
+[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
+[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log"
+[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS}
+[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log"
+
+[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
+
+##--- End of overridden <var_name> variables
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index bd7d0b6a..8b5b30b7 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -19,13 +19,6 @@
# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
-##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
-
-[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
-[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
-
-##--- End of overridden <var_name> variables
-
exec ${ERL_DIR}erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
@@ -35,4 +28,5 @@ exec ${ERL_DIR}erl \
-s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
-plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
+ -nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index a535ebad..61e39e38 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -31,6 +31,10 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!RABBITMQ_NODENAME!"=="" (
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -51,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index bd397441..686c90cf 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -19,48 +19,6 @@
# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
-##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
-
-DEFAULT_NODE_IP_ADDRESS=auto
-DEFAULT_NODE_PORT=5672
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
-
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
-[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
-
-[ "x" = "x$RABBITMQ_DIST_PORT" ] && RABBITMQ_DIST_PORT=${DIST_PORT}
-[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${DEFAULT_NODE_PORT} + 20000))
-[ "x" = "x$RABBITMQ_DIST_PORT" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_DIST_PORT=$((${RABBITMQ_NODE_PORT} + 20000))
-
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
-[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
-[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE}
-[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE}
-[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
-
-[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR}
-[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
-
-[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE}
-[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid
-
-[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
-[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
-
-[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
-
-[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
-
-## Log rotation
-[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
-[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log"
-[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS}
-[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log"
-
-##--- End of overridden <var_name> variables
-
RABBITMQ_START_RABBIT=
[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput"
[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="$RABBITMQ_START_RABBIT -s rabbit boot "
@@ -131,6 +89,7 @@ exec ${ERL_DIR}erl \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
+ ${RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS} \
${RABBITMQ_LISTEN_ARG} \
-sasl errlog_type error \
-sasl sasl_error_logger false \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 043204fa..e2312406 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -147,6 +147,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
-sasl errlog_type error ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 895561d4..fb2703f2 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -235,6 +235,7 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-os_mon start_memsup false ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
!RABBITMQ_SERVER_START_ARGS! ^
+!RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS! ^
!RABBITMQ_DIST_ARG! ^
!STARVAR!
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 309abf2a..31fe0afc 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -19,20 +19,21 @@
# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
-##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
-
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
-
-##--- End of overridden <var_name> variables
+# rabbitmqctl starts distribution itself, so we need to make sure epmd
+# is running.
+${ERL_DIR}erl -sname rabbitmqctl-prelaunch-$$ -noinput -eval 'erlang:halt().'
+# We specify Mnesia dir and sasl error logger since some actions
+# (e.g. forget_cluster_node --offline) require us to impersonate the
+# real node.
exec ${ERL_DIR}erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
${RABBITMQ_CTL_ERL_ARGS} \
- -sname rabbitmqctl$$ \
-boot "${CLEAN_BOOT_FILE}" \
+ -sasl errlog_type error \
+ -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
-s rabbit_control_main \
-nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 8e8ba1bd..7a2c454c 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -23,6 +23,10 @@ set TDP0=%~dp0
set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+)
+
if "!COMPUTERNAME!"=="" (
set COMPUTERNAME=localhost
)
@@ -31,6 +35,14 @@ if "!RABBITMQ_NODENAME!"=="" (
set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
+if "!RABBITMQ_MNESIA_BASE!"=="" (
+ set RABBITMQ_MNESIA_BASE=!RABBITMQ_BASE!/db
+)
+
+if "!RABBITMQ_MNESIA_DIR!"=="" (
+ set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -43,7 +55,20 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM!!TIME:~9! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
+rem rabbitmqctl starts distribution itself, so we need to make sure epmd
+rem is running.
+"!ERLANG_HOME!\bin\erl.exe" -sname rabbitmqctl-prelaunch-!RANDOM!!TIME:~9! -noinput -eval "erlang:halt()."
+
+"!ERLANG_HOME!\bin\erl.exe" ^
+-pa "!TDP0!..\ebin" ^
+-noinput ^
+-hidden ^
+!RABBITMQ_CTL_ERL_ARGS! ^
+-sasl errlog_type error ^
+-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
+-s rabbit_control_main ^
+-nodename !RABBITMQ_NODENAME! ^
+-extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl
index 0479ce66..87e6fa0b 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -17,7 +17,7 @@
-export([load_applications/1, start_applications/1, start_applications/2,
stop_applications/1, stop_applications/2, app_dependency_order/2,
- wait_for_applications/1]).
+ app_dependencies/1]).
-ifdef(use_specs).
@@ -28,8 +28,8 @@
-spec stop_applications([atom()]) -> 'ok'.
-spec start_applications([atom()], error_handler()) -> 'ok'.
-spec stop_applications([atom()], error_handler()) -> 'ok'.
--spec wait_for_applications([atom()]) -> 'ok'.
-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
+-spec app_dependencies(atom()) -> [atom()].
-endif.
@@ -68,14 +68,10 @@ stop_applications(Apps, ErrorHandler) ->
ErrorHandler,
Apps).
-
-wait_for_applications(Apps) ->
- [wait_for_application(App) || App <- Apps], ok.
-
app_dependency_order(RootApps, StripUnreachable) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
+ fun ({App, _Deps}) -> [{App, App}] end,
+ fun ({App, Deps}) -> [{Dep, App} || Dep <- Deps] end,
[{App, app_dependencies(App)} ||
{App, _Desc, _Vsn} <- application:loaded_applications()]),
try
@@ -92,13 +88,6 @@ app_dependency_order(RootApps, StripUnreachable) ->
%%---------------------------------------------------------------------------
%% Private API
-wait_for_application(Application) ->
- case lists:keymember(Application, 1, rabbit_misc:which_applications()) of
- true -> ok;
- false -> timer:sleep(1000),
- wait_for_application(Application)
- end.
-
load_applications(Worklist, Loaded) ->
case queue:out(Worklist) of
{empty, _WorkList} ->
diff --git a/src/gm.erl b/src/gm.erl
index 2235da33..3113f449 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -388,6 +388,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/3]).
+%% For INSTR_MOD callbacks
+-export([call/3, cast/2, monitor/1, demonitor/1]).
+
-ifndef(use_specs).
-export([behaviour_info/1]).
-endif.
@@ -616,6 +619,16 @@ handle_call({add_on_right, NewMember}, _From,
members_state = MembersState1 }),
handle_callback_result({Result, {ok, Group}, State1}).
+%% add_on_right causes a catchup to be sent immediately from the left,
+%% so we can never see this from the left neighbour. However, it's
+%% possible for the right neighbour to send us a check_neighbours
+%% immediately before that. We can't possibly handle it, but if we're
+%% in this state we know a catchup is coming imminently anyway. So
+%% just ignore it.
+handle_cast({?TAG, _ReqVer, check_neighbours},
+ State = #state { members_state = undefined }) ->
+ noreply(State);
+
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
members_state = MembersState,
@@ -1177,8 +1190,8 @@ can_erase_view_member(Self, Self, _LA, _LP) -> false;
can_erase_view_member(_Self, _Id, N, N) -> true;
can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
-neighbour_cast(N, Msg) -> gen_server2:cast(get_pid(N), Msg).
-neighbour_call(N, Msg) -> gen_server2:call(get_pid(N), Msg, infinity).
+neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg).
+neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity).
%% ---------------------------------------------------------------------------
%% View monitoring and maintanence
@@ -1192,7 +1205,7 @@ ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
- true = erlang:demonitor(MRef),
+ true = ?INSTR_MOD:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
ok = neighbour_cast(RealNeighbour, Msg),
ok = case Neighbour of
@@ -1202,7 +1215,7 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
{Neighbour, maybe_monitor(Neighbour, Self)}.
maybe_monitor( Self, Self) -> undefined;
-maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
+maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1461,3 +1474,12 @@ last_pub( [], LP) -> LP;
last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
true = PubNum > LP, %% ASSERTION
PubNum.
+
+%% ---------------------------------------------------------------------------
+
+%% Uninstrumented versions
+
+call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
+cast(Pid, Msg) -> gen_server2:cast(Pid, Msg).
+monitor(Pid) -> erlang:monitor(process, Pid).
+demonitor(MRef) -> erlang:demonitor(MRef).
diff --git a/src/gm_qc.erl b/src/gm_qc.erl
new file mode 100644
index 00000000..394cbcbd
--- /dev/null
+++ b/src/gm_qc.erl
@@ -0,0 +1,384 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(gm_qc).
+-ifdef(use_proper_qc).
+
+-include_lib("proper/include/proper.hrl").
+
+-define(GROUP, test_group).
+-define(MAX_SIZE, 5).
+-define(MSG_TIMEOUT, 1000000). %% micros
+
+-export([prop_gm_test/0]).
+
+-behaviour(proper_statem).
+-export([initial_state/0, command/1, precondition/2, postcondition/3,
+ next_state/3]).
+
+-behaviour(gm).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+
+%% Helpers
+-export([do_join/0, do_leave/1, do_send/1, do_proceed1/1, do_proceed2/2]).
+
+%% For insertion into gm
+-export([call/3, cast/2, monitor/1, demonitor/1, execute_mnesia_transaction/1]).
+
+-record(state, {seq, %% symbolic and dynamic
+ instrumented, %% dynamic only
+ outstanding, %% dynamic only
+ monitors, %% dynamic only
+ all_join, %% for symbolic
+ to_join, %% dynamic only
+ to_leave %% for symbolic
+ }).
+
+prop_gm_test() ->
+ case ?INSTR_MOD of
+ ?MODULE -> ok;
+ _ -> exit(compile_with_INSTRUMENT_FOR_QC)
+ end,
+ process_flag(trap_exit, true),
+ erlang:register(?MODULE, self()),
+ ?FORALL(Cmds, commands(?MODULE), gm_test(Cmds)).
+
+gm_test(Cmds) ->
+ {_H, State, Res} = run_commands(?MODULE, Cmds),
+ cleanup(State),
+ ?WHENFAIL(
+ io:format("Result: ~p~n", [Res]),
+ aggregate(command_names(Cmds), Res =:= ok)).
+
+cleanup(S) ->
+ S2 = ensure_joiners_joined_and_msgs_received(S),
+ All = gms_joined(S2),
+ All = gms(S2), %% assertion - none to join
+ check_stale_members(All),
+ [gm:leave(GM) || GM <- All],
+ drain_and_proceed_gms(S2),
+ [await_death(GM) || GM <- All],
+ gm:forget_group(?GROUP),
+ ok.
+
+check_stale_members(All) ->
+ GMs = [P || P <- processes(), is_gm_process(?GROUP, P)],
+ case GMs -- All of
+ [] -> ok;
+ Rest -> exit({forgot, Rest})
+ end.
+
+is_gm_process(Group, P) ->
+ case process_info(P, dictionary) of
+ undefined -> false;
+ {dictionary, D} -> {gm, Group} =:= proplists:get_value(process_name, D)
+ end.
+
+await_death(P) ->
+ MRef = erlang:monitor(process, P),
+ await_death(MRef, P).
+
+await_death(MRef, P) ->
+ receive
+ {'DOWN', MRef, process, P, _} -> ok;
+ {'DOWN', _, _, _, _} -> await_death(MRef, P);
+ {'EXIT', _, normal} -> await_death(MRef, P);
+ {'EXIT', _, Reason} -> exit(Reason);
+ {joined, _GM} -> await_death(MRef, P);
+ {left, _GM} -> await_death(MRef, P);
+ Anything -> exit({stray_msg, Anything})
+ end.
+
+%% ---------------------------------------------------------------------------
+%% proper_statem
+%% ---------------------------------------------------------------------------
+
+initial_state() -> #state{seq = 1,
+ outstanding = dict:new(),
+ instrumented = dict:new(),
+ monitors = dict:new(),
+ all_join = sets:new(),
+ to_join = sets:new(),
+ to_leave = sets:new()}.
+
+command(S) ->
+ case {length(gms_symb_not_left(S)), length(gms_symb(S))} of
+ {0, 0} -> qc_join(S);
+ {0, _} -> frequency([{1, qc_join(S)},
+ {3, qc_proceed1(S)},
+ {5, qc_proceed2(S)}]);
+ _ -> frequency([{1, qc_join(S)},
+ {1, qc_leave(S)},
+ {10, qc_send(S)},
+ {5, qc_proceed1(S)},
+ {15, qc_proceed2(S)}])
+ end.
+
+qc_join(_S) -> {call,?MODULE,do_join, []}.
+qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}.
+qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}.
+qc_proceed1(S) -> {call,?MODULE,do_proceed1, [oneof(gms_symb(S))]}.
+qc_proceed2(S) -> {call,?MODULE,do_proceed2, [oneof(gms_symb(S)),
+ oneof(gms_symb(S))]}.
+
+precondition(S, {call, ?MODULE, do_join, []}) ->
+ length(gms_symb(S)) < ?MAX_SIZE;
+
+precondition(_S, {call, ?MODULE, do_leave, [_GM]}) ->
+ true;
+
+precondition(_S, {call, ?MODULE, do_send, [_GM]}) ->
+ true;
+
+precondition(_S, {call, ?MODULE, do_proceed1, [_GM]}) ->
+ true;
+
+precondition(_S, {call, ?MODULE, do_proceed2, [GM1, GM2]}) ->
+ GM1 =/= GM2.
+
+postcondition(_S, {call, _M, _F, _A}, _Res) ->
+ true.
+
+next_state(S = #state{to_join = ToSet,
+ all_join = AllSet}, GM, {call, ?MODULE, do_join, []}) ->
+ S#state{to_join = sets:add_element(GM, ToSet),
+ all_join = sets:add_element(GM, AllSet)};
+
+next_state(S = #state{to_leave = Set}, _Res, {call, ?MODULE, do_leave, [GM]}) ->
+ S#state{to_leave = sets:add_element(GM, Set)};
+
+next_state(S = #state{seq = Seq,
+ outstanding = Outstanding}, _Res,
+ {call, ?MODULE, do_send, [GM]}) ->
+ case is_pid(GM) andalso lists:member(GM, gms_joined(S)) of
+ true ->
+ %% Dynamic state, i.e. runtime
+ Msg = [{sequence, Seq},
+ {sent_to, GM},
+ {dests, gms_joined(S)}],
+ gm:broadcast(GM, Msg),
+ Outstanding1 = dict:map(
+ fun (_GM, Set) ->
+ gb_sets:add_element(Msg, Set)
+ end, Outstanding),
+ drain(S#state{seq = Seq + 1,
+ outstanding = Outstanding1});
+ false ->
+ S
+ end;
+
+next_state(S, _Res, {call, ?MODULE, do_proceed1, [Pid]}) ->
+ proceed(Pid, S);
+
+next_state(S, _Res, {call, ?MODULE, do_proceed2, [From, To]}) ->
+ proceed({From, To}, S).
+
+proceed(K, S = #state{instrumented = Msgs}) ->
+ case dict:find(K, Msgs) of
+ {ok, Q} -> case queue:out(Q) of
+ {{value, Thing}, Q2} ->
+ S2 = proceed(K, Thing, S),
+ S2#state{instrumented = dict:store(K, Q2, Msgs)};
+ {empty, _} ->
+ S
+ end;
+ error -> S
+ end.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined(Pid, _Members) -> Pid ! {joined, self()},
+ ok.
+members_changed(_Pid, _Bs, _Ds) -> ok.
+handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok.
+terminate(Pid, _Reason) -> Pid ! {left, self()}.
+
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+do_join() ->
+ {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(),
+ fun execute_mnesia_transaction/1),
+ GM.
+
+do_leave(GM) ->
+ gm:leave(GM),
+ GM.
+
+%% We need to update the state, so do the work in next_state
+do_send( _GM) -> ok.
+do_proceed1(_Pid) -> ok.
+do_proceed2(_From, _To) -> ok.
+
+%% All GMs, joined and to join
+gms(#state{outstanding = Outstanding,
+ to_join = ToJoin}) ->
+ dict:fetch_keys(Outstanding) ++ sets:to_list(ToJoin).
+
+%% All GMs, joined and to join
+gms_joined(#state{outstanding = Outstanding}) ->
+ dict:fetch_keys(Outstanding).
+
+%% All GMs including those that have left (symbolic)
+gms_symb(#state{all_join = AllJoin}) ->
+ sets:to_list(AllJoin).
+
+%% All GMs not including those that have left (symbolic)
+gms_symb_not_left(#state{all_join = AllJoin,
+ to_leave = ToLeave}) ->
+ sets:to_list(sets:subtract(AllJoin, ToLeave)).
+
+drain(S) ->
+ receive
+ Msg -> drain(handle_msg(Msg, S))
+ after 10 -> S
+ end.
+
+drain_and_proceed_gms(S0) ->
+ S = #state{instrumented = Msgs} = drain(S0),
+ case dict:size(Msgs) of
+ 0 -> S;
+ _ -> S1 = dict:fold(
+ fun (Key, Q, Si) ->
+ lists:foldl(
+ fun (Msg, Sij) ->
+ proceed(Key, Msg, Sij)
+ end, Si, queue:to_list(Q))
+ end, S, Msgs),
+ drain_and_proceed_gms(S1#state{instrumented = dict:new()})
+ end.
+
+handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) ->
+ case dict:find(GM, Outstanding) of
+ {ok, Set} ->
+ Set2 = gb_sets:del_element(Msg, Set),
+ S#state{outstanding = dict:store(GM, Set2, Outstanding)};
+ error ->
+ %% Message from GM that has already died. OK.
+ S
+ end;
+handle_msg({instrumented, Key, Thing}, S = #state{instrumented = Msgs}) ->
+ Q1 = case dict:find(Key, Msgs) of
+ {ok, Q} -> queue:in(Thing, Q);
+ error -> queue:from_list([Thing])
+ end,
+ S#state{instrumented = dict:store(Key, Q1, Msgs)};
+handle_msg({joined, GM}, S = #state{outstanding = Outstanding,
+ to_join = ToJoin}) ->
+ S#state{outstanding = dict:store(GM, gb_sets:empty(), Outstanding),
+ to_join = sets:del_element(GM, ToJoin)};
+handle_msg({left, GM}, S = #state{outstanding = Outstanding,
+ to_join = ToJoin}) ->
+ true = dict:is_key(GM, Outstanding) orelse sets:is_element(GM, ToJoin),
+ S#state{outstanding = dict:erase(GM, Outstanding),
+ to_join = sets:del_element(GM, ToJoin)};
+handle_msg({'DOWN', MRef, _, From, _} = Msg, S = #state{monitors = Mons}) ->
+ To = dict:fetch(MRef, Mons),
+ handle_msg({instrumented, {From, To}, {info, Msg}},
+ S#state{monitors = dict:erase(MRef, Mons)});
+handle_msg({'EXIT', _From, normal}, S) ->
+ S;
+handle_msg({'EXIT', _From, Reason}, _S) ->
+ %% We just trapped exits to get nicer SASL logging.
+ exit(Reason).
+
+proceed({_From, To}, {cast, Msg}, S) -> gen_server2:cast(To, Msg), S;
+proceed({_From, To}, {info, Msg}, S) -> To ! Msg, S;
+proceed({From, _To}, {wait, Ref}, S) -> From ! {proceed, Ref}, S;
+proceed({From, To}, {mon, Ref}, S) -> add_monitor(From, To, Ref, S);
+proceed(_Pid, {demon, MRef}, S) -> erlang:demonitor(MRef), S;
+proceed(Pid, {wait, Ref}, S) -> Pid ! {proceed, Ref}, S.
+
+%% NB From here is To in handle_msg/DOWN above, since the msg is going
+%% the other way
+add_monitor(From, To, Ref, S = #state{monitors = Mons}) ->
+ MRef = erlang:monitor(process, To),
+ From ! {mref, Ref, MRef},
+ S#state{monitors = dict:store(MRef, From, Mons)}.
+
+%% ----------------------------------------------------------------------------
+%% Assertions
+%% ----------------------------------------------------------------------------
+
+ensure_joiners_joined_and_msgs_received(S0) ->
+ S = drain_and_proceed_gms(S0),
+ case outstanding_joiners(S) of
+ true -> ensure_joiners_joined_and_msgs_received(S);
+ false -> case outstanding_msgs(S) of
+ [] -> S;
+ Out -> exit({outstanding_msgs, Out})
+ end
+ end.
+
+outstanding_joiners(#state{to_join = ToJoin}) ->
+ sets:size(ToJoin) > 0.
+
+outstanding_msgs(#state{outstanding = Outstanding}) ->
+ dict:fold(fun (GM, Set, OS) ->
+ case gb_sets:is_empty(Set) of
+ true -> OS;
+ false -> [{GM, gb_sets:to_list(Set)} | OS]
+ end
+ end, [], Outstanding).
+
+%% ---------------------------------------------------------------------------
+%% For insertion into GM
+%% ---------------------------------------------------------------------------
+
+call(Pid, Msg, infinity) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {wait, Ref}},
+ receive
+ {proceed, Ref} -> ok
+ end,
+ gen_server2:call(Pid, Msg, infinity).
+
+cast(Pid, Msg) ->
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {cast, Msg}},
+ ok.
+
+monitor(Pid) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {mon, Ref}},
+ receive
+ {mref, Ref, MRef} -> MRef
+ end.
+
+demonitor(MRef) ->
+ whereis(?MODULE) ! {instrumented, self(), {demon, MRef}},
+ true.
+
+execute_mnesia_transaction(Fun) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, self(), {wait, Ref}},
+ receive
+ {proceed, Ref} -> ok
+ end,
+ rabbit_misc:execute_mnesia_transaction(Fun).
+
+-else.
+
+-export([prop_disabled/0]).
+
+prop_disabled() ->
+ exit({compiled_without_proper,
+ "PropEr was not present during compilation of the test module. "
+ "Hence all tests are disabled."}).
+
+-endif.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 29e38c1f..b00a1ad7 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -22,10 +22,9 @@
stop_and_halt/0, await_startup/0, status/0, is_running/0,
is_running/1, environment/0, rotate_logs/1, force_event_refresh/1,
start_fhc/0]).
-
-export([start/2, stop/1]).
-
--export([log_location/1]). %% for testing
+-export([start_apps/1, stop_apps/1]).
+-export([log_location/1, config_files/0]). %% for testing and mgmt-agent
%%---------------------------------------------------------------------------
%% Boot steps.
@@ -75,13 +74,6 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
--rabbit_boot_step({rabbit_log,
- [{description, "logging server"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_log]}},
- {requires, external_infrastructure},
- {enables, kernel_ready}]}).
-
-rabbit_boot_step({rabbit_event,
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -202,6 +194,7 @@
%% practice 2 processes seems just as fast as any other number > 1,
%% and keeps the progress bar realistic-ish.
-define(HIPE_PROCESSES, 2).
+-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).
%%----------------------------------------------------------------------------
@@ -211,6 +204,7 @@
%% this really should be an abstract type
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
-type(param() :: atom()).
+-type(app_name() :: atom()).
-spec(start/0 :: () -> 'ok').
-spec(boot/0 :: () -> 'ok').
@@ -242,6 +236,8 @@
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
+-spec(start_apps/1 :: ([app_name()]) -> 'ok').
+-spec(stop_apps/1 :: ([app_name()]) -> 'ok').
-endif.
@@ -263,7 +259,7 @@ maybe_hipe_compile() ->
warn_if_hipe_compilation_failed(true) ->
ok;
warn_if_hipe_compilation_failed(false) ->
- error_logger:warning_msg(
+ rabbit_log:warning(
"Not HiPE compiling: HiPE not found in this Erlang installation.~n").
%% HiPE compilation happens before we have log handlers and can take a
@@ -312,9 +308,7 @@ start() ->
ok = ensure_working_log_handlers(),
rabbit_node_monitor:prepare_cluster_status_files(),
rabbit_mnesia:check_cluster_consistency(),
- ok = app_utils:start_applications(
- app_startup_order(), fun handle_app_error/2),
- ok = log_broker_started(rabbit_plugins:active())
+ broker_start()
end).
boot() ->
@@ -329,21 +323,14 @@ boot() ->
%% the upgrade, since if we are a secondary node the
%% primary node will have forgotten us
rabbit_mnesia:check_cluster_consistency(),
- Plugins = rabbit_plugins:setup(),
- ToBeLoaded = Plugins ++ ?APPS,
- ok = app_utils:load_applications(ToBeLoaded),
- StartupApps = app_utils:app_dependency_order(ToBeLoaded,
- false),
- ok = app_utils:start_applications(
- StartupApps, fun handle_app_error/2),
- ok = log_broker_started(Plugins)
+ broker_start()
end).
-handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
- throw({could_not_start, App, Reason});
-
-handle_app_error(App, Reason) ->
- throw({could_not_start, App, Reason}).
+broker_start() ->
+ Plugins = rabbit_plugins:setup(),
+ ToBeLoaded = Plugins ++ ?APPS,
+ start_apps(ToBeLoaded),
+ ok = log_broker_started(rabbit_plugins:active()).
start_it(StartFun) ->
Marker = spawn_link(fun() -> receive stop -> ok end end),
@@ -371,22 +358,66 @@ start_it(StartFun) ->
stop() ->
case whereis(rabbit_boot) of
undefined -> ok;
- _ -> await_startup()
+ _ -> await_startup(true)
end,
- rabbit_log:info("Stopping RabbitMQ~n"),
- ok = app_utils:stop_applications(app_shutdown_order()).
+ rabbit_log:info("Stopping RabbitMQ~n", []),
+ Apps = ?APPS ++ rabbit_plugins:active(),
+ stop_apps(app_utils:app_dependency_order(Apps, true)).
stop_and_halt() ->
try
stop()
after
- rabbit_misc:local_info_msg("Halting Erlang VM~n", []),
+ rabbit_log:info("Halting Erlang VM~n", []),
init:stop()
end,
ok.
+start_apps(Apps) ->
+ app_utils:load_applications(Apps),
+ OrderedApps = app_utils:app_dependency_order(Apps, false),
+ case lists:member(rabbit, Apps) of
+ false -> run_boot_steps(Apps); %% plugin activation
+ true -> ok %% will run during start of rabbit app
+ end,
+ ok = app_utils:start_applications(OrderedApps,
+ handle_app_error(could_not_start)).
+
+stop_apps(Apps) ->
+ ok = app_utils:stop_applications(
+ Apps, handle_app_error(error_during_shutdown)),
+ case lists:member(rabbit, Apps) of
+ false -> run_cleanup_steps(Apps); %% plugin deactivation
+ true -> ok %% it's all going anyway
+ end,
+ ok.
+
+handle_app_error(Term) ->
+ fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) ->
+ throw({Term, App, ExitReason});
+ (App, Reason) ->
+ throw({Term, App, Reason})
+ end.
+
+run_cleanup_steps(Apps) ->
+ [run_step(Name, Attrs, cleanup) || {_, Name, Attrs} <- find_steps(Apps)],
+ ok.
+
await_startup() ->
- app_utils:wait_for_applications(app_startup_order()).
+ await_startup(false).
+
+await_startup(HaveSeenRabbitBoot) ->
+ %% We don't take absence of rabbit_boot as evidence we've started,
+ %% since there's a small window before it is registered.
+ case whereis(rabbit_boot) of
+ undefined -> case HaveSeenRabbitBoot orelse is_running() of
+ true -> ok;
+ false -> timer:sleep(100),
+ await_startup(false)
+ end;
+ _ -> timer:sleep(100),
+ await_startup(true)
+ end.
status() ->
S1 = [{pid, list_to_integer(os:getpid())},
@@ -437,6 +468,9 @@ listeners() ->
ip_address = IP,
port = Port} <- Listeners, Node =:= node()].
+%% TODO this only determines if the rabbit application has started,
+%% not if it is running, never mind plugins. It would be nice to have
+%% more nuance here.
is_running() -> is_running(node()).
is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
@@ -447,7 +481,7 @@ environment() ->
rotate_logs(BinarySuffix) ->
Suffix = binary_to_list(BinarySuffix),
- rabbit_misc:local_info_msg("Rotating logs with suffix '~s'~n", [Suffix]),
+ rabbit_log:info("Rotating logs with suffix '~s'~n", [Suffix]),
log_rotation_result(rotate_logs(log_location(kernel),
Suffix,
rabbit_error_logger_file_h),
@@ -461,14 +495,15 @@ start(normal, []) ->
case erts_version_check() of
ok ->
{ok, Vsn} = application:get_key(rabbit, vsn),
- error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n",
- [Vsn, erlang:system_info(otp_release),
- ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
+ rabbit_log:info("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n",
+ [Vsn, erlang:system_info(otp_release),
+ ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
print_banner(),
log_banner(),
- [ok = run_boot_step(Step) || Step <- boot_steps()],
+ warn_if_kernel_config_dubious(),
+ run_boot_steps(),
{ok, SupPid};
Error ->
Error
@@ -483,42 +518,42 @@ stop(_State) ->
ok.
%%---------------------------------------------------------------------------
-%% application life cycle
+%% boot step logic
-app_startup_order() ->
- ok = app_utils:load_applications(?APPS),
- app_utils:app_dependency_order(?APPS, false).
+run_boot_steps() ->
+ run_boot_steps([App || {App, _, _} <- application:loaded_applications()]).
-app_shutdown_order() ->
- Apps = ?APPS ++ rabbit_plugins:active(),
- app_utils:app_dependency_order(Apps, true).
+run_boot_steps(Apps) ->
+ [ok = run_step(Step, Attrs, mfa) || {_, Step, Attrs} <- find_steps(Apps)],
+ ok.
-%%---------------------------------------------------------------------------
-%% boot step logic
+find_steps(Apps) ->
+ All = sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)),
+ [Step || {App, _, _} = Step <- All, lists:member(App, Apps)].
-run_boot_step({_StepName, Attributes}) ->
- case [MFA || {mfa, MFA} <- Attributes] of
+run_step(StepName, Attributes, AttributeName) ->
+ case [MFA || {Key, MFA} <- Attributes,
+ Key =:= AttributeName] of
[] ->
ok;
MFAs ->
[try
apply(M,F,A)
of
- ok -> ok;
- {error, Reason} -> boot_error(Reason, not_available)
+ ok -> ok;
+ {error, Reason} -> boot_error({boot_step, StepName, Reason},
+ not_available)
catch
- _:Reason -> boot_error(Reason, erlang:get_stacktrace())
+ _:Reason -> boot_error({boot_step, StepName, Reason},
+ erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
ok
end.
-boot_steps() ->
- sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)).
-
-vertices(_Module, Steps) ->
- [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
+vertices({AppName, _Module, Steps}) ->
+ [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps].
-edges(_Module, Steps) ->
+edges({_AppName, _Module, Steps}) ->
[case Key of
requires -> {StepName, OtherStep};
enables -> {OtherStep, StepName}
@@ -527,7 +562,7 @@ edges(_Module, Steps) ->
Key =:= requires orelse Key =:= enables].
sort_boot_steps(UnsortedSteps) ->
- case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
+ case rabbit_misc:build_acyclic_graph(fun vertices/1, fun edges/1,
UnsortedSteps) of
{ok, G} ->
%% Use topological sort to find a consistent ordering (if
@@ -541,8 +576,8 @@ sort_boot_steps(UnsortedSteps) ->
digraph:delete(G),
%% Check that all mentioned {M,F,A} triples are exported.
case [{StepName, {M,F,A}} ||
- {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
+ {_App, StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
not erlang:function_exported(M, F, length(A))] of
[] -> SortedSteps;
MissingFunctions -> basic_boot_error(
@@ -603,7 +638,7 @@ boot_error(Reason, Fmt, Args, Stacktrace) ->
basic_boot_error(Reason, Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
- rabbit_misc:local_info_msg(Format, Args),
+ rabbit_log:info(Format, Args),
timer:sleep(1000),
exit({?MODULE, failure_during_boot, Reason}).
@@ -727,11 +762,11 @@ force_event_refresh(Ref) ->
%% misc
log_broker_started(Plugins) ->
- rabbit_misc:with_local_io(
+ rabbit_log:with_local_io(
fun() ->
PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P])
|| P <- Plugins]),
- error_logger:info_msg(
+ rabbit_log:info(
"Server startup complete; ~b plugins started.~n~s",
[length(Plugins), PluginList]),
io:format(" completed with ~p plugins.~n", [length(Plugins)])
@@ -780,7 +815,31 @@ log_banner() ->
{K, V} ->
Format(K, V)
end || S <- Settings]),
- error_logger:info_msg("~s", [Banner]).
+ rabbit_log:info("~s", [Banner]).
+
+warn_if_kernel_config_dubious() ->
+ case erlang:system_info(kernel_poll) of
+ true -> ok;
+ false -> rabbit_log:warning(
+ "Kernel poll (epoll, kqueue, etc) is disabled. Throughput "
+ "and CPU utilization may worsen.~n")
+ end,
+ AsyncThreads = erlang:system_info(thread_pool_size),
+ case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of
+ true -> rabbit_log:warning(
+ "Erlang VM is running with ~b I/O threads, "
+ "file I/O performance may worsen~n", [AsyncThreads]);
+ false -> ok
+ end,
+ IDCOpts = case application:get_env(kernel, inet_default_connect_options) of
+ undefined -> [];
+ {ok, Val} -> Val
+ end,
+ case proplists:get_value(nodelay, IDCOpts, false) of
+ false -> rabbit_log:warning("Nagle's algorithm is enabled for sockets, "
+ "network I/O latency will be higher~n");
+ true -> ok
+ end.
home_dir() ->
case init:get_argument(home) of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e45e026e..692179fc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,7 +18,7 @@
-export([recover/0, stop/0, start/1, declare/5, declare/6,
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
--export([pseudo_queue/2]).
+-export([pseudo_queue/2, immutable/1]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
@@ -29,8 +29,8 @@
-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
--export([on_node_down/1]).
--export([update/2, store_queue/1, policy_changed/2]).
+-export([on_node_up/1, on_node_down/1]).
+-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
cancel_sync_mirrors/1]).
@@ -174,9 +174,12 @@
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
+-spec(immutable/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()).
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(update_decorators/1 :: (name()) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
@@ -254,15 +257,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
%% effect) this might not be possible to satisfy.
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
ok = check_declare_arguments(QueueName, Args),
- Q = rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- gm_pids = []}),
+ Q = rabbit_queue_decorator:set(
+ rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ down_slave_nodes = [],
+ gm_pids = []})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
@@ -308,12 +313,24 @@ store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue,
Q#amqqueue{slave_pids = [],
sync_slave_pids = [],
- gm_pids = []}, write),
- ok = mnesia:write(rabbit_queue, Q, write),
- ok;
+ gm_pids = [],
+ decorators = undefined}, write),
+ store_queue_ram(Q);
store_queue(Q = #amqqueue{durable = false}) ->
- ok = mnesia:write(rabbit_queue, Q, write),
- ok.
+ store_queue_ram(Q).
+
+store_queue_ram(Q) ->
+ ok = mnesia:write(rabbit_queue, rabbit_queue_decorator:set(Q), write).
+
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q] -> store_queue_ram(Q),
+ ok;
+ [] -> ok
+ end
+ end).
policy_changed(Q1 = #amqqueue{decorators = Decorators1},
Q2 = #amqqueue{decorators = Decorators2}) ->
@@ -444,7 +461,8 @@ declare_args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
- {<<"x-max-length">>, fun check_non_neg_int_arg/2}].
+ {<<"x-max-length">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
@@ -650,15 +668,23 @@ forget_all_durable(Node) ->
fun () ->
Qs = mnesia:match_object(rabbit_durable_queue,
#amqqueue{_ = '_'}, write),
- [rabbit_binding:process_deletions(
- internal_delete1(Name, true)) ||
- #amqqueue{name = Name, pid = Pid} = Q <- Qs,
- node(Pid) =:= Node,
- rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined],
+ [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs,
+ node(Pid) =:= Node],
ok
end),
ok.
+forget_node_for_queue(#amqqueue{name = Name,
+ down_slave_nodes = []}) ->
+ %% No slaves to recover from, queue is gone
+ rabbit_binding:process_deletions(internal_delete1(Name, true));
+
+forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) ->
+ %% Promote a slave while down - it'll happily recover as a master
+ Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H),
+ down_slave_nodes = T},
+ ok = mnesia:write(rabbit_durable_queue, Q1, write).
+
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
@@ -674,6 +700,20 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
+on_node_up(Node) ->
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Qs = mnesia:match_object(rabbit_queue,
+ #amqqueue{_ = '_'}, write),
+ [case lists:member(Node, DSNs) of
+ true -> DSNs1 = DSNs -- [Node],
+ store_queue(
+ Q#amqqueue{down_slave_nodes = DSNs1});
+ false -> ok
+ end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs],
+ ok
+ end).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
@@ -709,6 +749,14 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
+immutable(Q) -> Q#amqqueue{pid = none,
+ slave_pids = none,
+ sync_slave_pids = none,
+ down_slave_nodes = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none}.
+
deliver([], _Delivery, _Flow) ->
%% /dev/null optimisation
[];
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9b785303..db297c1d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -52,6 +52,7 @@
dlx,
dlx_routing_key,
max_length,
+ max_bytes,
args_policy_version,
status
}).
@@ -84,6 +85,7 @@
memory,
slave_pids,
synchronised_slave_pids,
+ down_slave_nodes,
backing_queue_status,
state
]).
@@ -102,7 +104,8 @@
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
-info_keys() -> ?INFO_KEYS.
+info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys().
+statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
%%----------------------------------------------------------------------------
@@ -263,7 +266,8 @@ process_args_policy(State = #q{q = Q,
{<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
{<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
- {<<"max-length">>, fun res_min/2, fun init_max_length/2}],
+ {<<"max-length">>, fun res_min/2, fun init_max_length/2},
+ {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
Fun(args_policy_lookup(Name, Resolve, Q), StateN)
@@ -302,6 +306,10 @@ init_max_length(MaxLen, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}),
State1.
+init_max_bytes(MaxBytes, State) ->
+ {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
+ State1.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
lists:foldl(fun (F, S) -> F(S) end, State,
@@ -385,15 +393,13 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
- TRef = erlang:send_after(After, self(), {drop_expired, Version}),
+ TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}),
State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ttl_timer_expiry = TExpiry})
when Expiry + 1000 < TExpiry ->
- case erlang:cancel_timer(TRef) of
- false -> State;
- _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
- end;
+ rabbit_misc:cancel_timer(TRef),
+ ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined});
ensure_ttl_timer(_Expiry, State) ->
State.
@@ -543,34 +549,41 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% remains unchanged, or if the newly published message
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
- case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
{false, false, _} -> State4;
{true, true, undefined} -> State4;
{_, _, _} -> drop_expired_msgs(State4)
end
end.
-maybe_drop_head(State = #q{max_length = undefined}) ->
- {0, State};
-maybe_drop_head(State = #q{max_length = MaxLen,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case BQ:len(BQS) - MaxLen of
- Excess when Excess > 0 ->
- {Excess,
- with_dlx(
- State#q.dlx,
- fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
- fun () ->
- {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
- BQ:drop(false, BQS0)
- end, {ok, BQS},
- lists:seq(1, Excess)),
- State#q{backing_queue_state = BQS1}
- end)};
- _ -> {0, State}
+maybe_drop_head(State = #q{max_length = undefined,
+ max_bytes = undefined}) ->
+ {false, State};
+maybe_drop_head(State) ->
+ maybe_drop_head(false, State).
+
+maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case over_max_length(State) of
+ true ->
+ maybe_drop_head(true,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msg(X, State) end,
+ fun () ->
+ {_, BQS1} = BQ:drop(false, BQS),
+ State#q{backing_queue_state = BQS1}
+ end));
+ false ->
+ {AlreadyDropped, State}
end.
+over_max_length(#q{max_length = MaxLen,
+ max_bytes = MaxBytes,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ:len(BQS) > MaxLen orelse BQ:info(message_bytes_ready, BQS) > MaxBytes.
+
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
WasEmpty = BQ:is_empty(BQS),
@@ -664,9 +677,12 @@ subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) ->
run_message_queue(true, Fun(State1))
end.
-message_properties(Message, Confirm, #q{ttl = TTL}) ->
+message_properties(Message = #basic_message{content = Content},
+ Confirm, #q{ttl = TTL}) ->
+ #content{payload_fragments_rev = PFR} = Content,
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually}.
+ needs_confirming = Confirm == eventually,
+ size = iolist_size(PFR)}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =
@@ -723,15 +739,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
-dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
+dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) ->
{ok, State1} =
dead_letter_msgs(
fun (DLFun, Acc, BQS) ->
- lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
- {{Msg, _, AckTag}, BQS1} =
- BQ:fetch(true, BQS0),
- {ok, DLFun(Msg, AckTag, Acc0), BQS1}
- end, {ok, Acc, BQS}, lists:seq(1, Excess))
+ {{Msg, _, AckTag}, BQS1} = BQ:fetch(true, BQS),
+ {ok, DLFun(Msg, AckTag, Acc), BQS1}
end, maxlen, X, State),
State1.
@@ -810,19 +823,25 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(down_slave_nodes, #q{q = #amqqueue{name = Name,
+ durable = Durable}}) ->
+ {ok, Q = #amqqueue{down_slave_nodes = Nodes}} =
+ rabbit_amqqueue:lookup(Name),
+ case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> '';
+ true -> Nodes
+ end;
i(state, #q{status = running}) -> credit_flow:state();
i(state, #q{status = State}) -> State;
-i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- BQ:status(BQS);
-i(Item, _) ->
- throw({bad_argument, Item}).
+i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:info(Item, BQS).
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
ExtraKs = [K || {K, _} <- Extra],
- Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State),
+ Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State),
not lists:member(K, ExtraKs)],
rabbit_event:notify(queue_stats, Extra ++ Infos).
@@ -922,7 +941,7 @@ handle_call({init, Recover}, From,
end;
handle_call(info, _From, State) ->
- reply(infos(?INFO_KEYS, State), State);
+ reply(infos(info_keys(), State), State);
handle_call({info, Items}, _From, State) ->
try
@@ -1165,7 +1184,7 @@ handle_cast({force_event_refresh, Ref},
emit_consumer_created(
Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
end,
- noreply(State);
+ noreply(rabbit_event:init_stats_timer(State, #q.stats_timer));
handle_cast(notify_decorators, State) ->
notify_decorators(State),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 8f37bf60..098f5f43 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -16,6 +16,14 @@
-module(rabbit_backing_queue).
+-export([info_keys/0]).
+
+-define(INFO_KEYS, [messages_ram, messages_ready_ram,
+ messages_unacknowledged_ram, messages_persistent,
+ message_bytes, message_bytes_ready,
+ message_bytes_unacknowledged, message_bytes_ram,
+ message_bytes_persistent, backing_queue_status]).
+
-ifdef(use_specs).
%% We can't specify a per-queue ack/state with callback signatures
@@ -37,6 +45,8 @@
-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+
%% Called on startup with a list of durable queue names. The queues
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
@@ -216,9 +226,7 @@
%% inbound messages and outbound messages at the moment.
-callback msg_rates(state()) -> {float(), float()}.
-%% Exists for debugging purposes, to be able to expose state via
-%% rabbitmqctl list_queues backing_queue_status
--callback status(state()) -> [{atom(), any()}].
+-callback info(atom(), state()) -> any().
%% Passed a function to be invoked with the relevant backing queue's
%% state. Useful for when the backing queue or other components need
@@ -243,9 +251,11 @@ behaviour_info(callbacks) ->
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {status, 1},
- {invoke, 3}, {is_duplicate, 2}] ;
+ {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {info_keys, 0},
+ {infos, 2}, {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
-endif.
+
+info_keys() -> ?INFO_KEYS.
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 49b71122..622b1b16 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -116,7 +116,8 @@ qc_publish(#state{bqstate = BQ}) ->
[qc_message(),
#message_properties{needs_confirming = frequency([{1, true},
{20, false}]),
- expiry = oneof([undefined | lists:seq(1, 10)])},
+ expiry = oneof([undefined | lists:seq(1, 10)]),
+ size = 10},
false, self(), BQ]}.
qc_publish_multiple(#state{}) ->
@@ -124,7 +125,7 @@ qc_publish_multiple(#state{}) ->
qc_publish_delivered(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish_delivered,
- [qc_message(), #message_properties{}, self(), BQ]}.
+ [qc_message(), #message_properties{size = 10}, self(), BQ]}.
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 043ec7e3..d2f6719c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -341,7 +341,7 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({force_event_refresh, Ref}, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State),
Ref),
- noreply(State);
+ noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer));
handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
@@ -433,17 +433,22 @@ send(_Command, #ch{state = closing}) ->
send(Command, #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Command).
-handle_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid}) ->
+handle_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid,
+ conn_name = ConnName,
+ virtual_host = VHost,
+ user = User}) ->
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
{Channel, CloseMethod} ->
- rabbit_log:error("connection ~p, channel ~p - soft error:~n~p~n",
- [ConnPid, Channel, Reason]),
+ rabbit_log:error("Channel error on connection ~p (~s, vhost: '~s',"
+ " user: '~s'), channel ~p:~n~p~n",
+ [ConnPid, ConnName, VHost, User#user.username,
+ Channel, Reason]),
ok = rabbit_writer:send_command(WriterPid, CloseMethod),
{noreply, State1};
{0, _} ->
@@ -668,8 +673,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
tx = Tx,
+ channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
- trace_state = TraceState}) ->
+ trace_state = TraceState,
+ user = #user{username = Username},
+ conn_name = ConnName}) ->
check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
@@ -690,7 +698,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_in(Message, TraceState),
+ rabbit_trace:tap_in(Message, ConnName, ChannelNum,
+ Username, TraceState),
Delivery = rabbit_basic:delivery(
Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
@@ -992,7 +1001,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
QueueName,
fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
Q, Durable, AutoDelete, Args, Owner),
- rabbit_amqqueue:stat(Q)
+ maybe_stat(NoWait, Q)
end) of
{ok, MessageCount, ConsumerCount} ->
return_queue_declare_ok(QueueName, NoWait, MessageCount,
@@ -1048,7 +1057,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
- QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
+ QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -1204,6 +1213,9 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
E
end.
+maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q);
+maybe_stat(true, _Q) -> {ok, 0, 0}.
+
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
@@ -1365,7 +1377,10 @@ record_sent(ConsumerTag, AckRequired,
Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
- trace_state = TraceState}) ->
+ trace_state = TraceState,
+ user = #user{username = Username},
+ conn_name = ConnName,
+ channel = ChannelNum}) ->
?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
{none, true} -> get;
{none, false} -> get_no_ack;
@@ -1376,7 +1391,7 @@ record_sent(ConsumerTag, AckRequired,
true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
- rabbit_trace:tap_out(Msg, TraceState),
+ rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
UAMQ);
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 81c17fbf..db9349ac 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -33,7 +33,7 @@
-callback description() -> [proplists:property()].
-callback intercept(original_method(), rabbit_types:vhost()) ->
- rabbit_types:ok_or_error2(processed_method(), any()).
+ processed_method() | rabbit_misc:channel_or_connection_exit().
%% Whether the interceptor wishes to intercept the amqp method
-callback applies_to(intercept_method()) -> boolean().
@@ -62,20 +62,15 @@ intercept_method(M, VHost) ->
intercept_method(M, _VHost, []) ->
M;
intercept_method(M, VHost, [I]) ->
- case I:intercept(M, VHost) of
- {ok, M2} ->
- case validate_method(M, M2) of
- true ->
- M2;
- _ ->
- internal_error("Interceptor: ~p expected "
- "to return method: ~p but returned: ~p",
- [I, rabbit_misc:method_record_type(M),
- rabbit_misc:method_record_type(M2)])
- end;
- {error, Reason} ->
- internal_error("Interceptor: ~p failed with reason: ~p",
- [I, Reason])
+ M2 = I:intercept(M, VHost),
+ case validate_method(M, M2) of
+ true ->
+ M2;
+ _ ->
+ internal_error("Interceptor: ~p expected "
+ "to return method: ~p but returned: ~p",
+ [I, rabbit_misc:method_record_type(M),
+ rabbit_misc:method_record_type(M2)])
end;
intercept_method(M, _VHost, Is) ->
internal_error("More than one interceptor for method: ~p -- ~p",
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 451f4d70..70fe10e6 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -54,6 +54,7 @@
change_cluster_node_type,
update_cluster_nodes,
{forget_cluster_node, [?OFFLINE_DEF]},
+ force_boot,
cluster_status,
{sync_queue, [?VHOST_DEF]},
{cancel_sync_queue, [?VHOST_DEF]},
@@ -114,6 +115,12 @@
{"Policies", rabbit_policy, list_formatted, info_keys},
{"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]).
+-define(COMMANDS_NOT_REQUIRING_APP,
+ [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs,
+ join_cluster, change_cluster_node_type, update_cluster_nodes,
+ forget_cluster_node, cluster_status, status, environment, eval,
+ force_boot]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -131,6 +138,7 @@
%%----------------------------------------------------------------------------
start() ->
+ start_distribution(),
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{Command, Opts, Args} =
case parse_arguments(init:get_plain_arguments(), NodeStr) of
@@ -154,7 +162,7 @@ start() ->
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
- case catch action(Command, Node, Args, Opts, Inform) of
+ case catch do_action(Command, Node, Args, Opts, Inform) of
ok ->
case Quiet of
true -> ok;
@@ -249,6 +257,15 @@ parse_arguments(CmdLine, NodeStr) ->
%%----------------------------------------------------------------------------
+do_action(Command, Node, Args, Opts, Inform) ->
+ case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of
+ false -> case ensure_app_running(Node) of
+ ok -> action(Command, Node, Args, Opts, Inform);
+ E -> E
+ end;
+ true -> action(Command, Node, Args, Opts, Inform)
+ end.
+
action(stop, Node, Args, _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
Res = call(Node, {rabbit, stop_and_halt, []}),
@@ -302,8 +319,19 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
ClusterNode = list_to_atom(ClusterNodeS),
RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts),
Inform("Removing node ~p from cluster", [ClusterNode]),
- rpc_call(Node, rabbit_mnesia, forget_cluster_node,
- [ClusterNode, RemoveWhenOffline]);
+ case RemoveWhenOffline of
+ true -> become(Node),
+ rabbit_mnesia:forget_cluster_node(ClusterNode, true);
+ false -> rpc_call(Node, rabbit_mnesia, forget_cluster_node,
+ [ClusterNode, false])
+ end;
+
+action(force_boot, Node, [], _Opts, Inform) ->
+ Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]),
+ case rabbit:is_running(Node) of
+ false -> rabbit_mnesia:force_load_next_boot();
+ true -> {error, rabbit_running}
+ end;
action(sync_queue, Node, [Q], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
@@ -650,6 +678,22 @@ exit_loop(Port) ->
{Port, _} -> exit_loop(Port)
end.
+start_distribution() ->
+ CtlNodeName = rabbit_misc:format("rabbitmqctl-~s", [os:getpid()]),
+ {ok, _} = net_kernel:start([list_to_atom(CtlNodeName), shortnames]).
+
+become(BecomeNode) ->
+ case net_adm:ping(BecomeNode) of
+ pong -> exit({node_running, BecomeNode});
+ pang -> io:format(" * Impersonating node: ~s...", [BecomeNode]),
+ error_logger:tty(false),
+ ok = net_kernel:stop(),
+ {ok, _} = net_kernel:start([BecomeNode, shortnames]),
+ io:format(" done~n", []),
+ Dir = mnesia:system_info(directory),
+ io:format(" * Mnesia directory : ~s~n", [Dir])
+ end.
+
%%----------------------------------------------------------------------------
default_if_empty(List, Default) when is_list(List) ->
@@ -715,6 +759,17 @@ unsafe_rpc(Node, Mod, Fun, Args) ->
Normal -> Normal
end.
+ensure_app_running(Node) ->
+ case call(Node, {rabbit, is_running, []}) of
+ true -> ok;
+ false -> {error_string,
+ rabbit_misc:format(
+ "rabbit application is not running on node ~s.~n"
+ " * Suggestion: start it with \"rabbitmqctl start_app\" "
+ "and try again", [Node])};
+ Other -> Other
+ end.
+
call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index ec32e687..728bc431 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -129,6 +129,9 @@ is_cycle(Queue, Deaths) ->
{longstr, <<"rejected">>} =/=
rabbit_misc:table_lookup(D, <<"reason">>);
(_) ->
+ %% There was something we didn't expect, therefore
+ %% a client must have put it there, therefore the
+ %% cycle was not "fully automatic".
false
end, Cycle ++ [H])
end.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index b867223b..a33103fd 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -23,6 +23,7 @@
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify/3, notify_if/3]).
+-export([sync_notify/2, sync_notify/3]).
%%----------------------------------------------------------------------------
@@ -61,6 +62,9 @@
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-spec(notify/3 :: (event_type(), event_props(), reference() | 'none') -> 'ok').
-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok').
+-spec(sync_notify/2 :: (event_type(), event_props()) -> 'ok').
+-spec(sync_notify/3 :: (event_type(), event_props(),
+ reference() | 'none') -> 'ok').
-endif.
@@ -145,7 +149,16 @@ notify_if(false, _Type, _Props) -> ok.
notify(Type, Props) -> notify(Type, Props, none).
notify(Type, Props, Ref) ->
- gen_event:notify(?MODULE, #event{type = Type,
- props = Props,
- reference = Ref,
- timestamp = os:timestamp()}).
+ gen_event:notify(?MODULE, event_cons(Type, Props, Ref)).
+
+sync_notify(Type, Props) -> sync_notify(Type, Props, none).
+
+sync_notify(Type, Props, Ref) ->
+ gen_event:sync_notify(?MODULE, event_cons(Type, Props, Ref)).
+
+event_cons(Type, Props, Ref) ->
+ #event{type = Type,
+ props = Props,
+ reference = Ref,
+ timestamp = os:timestamp()}.
+
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 685c311f..8b944763 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -20,7 +20,8 @@
-export([recover/0, policy_changed/2, callback/4, declare/6,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
- lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
+ lookup/1, lookup_or_die/1, list/0, list/1, lookup_scratch/2,
+ update_scratch/3, update_decorators/1, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
@@ -61,6 +62,7 @@
-spec(lookup_or_die/1 ::
(name()) -> rabbit_types:exchange() |
rabbit_types:channel_exit()).
+-spec(list/0 :: () -> [rabbit_types:exchange()]).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
-spec(lookup_scratch/2 :: (name(), atom()) ->
rabbit_types:ok(term()) |
@@ -70,6 +72,8 @@
(name(),
fun((rabbit_types:exchange()) -> rabbit_types:exchange()))
-> not_found | rabbit_types:exchange()).
+-spec(update_decorators/1 :: (name()) -> 'ok').
+-spec(immutable/1 :: (rabbit_types:exchange()) -> rabbit_types:exchange()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -106,24 +110,15 @@ recover() ->
mnesia:read({rabbit_exchange, XName}) =:= []
end,
fun (X, Tx) ->
- case Tx of
- true -> store(X);
- false -> ok
- end,
- callback(X, create, map_create_tx(Tx), [X])
+ X1 = case Tx of
+ true -> store_ram(X);
+ false -> rabbit_exchange_decorator:set(X)
+ end,
+ callback(X1, create, map_create_tx(Tx), [X1])
end,
rabbit_durable_exchange),
- report_missing_decorators(Xs),
[XName || #exchange{name = XName} <- Xs].
-report_missing_decorators(Xs) ->
- Mods = lists:usort(lists:append([rabbit_exchange_decorator:select(raw, D) ||
- #exchange{decorators = D} <- Xs])),
- case [M || M <- Mods, code:which(M) =:= non_existing] of
- [] -> ok;
- M -> rabbit_log:warning("Missing exchange decorators: ~p~n", [M])
- end.
-
callback(X = #exchange{type = XType,
decorators = Decorators}, Fun, Serial0, Args) ->
Serial = if is_function(Serial0) -> Serial0;
@@ -158,12 +153,13 @@ serial(#exchange{name = XName} = X) ->
end.
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
- X = rabbit_policy:set(#exchange{name = XName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- arguments = Args}),
+ X = rabbit_exchange_decorator:set(
+ rabbit_policy:set(#exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args})),
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
@@ -171,13 +167,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
[] ->
- store(X),
- ok = case Durable of
- true -> mnesia:write(rabbit_durable_exchange,
- X, write);
- false -> ok
- end,
- {new, X};
+ {new, store(X)};
[ExistingX] ->
{existing, ExistingX}
end
@@ -195,7 +185,19 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
map_create_tx(true) -> transaction;
map_create_tx(false) -> none.
-store(X) -> ok = mnesia:write(rabbit_exchange, X, write).
+
+store(X = #exchange{durable = true}) ->
+ mnesia:write(rabbit_durable_exchange, X#exchange{decorators = undefined},
+ write),
+ store_ram(X);
+store(X = #exchange{durable = false}) ->
+ store_ram(X).
+
+store_ram(X) ->
+ X1 = rabbit_exchange_decorator:set(X),
+ ok = mnesia:write(rabbit_exchange, rabbit_exchange_decorator:set(X1),
+ write),
+ X1.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
@@ -243,6 +245,8 @@ lookup_or_die(Name) ->
{error, not_found} -> rabbit_misc:not_found(Name)
end.
+list() -> mnesia:dirty_match_object(rabbit_exchange, #exchange{_ = '_'}).
+
%% Not dirty_match_object since that would not be transactional when used in a
%% tx context
list(VHostPath) ->
@@ -287,20 +291,27 @@ update_scratch(Name, App, Fun) ->
ok
end).
+update_decorators(Name) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_exchange, Name}) of
+ [X] -> store_ram(X),
+ ok;
+ [] -> ok
+ end
+ end).
+
update(Name, Fun) ->
case mnesia:wread({rabbit_exchange, Name}) of
- [X = #exchange{durable = Durable}] ->
- X1 = Fun(X),
- ok = mnesia:write(rabbit_exchange, X1, write),
- case Durable of
- true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
- _ -> ok
- end,
- X1;
- [] ->
- not_found
+ [X] -> X1 = Fun(X),
+ store(X1);
+ [] -> not_found
end.
+immutable(X) -> X#exchange{scratches = none,
+ policy = none,
+ decorators = none}.
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 2f056b1b..900f9c32 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([select/2, set/1]).
+-export([select/2, set/1, register/2, unregister/1]).
%% This is like an exchange type except that:
%%
@@ -104,3 +104,25 @@ list() -> [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
cons_if_eq(Select, Select, Item, List) -> [Item | List];
cons_if_eq(_Select, _Other, _Item, List) -> List.
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(exchange_decorator, TypeName, ModuleName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(exchange_decorator, TypeName),
+ [maybe_recover(X) || X <- rabbit_exchange:list()],
+ ok.
+
+maybe_recover(X = #exchange{name = Name,
+ decorators = Decs}) ->
+ #exchange{decorators = Decs1} = set(X),
+ Old = lists:sort(select(all, Decs)),
+ New = lists:sort(select(all, Decs1)),
+ case New of
+ Old -> ok;
+ _ -> %% TODO create a tx here for non-federation decorators
+ [M:create(none, X) || M <- New -- Old],
+ rabbit_exchange:update_decorators(Name)
+ end.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index b17b7de9..f32a187d 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -183,12 +183,13 @@
-record(lim, {prefetch_count = 0,
ch_pid,
+ %% 'Notify' is a boolean that indicates whether a queue should be
+ %% notified of a change in the limit or volume that may allow it to
+ %% deliver more messages via the limiter's channel.
queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
-%% 'Notify' is a boolean that indicates whether a queue should be
-%% notified of a change in the limit or volume that may allow it to
-%% deliver more messages via the limiter's channel.
+%% mode is of type credit_mode()
-record(credit, {credit = 0, mode}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index f4df0e76..e05ef05a 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -16,16 +16,8 @@
-module(rabbit_log).
--behaviour(gen_server).
-
--export([start_link/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
-
--define(SERVER, ?MODULE).
+-export([with_local_io/1]).
%%----------------------------------------------------------------------------
@@ -36,8 +28,6 @@
-type(category() :: atom()).
-type(level() :: 'info' | 'warning' | 'error').
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-
-spec(log/3 :: (category(), level(), string()) -> 'ok').
-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
@@ -48,16 +38,24 @@
-spec(error/1 :: (string()) -> 'ok').
-spec(error/2 :: (string(), [any()]) -> 'ok').
+-spec(with_local_io/1 :: (fun (() -> A)) -> A).
+
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
log(Category, Level, Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}).
+ case level(Level) =< catlevel(Category) of
+ false -> ok;
+ true -> F = case Level of
+ info -> fun error_logger:info_msg/2;
+ warning -> fun error_logger:warning_msg/2;
+ error -> fun error_logger:error_msg/2
+ end,
+ with_local_io(fun () -> F(Fmt, Args) end)
+ end.
info(Fmt) -> log(default, info, Fmt).
info(Fmt, Args) -> log(default, info, Fmt, Args).
@@ -66,41 +64,14 @@ warning(Fmt, Args) -> log(default, warning, Fmt, Args).
error(Fmt) -> log(default, error, Fmt).
error(Fmt, Args) -> log(default, error, Fmt, Args).
-%%--------------------------------------------------------------------
-
-init([]) ->
- {ok, CatLevelList} = application:get_env(log_levels),
- CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList],
- {ok, orddict:from_list(CatLevels)}.
-
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast({log, Category, Level, Fmt, Args}, CatLevels) ->
- CatLevel = case orddict:find(Category, CatLevels) of
- {ok, L} -> L;
- error -> level(info)
- end,
- case level(Level) =< CatLevel of
- false -> ok;
- true -> (case Level of
- info -> fun error_logger:info_msg/2;
- warning -> fun error_logger:warning_msg/2;
- error -> fun error_logger:error_msg/2
- end)(Fmt, Args)
- end,
- {noreply, CatLevels};
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+catlevel(Category) ->
+ %% We can get here as part of rabbitmqctl when it is impersonating
+ %% a node; in which case the env will not be defined.
+ CatLevelList = case application:get_env(rabbit, log_levels) of
+ {ok, L} -> L;
+ undefined -> []
+ end,
+ level(proplists:get_value(Category, CatLevelList, info)).
%%--------------------------------------------------------------------
@@ -108,3 +79,19 @@ level(info) -> 3;
level(warning) -> 2;
level(error) -> 1;
level(none) -> 0.
+
+%% Execute Fun using the IO system of the local node (i.e. the node on
+%% which the code is executing). Since this is invoked for every log
+%% message, we try to avoid unnecessarily churning group_leader/1.
+with_local_io(Fun) ->
+ GL = group_leader(),
+ Node = node(),
+ case node(GL) of
+ Node -> Fun();
+ _ -> group_leader(whereis(user), self()),
+ try
+ Fun()
+ after
+ group_leader(GL, self())
+ end
+ end.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 2b16b911..9bccf5dd 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
- msg_rates/1, status/1, invoke/3, is_duplicate/2]).
+ msg_rates/1, info/2, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -170,10 +170,24 @@ terminate({shutdown, dropped} = Reason,
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
terminate(Reason,
- State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State = #state { name = QName,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
%% shouldn't be deleted. Most likely safe shutdown of this
- %% node. Thus just let some other slave take over.
+ %% node.
+ {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(QName),
+ case SSPids =:= [] andalso
+ rabbit_policy:get(<<"ha-promote-on-shutdown">>, Q) =/= <<"always">> of
+ true -> %% Remove the whole queue to avoid data loss
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Stopping all nodes on master shutdown since no "
+ "synchronised slave is available~n", []),
+ stop_all_slaves(Reason, State);
+ false -> %% Just let some other slave take over.
+ ok
+ end,
State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
@@ -181,7 +195,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, State),
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
-stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
@@ -360,10 +374,13 @@ resume(State = #state { backing_queue = BQ,
msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:msg_rates(BQS).
-status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
- BQ:status(BQS) ++
+info(backing_queue_status,
+ State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:info(backing_queue_status, BQS) ++
[ {mirror_seen, dict:size(State #state.seen_status)},
- {mirror_senders, sets:size(State #state.known_senders)} ].
+ {mirror_senders, sets:size(State #state.known_senders)} ];
+info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:info(Item, BQS).
invoke(?MODULE, Fun, State) ->
Fun(?MODULE, State);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b0f092a9..9e8c4a18 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -29,16 +29,19 @@
-include("rabbit.hrl").
--rabbit_boot_step({?MODULE,
- [{description, "HA policy validation"},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-mode">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-params">>, ?MODULE]}},
- {mfa, {rabbit_registry, register,
- [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
- {requires, rabbit_registry},
- {enables, recovery}]}).
+-rabbit_boot_step(
+ {?MODULE,
+ [{description, "HA policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
%%----------------------------------------------------------------------------
@@ -75,9 +78,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids,
- gm_pids = GMPids }] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids,
+ gm_pids = GMPids,
+ down_slave_nodes = DSNs}] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
lists:member(GM, DeadGMPids)
@@ -86,6 +90,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
AlivePids = [Pid || {_GM, Pid} <- AliveGM],
Alive = [Pid || Pid <- [QPid | SPids],
lists:member(Pid, AlivePids)],
+ DSNs1 = [node(Pid) ||
+ Pid <- SPids,
+ not lists:member(Pid, AlivePids)] ++ DSNs,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
@@ -94,9 +101,10 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM},
+ Q1 = Q#amqqueue{pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = AliveGM,
+ down_slave_nodes = DSNs1},
store_updated_slaves(Q1),
%% If we add and remove nodes at the same time we
%% might tell the old master we need to sync and
@@ -106,8 +114,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
_ ->
%% Master has changed, and we're not it.
%% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM},
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM,
+ down_slave_nodes = DSNs1},
store_updated_slaves(Q1)
end,
{ok, QPid1, DeadPids}
@@ -236,12 +245,16 @@ log(Level, QName, Fmt, Args) ->
rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt,
[rabbit_misc:rs(QName) | Args]).
-store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
- sync_slave_pids = SSPids}) ->
+store_updated_slaves(Q = #amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids,
+ down_slave_nodes = DSNs}) ->
%% TODO now that we clear sync_slave_pids in rabbit_durable_queue,
%% do we still need this filtering?
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
- Q1 = Q#amqqueue{sync_slave_pids = SSPids1},
+ DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]],
+ Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
+ down_slave_nodes = DSNs1},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q1),
@@ -374,16 +387,21 @@ validate_policy(KeyList) ->
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
- case {Mode, Params, SyncMode} of
- {none, none, none} ->
+ PromoteOnShutdown = proplists:get_value(
+ <<"ha-promote-on-shutdown">>, KeyList, none),
+ case {Mode, Params, SyncMode, PromoteOnShutdown} of
+ {none, none, none, none} ->
ok;
- {none, _, _} ->
- {error, "ha-mode must be specified to specify ha-params or "
- "ha-sync-mode", []};
+ {none, _, _, _} ->
+ {error, "ha-mode must be specified to specify ha-params, "
+ "ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
case module(Mode) of
{ok, M} -> case M:validate_policy(Params) of
- ok -> validate_sync_mode(SyncMode);
+ ok -> case validate_sync_mode(SyncMode) of
+ ok -> validate_pos(PromoteOnShutdown);
+ E -> E
+ end;
E -> E
end;
_ -> {error, "~p is not a valid ha-mode value", [Mode]}
@@ -398,3 +416,12 @@ validate_sync_mode(SyncMode) ->
Mode -> {error, "ha-sync-mode must be \"manual\" "
"or \"automatic\", got ~p", [Mode]}
end.
+
+validate_pos(PromoteOnShutdown) ->
+ case PromoteOnShutdown of
+ <<"always">> -> ok;
+ <<"when-synced">> -> ok;
+ none -> ok;
+ Mode -> {error, "ha-promote-on-shutdown must be "
+ "\"always\" or \"when-synced\", got ~p", [Mode]}
+ end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 11d6a79c..6d0064ab 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -271,8 +271,8 @@ handle_cast({sync_start, Ref, Syncer},
DD, Ref, TRef, Syncer, BQ, BQS,
fun (BQN, BQSN) ->
BQSN1 = update_ram_duration(BQN, BQSN),
- TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
- self(), update_ram_duration),
+ TRefN = rabbit_misc:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+ self(), update_ram_duration),
{TRefN, BQSN1}
end) of
denied -> noreply(State1);
@@ -653,8 +653,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
-backing_queue_timeout(State = #state { backing_queue = BQ }) ->
- run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
+backing_queue_timeout(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State#state{backing_queue_state = BQ:timeout(BQS)}.
ensure_sync_timer(State) ->
rabbit_misc:ensure_timer(State, #state.sync_timer_ref,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 006bbadf..d2456918 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -42,10 +42,9 @@
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([format/2, format_many/1, format_stderr/2]).
--export([with_local_io/1, local_info_msg/2]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
--export([pid_to_string/1, string_to_pid/1]).
+-export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]).
-export([version_compare/2, version_compare/3]).
-export([version_minor_equivalent/2]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
@@ -67,10 +66,11 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
--export([ensure_timer/4, stop_timer/2]).
+-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([moving_average/4]).
+-export([now_to_ms/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -81,7 +81,7 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1]).
+-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -91,9 +91,10 @@
:: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
-type(digraph_label() :: term()).
-type(graph_vertex_fun() ::
- fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
+ fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
- fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+ fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -183,8 +184,6 @@
-spec(format/2 :: (string(), [any()]) -> string()).
-spec(format_many/1 :: ([{string(), [any()]}]) -> string()).
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
--spec(with_local_io/1 :: (fun (() -> A)) -> A).
--spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
@@ -192,6 +191,7 @@
(rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(node_to_fake_pid/1 :: (atom()) -> pid()).
-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
-spec(version_compare/3 ::
(string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
@@ -209,7 +209,8 @@
[string()])
-> {'ok', {atom(), [{string(), string()}], [string()]}} |
'no_command').
--spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
+-spec(all_module_attributes/1 ::
+ (atom()) -> [{atom(), atom(), [term()]}]).
-spec(build_acyclic_graph/3 ::
(graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
-> rabbit_types:ok_or_error2(digraph(),
@@ -245,11 +246,16 @@
-> {any(), non_neg_integer()}).
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
+-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()).
+-spec(cancel_timer/1 :: (tref()) -> 'ok').
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined')
-> float()).
+-spec(now_to_ms/1 :: ({non_neg_integer(),
+ non_neg_integer(),
+ non_neg_integer()}) -> pos_integer()).
-endif.
%%----------------------------------------------------------------------------
@@ -635,23 +641,6 @@ format_stderr(Fmt, Args) ->
end,
ok.
-%% Execute Fun using the IO system of the local node (i.e. the node on
-%% which the code is executing).
-with_local_io(Fun) ->
- GL = group_leader(),
- group_leader(whereis(user), self()),
- try
- Fun()
- after
- group_leader(GL, self())
- end.
-
-%% Log an info message on the local node using the standard logger.
-%% Use this if rabbit isn't running and the call didn't originate on
-%% the local node (e.g. rabbitmqctl calls).
-local_info_msg(Format, Args) ->
- with_local_io(fun () -> error_logger:info_msg(Format, Args) end).
-
unfold(Fun, Init) ->
unfold(Fun, [], Init).
@@ -705,6 +694,10 @@ string_to_pid(Str) ->
throw(Err)
end.
+%% node(node_to_fake_pid(Node)) =:= Node.
+node_to_fake_pid(Node) ->
+ string_to_pid(format("<~s.0.0.0>", [Node])).
+
version_compare(A, B, lte) ->
case version_compare(A, B) of
eq -> true;
@@ -849,20 +842,20 @@ module_attributes(Module) ->
end.
all_module_attributes(Name) ->
- Modules =
+ Targets =
lists:usort(
lists:append(
- [Modules || {App, _, _} <- application:loaded_applications(),
- {ok, Modules} <- [application:get_key(App, modules)]])),
+ [[{App, Module} || Module <- Modules] ||
+ {App, _, _} <- application:loaded_applications(),
+ {ok, Modules} <- [application:get_key(App, modules)]])),
lists:foldl(
- fun (Module, Acc) ->
+ fun ({App, Module}, Acc) ->
case lists:append([Atts || {N, Atts} <- module_attributes(Module),
N =:= Name]) of
[] -> Acc;
- Atts -> [{Module, Atts} | Acc]
+ Atts -> [{App, Module, Atts} | Acc]
end
- end, [], Modules).
-
+ end, [], Targets).
build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
G = digraph:new([acyclic]),
@@ -870,13 +863,13 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
[case digraph:vertex(G, Vertex) of
false -> digraph:add_vertex(G, Vertex, Label);
_ -> ok = throw({graph_error, {vertex, duplicate, Vertex}})
- end || {Module, Atts} <- Graph,
- {Vertex, Label} <- VertexFun(Module, Atts)],
+ end || GraphElem <- Graph,
+ {Vertex, Label} <- VertexFun(GraphElem)],
[case digraph:add_edge(G, From, To) of
{error, E} -> throw({graph_error, {edge, E, From, To}});
_ -> ok
- end || {Module, Atts} <- Graph,
- {From, To} <- EdgeFun(Module, Atts)],
+ end || GraphElem <- Graph,
+ {From, To} <- EdgeFun(GraphElem)],
{ok, G}
catch {graph_error, Reason} ->
true = digraph:delete(G),
@@ -1017,7 +1010,9 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
-check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}};
+now_to_ms({Mega, Sec, Micro}) ->
+ (Mega * 1000000 * 1000000 + Sec * 1000000 + Micro) div 1000.
+
check_expiry(N) when N < 0 -> {error, {value_negative, N}};
check_expiry(_N) -> ok.
@@ -1045,7 +1040,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
ensure_timer(State, Idx, After, Msg) ->
case element(Idx, State) of
- undefined -> TRef = erlang:send_after(After, self(), Msg),
+ undefined -> TRef = send_after(After, self(), Msg),
setelement(Idx, State, TRef);
_ -> State
end.
@@ -1053,12 +1048,25 @@ ensure_timer(State, Idx, After, Msg) ->
stop_timer(State, Idx) ->
case element(Idx, State) of
undefined -> State;
- TRef -> case erlang:cancel_timer(TRef) of
- false -> State;
- _ -> setelement(Idx, State, undefined)
- end
+ TRef -> cancel_timer(TRef),
+ setelement(Idx, State, undefined)
end.
+%% timer:send_after/3 goes through a single timer process but allows
+%% long delays. erlang:send_after/3 does not have a bottleneck but
+%% only allows max 2^32-1 millis.
+-define(MAX_ERLANG_SEND_AFTER, 4294967295).
+send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER ->
+ {ok, Ref} = timer:send_after(Millis, Pid, Msg),
+ {timer, Ref};
+send_after(Millis, Pid, Msg) ->
+ {erlang, erlang:send_after(Millis, Pid, Msg)}.
+
+cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref),
+ ok;
+cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref),
+ ok.
+
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c6c2c8eb..d5dd9712 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -23,6 +23,7 @@
update_cluster_nodes/1,
change_cluster_node_type/1,
forget_cluster_node/2,
+ force_load_next_boot/0,
status/0,
is_clustered/0,
@@ -63,6 +64,7 @@
-spec(update_cluster_nodes/1 :: (node()) -> 'ok').
-spec(change_cluster_node_type/1 :: (node_type()) -> 'ok').
-spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok').
+-spec(force_load_next_boot/0 :: () -> 'ok').
%% Various queries to get the status of the db
-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
@@ -114,7 +116,7 @@ init_from_config() ->
true -> disc;
false -> ram
end},
- error_logger:warning_msg(
+ rabbit_log:warning(
"Converting legacy 'cluster_nodes' configuration~n ~w~n"
"to~n ~w.~n~n"
"Please update the configuration to the new format "
@@ -125,17 +127,22 @@ init_from_config() ->
{ok, Config} ->
Config
end,
- case find_good_node(nodes_excl_me(TryNodes)) of
+ case TryNodes of
+ [] -> init_db_and_upgrade([node()], disc, false);
+ _ -> auto_cluster(TryNodes, NodeType)
+ end.
+
+auto_cluster(TryNodes, NodeType) ->
+ case find_auto_cluster_node(nodes_excl_me(TryNodes)) of
{ok, Node} ->
- rabbit_log:info("Node '~p' selected for clustering from "
- "configuration~n", [Node]),
+ rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster0(Node),
init_db_and_upgrade(DiscNodes, NodeType, true),
rabbit_node_monitor:notify_joined_cluster();
none ->
- rabbit_log:warning("Could not find any suitable node amongst the "
- "ones provided in the configuration: ~p~n",
- [TryNodes]),
+ rabbit_log:warning(
+ "Could not find any node for auto-clustering from: ~p~n"
+ "Starting blank node...~n", [TryNodes]),
init_db_and_upgrade([node()], disc, false)
end.
@@ -170,14 +177,13 @@ join_cluster(DiscoveryNode, NodeType) ->
reset_gracefully(),
%% Join the cluster
- rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
- [ClusterNodes, NodeType]),
+ rabbit_log:info("Clustering with ~p as ~p node~n",
+ [ClusterNodes, NodeType]),
ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
rabbit_node_monitor:notify_joined_cluster(),
ok;
true ->
- rabbit_misc:local_info_msg("Already member of cluster: ~p~n",
- [ClusterNodes]),
+ rabbit_log:info("Already member of cluster: ~p~n", [ClusterNodes]),
{ok, already_member}
end.
@@ -186,12 +192,12 @@ join_cluster(DiscoveryNode, NodeType) ->
%% persisted messages
reset() ->
ensure_mnesia_not_running(),
- rabbit_misc:local_info_msg("Resetting Rabbit~n", []),
+ rabbit_log:info("Resetting Rabbit~n", []),
reset_gracefully().
force_reset() ->
ensure_mnesia_not_running(),
- rabbit_misc:local_info_msg("Resetting Rabbit forcefully~n", []),
+ rabbit_log:info("Resetting Rabbit forcefully~n", []),
wipe().
reset_gracefully() ->
@@ -247,8 +253,8 @@ update_cluster_nodes(DiscoveryNode) ->
%% nodes
mnesia:delete_schema([node()]),
rabbit_node_monitor:write_cluster_status(Status),
- rabbit_misc:local_info_msg("Updating cluster nodes from ~p~n",
- [DiscoveryNode]),
+ rabbit_log:info("Updating cluster nodes from ~p~n",
+ [DiscoveryNode]),
init_db_with_mnesia(AllNodes, node_type(), true, true);
false ->
e(inconsistent_cluster)
@@ -271,7 +277,7 @@ forget_cluster_node(Node, RemoveWhenOffline) ->
{true, false} -> remove_node_offline_node(Node);
{true, true} -> e(online_node_offline_flag);
{false, false} -> e(offline_node_no_offline_flag);
- {false, true} -> rabbit_misc:local_info_msg(
+ {false, true} -> rabbit_log:info(
"Removing node ~p from cluster~n", [Node]),
case remove_node_if_mnesia_running(Node) of
ok -> ok;
@@ -303,7 +309,6 @@ remove_node_offline_node(Node) ->
e(removing_node_from_offline_node)
end.
-
%%----------------------------------------------------------------------------
%% Queries
%%----------------------------------------------------------------------------
@@ -618,10 +623,10 @@ schema_ok_or_move() ->
{error, Reason} ->
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
- error_logger:warning_msg("schema integrity check failed: ~p~n"
- "moving database to backup location "
- "and recreating schema from scratch~n",
- [Reason]),
+ rabbit_log:warning("schema integrity check failed: ~p~n"
+ "moving database to backup location "
+ "and recreating schema from scratch~n",
+ [Reason]),
ok = move_db(),
ok = create_schema()
end.
@@ -647,8 +652,8 @@ move_db() ->
ok ->
%% NB: we cannot use rabbit_log here since it may not have
%% been started yet
- error_logger:warning_msg("moved database from ~s to ~s~n",
- [MnesiaDir, BackupDir]),
+ rabbit_log:warning("moved database from ~s to ~s~n",
+ [MnesiaDir, BackupDir]),
ok;
{error, Reason} -> throw({error, {cannot_backup_mnesia,
MnesiaDir, BackupDir, Reason}})
@@ -694,7 +699,7 @@ leave_cluster(Node) ->
end.
wait_for(Condition) ->
- error_logger:info_msg("Waiting for ~p...~n", [Condition]),
+ rabbit_log:info("Waiting for ~p...~n", [Condition]),
timer:sleep(1000).
start_mnesia(CheckConsistency) ->
@@ -787,17 +792,24 @@ is_virgin_node() ->
false
end.
-find_good_node([]) ->
+find_auto_cluster_node([]) ->
none;
-find_good_node([Node | Nodes]) ->
+find_auto_cluster_node([Node | Nodes]) ->
+ Fail = fun (Fmt, Args) ->
+ rabbit_log:warning(
+ "Could not auto-cluster with ~s: " ++ Fmt, [Node | Args]),
+ find_auto_cluster_node(Nodes)
+ end,
case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _Reason} -> find_good_node(Nodes);
+ {badrpc, _} = Reason -> Diag = rabbit_nodes:diagnostics([Node]),
+ Fail("~p~n~s~n", [Reason, Diag]);
%% old delegate hash check
- {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes);
- {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
- {error, _} -> find_good_node(Nodes);
- ok -> {ok, Node}
- end
+ {_OTP, Rabbit, _Hash, _} -> Fail("version ~s~n", [Rabbit]);
+ {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
+ {error, _} -> Fail("versions ~p~n",
+ [{OTP, Rabbit}]);
+ ok -> {ok, Node}
+ end
end.
is_only_clustered_disc_node() ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9082dbd3..4e92bf39 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -37,8 +37,6 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--define(SSL_TIMEOUT, 5). %% seconds
-
-define(FIRST_TEST_BIND_PORT, 10000).
%%----------------------------------------------------------------------------
@@ -168,9 +166,14 @@ ensure_ssl() ->
end
end.
+ssl_timeout() ->
+ {ok, Val} = application:get_env(rabbit, ssl_handshake_timeout),
+ Val.
+
ssl_transform_fun(SslOpts) ->
fun (Sock) ->
- case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
+ Timeout = ssl_timeout(),
+ case catch ssl:ssl_accept(Sock, SslOpts, Timeout) of
{ok, SslSock} ->
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
{error, timeout} ->
@@ -185,7 +188,7 @@ ssl_transform_fun(SslOpts) ->
%% form, according to the TLS spec). So we give
%% the ssl_connection a little bit of time to send
%% such alerts.
- timer:sleep(?SSL_TIMEOUT * 1000),
+ timer:sleep(Timeout),
{error, {ssl_upgrade_error, Reason}};
{'EXIT', Reason} ->
{error, {ssl_upgrade_failure, Reason}}
@@ -205,7 +208,7 @@ tcp_listener_addresses({Host, Port, Family0})
[{IPAddress, Port, Family} ||
{IPAddress, Family} <- getaddr(Host, Family0)];
tcp_listener_addresses({_Host, Port, _Family0}) ->
- error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]),
+ rabbit_log:error("invalid port ~p - not 0..65535~n", [Port]),
throw({error, {invalid_port, Port}}).
tcp_listener_addresses_auto(Port) ->
@@ -392,7 +395,7 @@ gethostaddr(Host, Family) ->
end.
host_lookup_error(Host, Reason) ->
- error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
+ rabbit_log:error("invalid host ~p - ~p~n", [Host, Reason]),
throw({error, {invalid_host, Host, Reason}}).
resolve_family({_,_,_,_}, auto) -> inet;
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index ca843e14..7eaeaf50 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -458,6 +458,7 @@ ensure_ping_timer(State) ->
State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes).
handle_live_rabbit(Node) ->
+ ok = rabbit_amqqueue:on_node_up(Node),
ok = rabbit_alarm:on_node_up(Node),
ok = rabbit_mnesia:on_node_up(Node).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index c0fb05e2..04db5d5d 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -18,6 +18,7 @@
-include("rabbit.hrl").
-export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]).
+-export([ensure/1]).
%%----------------------------------------------------------------------------
@@ -31,22 +32,54 @@
-spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]).
-spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) ->
[plugin_name()]).
-
+-spec(ensure/1 :: (string()) -> {'ok', [atom()], [atom()]} | {error, any()}).
-endif.
%%----------------------------------------------------------------------------
+ensure(FileJustChanged) ->
+ {ok, OurFile} = application:get_env(rabbit, enabled_plugins_file),
+ case OurFile of
+ FileJustChanged ->
+ {ok, Dir} = application:get_env(rabbit, plugins_dir),
+ Enabled = read_enabled(OurFile),
+ Wanted = dependencies(false, Enabled, list(Dir)),
+ prepare_plugins(Enabled),
+ Current = active(),
+ Start = Wanted -- Current,
+ Stop = Current -- Wanted,
+ rabbit:start_apps(Start),
+ %% We need sync_notify here since mgmt will attempt to look at all
+ %% the modules for the disabled plugins - if they are unloaded
+ %% that won't work.
+ ok = rabbit_event:sync_notify(plugins_changed, [{enabled, Start},
+ {disabled, Stop}]),
+ rabbit:stop_apps(Stop),
+ clean_plugins(Stop),
+ {ok, Start, Stop};
+ _ ->
+ {error, {enabled_plugins_mismatch, FileJustChanged, OurFile}}
+ end.
+
%% @doc Prepares the file system and installs all enabled plugins.
setup() ->
- {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(ExpandDir) of
+ ok -> ok;
+ {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
+ [ExpandDir, E1]}})
+ end,
+
{ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file),
- prepare_plugins(EnabledFile, PluginDir, ExpandDir).
+ Enabled = read_enabled(EnabledFile),
+ prepare_plugins(Enabled).
%% @doc Lists the plugins which are currently running.
active() ->
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
- InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ],
+ InstalledPlugins = plugin_names(list(ExpandDir)),
[App || {App, _, _} <- rabbit_misc:which_applications(),
lists:member(App, InstalledPlugins)].
@@ -64,10 +97,10 @@ list(PluginsDir) ->
[plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]),
case Problems of
[] -> ok;
- _ -> error_logger:warning_msg(
+ _ -> rabbit_log:warning(
"Problem reading some plugins: ~p~n", [Problems])
end,
- Plugins.
+ ensure_dependencies(Plugins).
%% @doc Read the list of enabled plugins from the supplied term file.
read_enabled(PluginsFile) ->
@@ -86,15 +119,10 @@ read_enabled(PluginsFile) ->
%% the resulting list, otherwise they're skipped.
dependencies(Reverse, Sources, AllPlugins) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
- lists:ukeysort(
- 1, [{Name, Deps} ||
- #plugin{name = Name,
- dependencies = Deps} <- AllPlugins] ++
- [{Dep, []} ||
- #plugin{dependencies = Deps} <- AllPlugins,
- Dep <- Deps])),
+ fun ({App, _Deps}) -> [{App, App}] end,
+ fun ({App, Deps}) -> [{App, Dep} || Dep <- Deps] end,
+ [{Name, Deps} || #plugin{name = Name,
+ dependencies = Deps} <- AllPlugins]),
Dests = case Reverse of
false -> digraph_utils:reachable(Sources, G);
true -> digraph_utils:reaching(Sources, G)
@@ -102,27 +130,44 @@ dependencies(Reverse, Sources, AllPlugins) ->
true = digraph:delete(G),
Dests.
+%% Make sure we don't list OTP apps in here, and also that we detect
+%% missing dependencies.
+ensure_dependencies(Plugins) ->
+ Names = plugin_names(Plugins),
+ NotThere = [Dep || #plugin{dependencies = Deps} <- Plugins,
+ Dep <- Deps,
+ not lists:member(Dep, Names)],
+ {OTP, Missing} = lists:partition(fun is_loadable/1, lists:usort(NotThere)),
+ case Missing of
+ [] -> ok;
+ _ -> Blame = [Name || #plugin{name = Name,
+ dependencies = Deps} <- Plugins,
+ lists:any(fun (Dep) ->
+ lists:member(Dep, Missing)
+ end, Deps)],
+ throw({error, {missing_dependencies, Missing, Blame}})
+ end,
+ [P#plugin{dependencies = Deps -- OTP}
+ || P = #plugin{dependencies = Deps} <- Plugins].
+
+is_loadable(App) ->
+ case application:load(App) of
+ {error, {already_loaded, _}} -> true;
+ ok -> application:unload(App),
+ true;
+ _ -> false
+ end.
+
%%----------------------------------------------------------------------------
-prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
+prepare_plugins(Enabled) ->
+ {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
AllPlugins = list(PluginsDistDir),
- Enabled = read_enabled(EnabledFile),
ToUnpack = dependencies(false, Enabled, AllPlugins),
ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins),
- case Enabled -- plugin_names(ToUnpackPlugins) of
- [] -> ok;
- Missing -> error_logger:warning_msg(
- "The following enabled plugins were not found: ~p~n",
- [Missing])
- end,
-
- %% Eliminate the contents of the destination directory
- case delete_recursively(ExpandDir) of
- ok -> ok;
- {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
- [ExpandDir, E1]}})
- end,
case filelib:ensure_dir(ExpandDir ++ "/") of
ok -> ok;
{error, E2} -> throw({error, {cannot_create_plugins_expand_dir,
@@ -134,6 +179,20 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
[prepare_dir_plugin(PluginAppDescPath) ||
PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")].
+clean_plugins(Plugins) ->
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ [clean_plugin(Plugin, ExpandDir) || Plugin <- Plugins].
+
+clean_plugin(Plugin, ExpandDir) ->
+ {ok, Mods} = application:get_key(Plugin, modules),
+ application:unload(Plugin),
+ [begin
+ code:soft_purge(Mod),
+ code:delete(Mod),
+ false = code:is_loaded(Mod)
+ end || Mod <- Mods],
+ delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])).
+
prepare_dir_plugin(PluginAppDescPath) ->
code:add_path(filename:dirname(PluginAppDescPath)),
list_to_atom(filename:basename(PluginAppDescPath, ".app")).
@@ -172,8 +231,7 @@ plugin_info(Base, {app, App0}) ->
mkplugin(Name, Props, Type, Location) ->
Version = proplists:get_value(vsn, Props, "0"),
Description = proplists:get_value(description, Props, ""),
- Dependencies =
- filter_applications(proplists:get_value(applications, Props, [])),
+ Dependencies = proplists:get_value(applications, Props, []),
#plugin{name = Name, version = Version, description = Description,
dependencies = Dependencies, location = Location, type = Type}.
@@ -206,18 +264,6 @@ parse_binary(Bin) ->
Err -> {error, {invalid_app, Err}}
end.
-filter_applications(Applications) ->
- [Application || Application <- Applications,
- not is_available_app(Application)].
-
-is_available_app(Application) ->
- case application:load(Application) of
- {error, {already_loaded, _}} -> true;
- ok -> application:unload(Application),
- true;
- _ -> false
- end.
-
plugin_names(Plugins) ->
[Name || #plugin{name = Name} <- Plugins].
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 89e16f14..278fcf98 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -18,23 +18,34 @@
-include("rabbit.hrl").
-export([start/0, stop/0]).
+-export([action/6]).
+-define(NODE_OPT, "-n").
-define(VERBOSE_OPT, "-v").
-define(MINIMAL_OPT, "-m").
-define(ENABLED_OPT, "-E").
-define(ENABLED_ALL_OPT, "-e").
+-define(OFFLINE_OPT, "--offline").
+-define(ONLINE_OPT, "--online").
+-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
+-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
+-define(ONLINE_DEF, {?ONLINE_OPT, flag}).
--define(GLOBAL_DEFS, []).
+-define(RPC_TIMEOUT, infinity).
+
+-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]).
-define(COMMANDS,
[{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
- enable,
- disable]).
+ {enable, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {disable, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {set, [?OFFLINE_DEF, ?ONLINE_DEF]},
+ {sync, []}]).
%%----------------------------------------------------------------------------
@@ -51,11 +62,10 @@
start() ->
{ok, [[PluginsFile|_]|_]} =
init:get_argument(enabled_plugins_file),
+ {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
{Command, Opts, Args} =
- case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS,
- init:get_plain_arguments())
- of
+ case parse_arguments(init:get_plain_arguments(), NodeStr) of
{ok, Res} -> Res;
no_command -> print_error("could not recognise command", []),
usage()
@@ -67,7 +77,8 @@ start() ->
[string:join([atom_to_list(Command) | Args], " ")])
end,
- case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
+ Node = proplists:get_value(?NODE_OPT, Opts),
+ case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of
ok ->
rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
@@ -76,12 +87,23 @@ start() ->
{'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
PrintInvalidCommandError(),
usage();
+ {error, {missing_dependencies, Missing, Blame}} ->
+ print_error("dependent plugins ~p not found; used by ~p.",
+ [Missing, Blame]),
+ rabbit_misc:quit(2);
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
{error_string, Reason} ->
print_error("~s", [Reason]),
rabbit_misc:quit(2);
+ {badrpc, {'EXIT', Reason}} ->
+ print_error("~p", [Reason]),
+ rabbit_misc:quit(2);
+ {badrpc, Reason} ->
+ print_error("unable to connect to node ~w: ~w", [Node, Reason]),
+ print_badrpc_diagnostics([Node]),
+ rabbit_misc:quit(2);
Other ->
print_error("~p", [Other]),
rabbit_misc:quit(2)
@@ -92,50 +114,80 @@ stop() ->
%%----------------------------------------------------------------------------
-action(list, [], Opts, PluginsFile, PluginsDir) ->
- action(list, [".*"], Opts, PluginsFile, PluginsDir);
-action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
- format_plugins(Pat, Opts, PluginsFile, PluginsDir);
+parse_arguments(CmdLine, NodeStr) ->
+ case rabbit_misc:parse_arguments(
+ ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of
+ {ok, {Cmd, Opts0, Args}} ->
+ Opts = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts0],
+ {ok, {Cmd, Opts, Args}};
+ E ->
+ E
+ end.
+
+action(list, Node, [], Opts, PluginsFile, PluginsDir) ->
+ action(list, Node, [".*"], Opts, PluginsFile, PluginsDir);
+action(list, Node, [Pat], Opts, PluginsFile, PluginsDir) ->
+ format_plugins(Node, Pat, Opts, PluginsFile, PluginsDir);
-action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
+action(enable, Node, ToEnable0, Opts, PluginsFile, PluginsDir) ->
case ToEnable0 of
[] -> throw({error_string, "Not enough arguments for 'enable'"});
_ -> ok
end,
AllPlugins = rabbit_plugins:list(PluginsDir),
Enabled = rabbit_plugins:read_enabled(PluginsFile),
- ImplicitlyEnabled = rabbit_plugins:dependencies(false,
- Enabled, AllPlugins),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
Missing = ToEnable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string, fmt_missing(Missing)})
+ end,
NewEnabled = lists:usort(Enabled ++ ToEnable),
NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
NewEnabled, AllPlugins),
- MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing,
- case {Missing, MissingDeps} of
- {[], []} -> ok;
- {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)});
- {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)});
- {_, _} -> throw({error_string,
- fmt_missing("plugins", Missing) ++
- fmt_missing("dependencies", MissingDeps)})
- end,
write_enabled_plugins(PluginsFile, NewEnabled),
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
- NewImplicitlyEnabled -- ImplicitlyEnabled),
- report_change()
- end;
+ NewImplicitlyEnabled -- ImplicitlyEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
+
+action(set, Node, ToSet0, Opts, PluginsFile, PluginsDir) ->
+ ToSet = [list_to_atom(Name) || Name <- ToSet0],
+ AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ Missing = ToSet -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string, fmt_missing(Missing)})
+ end,
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ ToSet, AllPlugins),
+ write_enabled_plugins(PluginsFile, ToSet),
+ case NewImplicitlyEnabled of
+ [] -> io:format("All plugins are now disabled.~n");
+ _ -> print_list("The following plugins are now enabled:",
+ NewImplicitlyEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
-action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
+action(disable, Node, ToDisable0, Opts, PluginsFile, PluginsDir) ->
case ToDisable0 of
[] -> throw({error_string, "Not enough arguments for 'disable'"});
_ -> ok
end,
- ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
- Enabled = rabbit_plugins:read_enabled(PluginsFile),
AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
Missing = ToDisable -- plugin_names(AllPlugins),
case Missing of
[] -> ok;
@@ -144,30 +196,35 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
end,
ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins),
NewEnabled = Enabled -- ToDisableDeps,
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ NewEnabled, AllPlugins),
case length(Enabled) =:= length(NewEnabled) of
true -> io:format("Plugin configuration unchanged.~n");
- false -> ImplicitlyEnabled =
- rabbit_plugins:dependencies(false, Enabled, AllPlugins),
- NewImplicitlyEnabled =
- rabbit_plugins:dependencies(false,
- NewEnabled, AllPlugins),
- print_list("The following plugins have been disabled:",
+ false -> print_list("The following plugins have been disabled:",
ImplicitlyEnabled -- NewImplicitlyEnabled),
- write_enabled_plugins(PluginsFile, NewEnabled),
- report_change()
- end.
+ write_enabled_plugins(PluginsFile, NewEnabled)
+ end,
+ action_change(
+ Opts, Node, ImplicitlyEnabled, NewImplicitlyEnabled, PluginsFile);
+
+action(sync, Node, [], _Opts, PluginsFile, _PluginsDir) ->
+ sync(Node, true, PluginsFile).
%%----------------------------------------------------------------------------
-print_error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
+
+print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
+
+print_badrpc_diagnostics(Nodes) ->
+ fmt_stderr(rabbit_nodes:diagnostics(Nodes), []).
usage() ->
io:format("~s", [rabbit_plugins_usage:usage()]),
rabbit_misc:quit(1).
%% Pretty print a list of plugins.
-format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
+format_plugins(Node, Pattern, Opts, PluginsFile, PluginsDir) ->
Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
Format = case {Verbose, Minimal} of
@@ -182,41 +239,52 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
AvailablePlugins = rabbit_plugins:list(PluginsDir),
EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile),
- EnabledImplicitly =
- rabbit_plugins:dependencies(false, EnabledExplicitly,
- AvailablePlugins) -- EnabledExplicitly,
- Missing = [#plugin{name = Name, dependencies = []} ||
- Name <- ((EnabledExplicitly ++ EnabledImplicitly) --
- plugin_names(AvailablePlugins))],
+ AllEnabled = rabbit_plugins:dependencies(false, EnabledExplicitly,
+ AvailablePlugins),
+ EnabledImplicitly = AllEnabled -- EnabledExplicitly,
+ {StatusMsg, Running} =
+ case rpc:call(Node, rabbit_plugins, active, [], ?RPC_TIMEOUT) of
+ {badrpc, _} -> {"[failed to contact ~s - status not shown]", []};
+ Active -> {"* = running on ~s", Active}
+ end,
{ok, RE} = re:compile(Pattern),
Plugins = [ Plugin ||
- Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing,
+ Plugin = #plugin{name = Name} <- AvailablePlugins,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
- if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
- OnlyEnabledAll -> (lists:member(Name,
- EnabledExplicitly) or
- lists:member(Name, EnabledImplicitly));
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ OnlyEnabledAll -> lists:member(Name, EnabledExplicitly) or
+ lists:member(Name,EnabledImplicitly);
true -> true
end],
Plugins1 = usort_plugins(Plugins),
MaxWidth = lists:max([length(atom_to_list(Name)) ||
#plugin{name = Name} <- Plugins1] ++ [0]),
- [format_plugin(P, EnabledExplicitly, EnabledImplicitly,
- plugin_names(Missing), Format, MaxWidth) || P <- Plugins1],
+ case Format of
+ minimal -> ok;
+ _ -> io:format(" Configured: E = explicitly enabled; "
+ "e = implicitly enabled~n"
+ " | Status: ~s~n"
+ " |/~n", [rabbit_misc:format(StatusMsg, [Node])])
+ end,
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Running,
+ Format, MaxWidth) || P <- Plugins1],
ok.
format_plugin(#plugin{name = Name, version = Version,
description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Missing,
- Format, MaxWidth) ->
- Glyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly),
- lists:member(Name, Missing)} of
- {true, false, false} -> "[E]";
- {false, true, false} -> "[e]";
- {_, _, true} -> "[!]";
- _ -> "[ ]"
- end,
+ EnabledExplicitly, EnabledImplicitly, Running, Format,
+ MaxWidth) ->
+ EnabledGlyph = case {lists:member(Name, EnabledExplicitly),
+ lists:member(Name, EnabledImplicitly)} of
+ {true, false} -> "E";
+ {false, true} -> "e";
+ _ -> " "
+ end,
+ RunningGlyph = case lists:member(Name, Running) of
+ true -> "*";
+ false -> " "
+ end,
+ Glyph = rabbit_misc:format("[~s~s]", [EnabledGlyph, RunningGlyph]),
Opt = fun (_F, A, A) -> ok;
( F, A, _) -> io:format(F, [A])
end,
@@ -227,9 +295,9 @@ format_plugin(#plugin{name = Name, version = Version,
Opt("~s", Version, undefined),
io:format("~n");
verbose -> io:format("~s ~w~n", [Glyph, Name]),
- Opt(" Version: \t~s~n", Version, undefined),
- Opt(" Dependencies:\t~p~n", Deps, []),
- Opt(" Description: \t~s~n", Description, undefined),
+ Opt(" Version: \t~s~n", Version, undefined),
+ Opt(" Dependencies:\t~p~n", Deps, []),
+ Opt(" Description: \t~s~n", Description, undefined),
io:format("~n")
end.
@@ -240,8 +308,8 @@ fmt_list(Header, Plugins) ->
lists:flatten(
[Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
-fmt_missing(Desc, Missing) ->
- fmt_list("The following " ++ Desc ++ " could not be found:", Missing).
+fmt_missing(Missing) ->
+ fmt_list("The following plugins could not be found:", Missing).
usort_plugins(Plugins) ->
lists:usort(fun plugins_cmp/2, Plugins).
@@ -262,6 +330,51 @@ write_enabled_plugins(PluginsFile, Plugins) ->
PluginsFile, Reason}})
end.
-report_change() ->
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n").
+action_change(Opts, Node, Old, New, PluginsFile) ->
+ action_change0(proplists:get_bool(?OFFLINE_OPT, Opts),
+ proplists:get_bool(?ONLINE_OPT, Opts),
+ Node, Old, New, PluginsFile).
+
+action_change0(true, _Online, _Node, Same, Same, _PluginsFile) ->
+ %% Definitely nothing to do
+ ok;
+action_change0(true, _Online, _Node, _Old, _New, _PluginsFile) ->
+ io:format("Offline change; changes will take effect at broker restart.~n");
+action_change0(false, Online, Node, _Old, _New, PluginsFile) ->
+ sync(Node, Online, PluginsFile).
+
+sync(Node, ForceOnline, PluginsFile) ->
+ rpc_call(Node, ForceOnline, rabbit_plugins, ensure, [PluginsFile]).
+
+rpc_call(Node, Online, Mod, Fun, Args) ->
+ io:format("~nApplying plugin configuration to ~s...", [Node]),
+ case rpc:call(Node, Mod, Fun, Args) of
+ {ok, [], []} ->
+ io:format(" nothing to do.~n", []);
+ {ok, Start, []} ->
+ io:format(" started ~b plugin~s.~n", [length(Start), plur(Start)]);
+ {ok, [], Stop} ->
+ io:format(" stopped ~b plugin~s.~n", [length(Stop), plur(Stop)]);
+ {ok, Start, Stop} ->
+ io:format(" stopped ~b plugin~s and started ~b plugin~s.~n",
+ [length(Stop), plur(Stop), length(Start), plur(Start)]);
+ {badrpc, nodedown} = Error ->
+ io:format(" failed.~n", []),
+ case Online of
+ true -> Error;
+ false -> io:format(
+ " * Could not contact node ~s.~n"
+ " Changes will take effect at broker restart.~n"
+ " * Options: --online - fail if broker cannot be "
+ "contacted.~n"
+ " --offline - do not try to contact "
+ "broker.~n",
+ [Node])
+ end;
+ Error ->
+ io:format(" failed.~n", []),
+ Error
+ end.
+
+plur([_]) -> "";
+plur(_) -> "s".
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index fe2b766f..cc88765f 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -34,7 +34,8 @@ register() ->
{policy_validator, <<"dead-letter-routing-key">>},
{policy_validator, <<"message-ttl">>},
{policy_validator, <<"expires">>},
- {policy_validator, <<"max-length">>}]],
+ {policy_validator, <<"max-length">>},
+ {policy_validator, <<"max-length-bytes">>}]],
ok.
validate_policy(Terms) ->
@@ -61,13 +62,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value) ->
{error, "~p is not a valid dead letter routing key", [Value]};
validate_policy0(<<"message-ttl">>, Value)
- when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"message-ttl">>, Value) ->
{error, "~p is not a valid message TTL", [Value]};
validate_policy0(<<"expires">>, Value)
- when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 1 ->
ok;
validate_policy0(<<"expires">>, Value) ->
{error, "~p is not a valid queue expiry", [Value]};
@@ -76,6 +77,10 @@ validate_policy0(<<"max-length">>, Value)
when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"max-length">>, Value) ->
- {error, "~p is not a valid maximum length", [Value]}.
-
+ {error, "~p is not a valid maximum length", [Value]};
+validate_policy0(<<"max-length-bytes">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"max-length-bytes">>, Value) ->
+ {error, "~p is not a valid maximum length in bytes", [Value]}.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 0a69fb32..f5d03360 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -46,17 +46,11 @@ name(#exchange{policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set(
- Q#amqqueue{policy = set0(Name)});
-set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
- X#exchange{policy = set0(Name)}).
+set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
-set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)};
-set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set(
- X#exchange{policy = match(Name, Ps)}).
-
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
@@ -104,12 +98,18 @@ recover0() ->
Policies = list(),
[rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:write(rabbit_durable_exchange, set(X, Policies), write)
- end) || X <- Xs],
+ mnesia:write(
+ rabbit_durable_exchange,
+ rabbit_exchange_decorator:set(
+ X#exchange{policy = match(Name, Policies)}), write)
+ end) || X = #exchange{name = Name} <- Xs],
[rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:write(rabbit_durable_queue, set(Q, Policies), write)
- end) || Q <- Qs],
+ mnesia:write(
+ rabbit_durable_queue,
+ rabbit_queue_decorator:set(
+ Q#amqqueue{policy = match(Name, Policies)}), write)
+ end) || Q = #amqqueue{name = Name} <- Qs],
ok.
invalid_file() ->
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 6205e2dc..adfe0c7f 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -2,7 +2,7 @@
-include("rabbit.hrl").
--export([select/1, set/1]).
+-export([select/1, set/1, register/2, unregister/1]).
%%----------------------------------------------------------------------------
@@ -41,3 +41,24 @@ select(Modules) ->
set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
+
+register(TypeName, ModuleName) ->
+ rabbit_registry:register(queue_decorator, TypeName, ModuleName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+unregister(TypeName) ->
+ rabbit_registry:unregister(queue_decorator, TypeName),
+ [maybe_recover(Q) || Q <- rabbit_amqqueue:list()],
+ ok.
+
+maybe_recover(Q = #amqqueue{name = Name,
+ decorators = Decs}) ->
+ #amqqueue{decorators = Decs1} = set(Q),
+ Old = lists:sort(select(Decs)),
+ New = lists:sort(select(Decs1)),
+ case New of
+ Old -> ok;
+ _ -> [M:startup(Q) || M <- New -- Old],
+ rabbit_amqqueue:update_decorators(Name)
+ end.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 56c19d3f..0f572866 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -21,7 +21,7 @@
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
--export([add_queue_ttl/0, avoid_zeroes/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -140,8 +140,11 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
-%% 16 bytes for md5sum + 8 for expiry
--define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES)).
+-define(SIZE_BYTES, 4).
+-define(SIZE_BITS, (?SIZE_BYTES * 8)).
+
+%% 16 bytes for md5sum + 8 for expiry + 4 for size
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
%% + 2 for seq, bits and prefix
-define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
@@ -168,8 +171,9 @@
%%----------------------------------------------------------------------------
--rabbit_upgrade({add_queue_ttl, local, []}).
--rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
+-rabbit_upgrade({add_queue_ttl, local, []}).
+-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
+-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
-ifdef(use_specs).
@@ -199,7 +203,8 @@
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
contains_predicate(), on_sync_fun()) ->
- {'undefined' | non_neg_integer(), qistate()}).
+ {'undefined' | non_neg_integer(),
+ 'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/5 :: (rabbit_types:msg_id(), seq_id(),
@@ -415,7 +420,7 @@ init_clean(RecoveredCounts, State) ->
end, Segments, RecoveredCounts),
%% the counts above include transient messages, which would be the
%% wrong thing to return
- {undefined, State1 # qistate { segments = Segments1 }}.
+ {undefined, undefined, State1 # qistate { segments = Segments1 }}.
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Recover the journal completely. This will also load segments
@@ -424,7 +429,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% and the journal.
State1 = #qistate { dir = Dir, segments = Segments } =
recover_journal(State),
- {Segments1, Count, DirtyCount} =
+ {Segments1, Count, Bytes, DirtyCount} =
%% Load each segment in turn and filter out messages that are
%% not in the msg_store, by adding acks to the journal. These
%% acks only go to the RAM journal as it doesn't matter if we
@@ -433,16 +438,18 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% dirty count here, so we can call maybe_flush_journal below
%% and avoid unnecessary file system operations.
lists:foldl(
- fun (Seg, {Segments2, CountAcc, DirtyCount}) ->
- {Segment = #segment { unacked = UnackedCount }, Dirty} =
+ fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) ->
+ {{Segment = #segment { unacked = UnackedCount }, Dirty},
+ UnackedBytes} =
recover_segment(ContainsCheckFun, CleanShutdown,
segment_find_or_new(Seg, Dir, Segments2)),
{segment_store(Segment, Segments2),
- CountAcc + UnackedCount, DirtyCount + Dirty}
- end, {Segments, 0, 0}, all_segment_nums(State1)),
+ CountAcc + UnackedCount,
+ BytesAcc + UnackedBytes, DirtyCount + Dirty}
+ end, {Segments, 0, 0, 0}, all_segment_nums(State1)),
State2 = maybe_flush_journal(State1 #qistate { segments = Segments1,
dirty_count = DirtyCount }),
- {Count, State2}.
+ {Count, Bytes, State2}.
terminate(State = #qistate { journal_handle = JournalHdl,
segments = Segments }) ->
@@ -464,12 +471,16 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{MsgId, _MsgProps, _IsPersistent}, Del, no_ack},
- SegmentAndDirtyCount) ->
- recover_message(ContainsCheckFun(MsgId), CleanShutdown,
- Del, RelSeq, SegmentAndDirtyCount)
+ fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack},
+ {SegmentAndDirtyCount, Bytes}) ->
+ {recover_message(ContainsCheckFun(MsgId), CleanShutdown,
+ Del, RelSeq, SegmentAndDirtyCount),
+ Bytes + case IsPersistent of
+ true -> MsgProps#message_properties.size;
+ false -> 0
+ end}
end,
- {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0},
+ {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0},
SegEntries1).
recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount) ->
@@ -549,13 +560,15 @@ scan_segments(Fun, Acc, State) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties { expiry = Expiry }) ->
- [MsgId, expiry_to_binary(Expiry)].
+create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
+ size = Size }) ->
+ [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
-parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) ->
+parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Size:?SIZE_BITS>>) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
@@ -563,7 +576,8 @@ parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>) ->
?NO_EXPIRY -> undefined;
X -> X
end,
- {MsgId, #message_properties { expiry = Exp }}.
+ {MsgId, #message_properties { expiry = Exp,
+ size = Size }}.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -1064,6 +1078,42 @@ avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
avoid_zeroes_segment(_) ->
stop.
+%% At upgrade time we just define every message's size as 0 - that
+%% will save us a load of faff with the message store, and means we
+%% can actually use the clean recovery terms in VQ. It does mean we
+%% don't count message bodies from before the migration, but we can
+%% live with that.
+store_msg_size() ->
+ foreach_queue_index({fun store_msg_size_journal/1,
+ fun store_msg_size_segment/1}).
+
+store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Rest/binary>>) ->
+ {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
+store_msg_size_journal(_) ->
+ stop.
+
+store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
+store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+store_msg_size_segment(_) ->
+ stop.
+
+
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4d4ce4a2..b2930f88 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -27,7 +27,6 @@
-export([conserve_resources/3, server_properties/1]).
--define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
@@ -43,7 +42,7 @@
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, connected_at}).
-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}).
@@ -55,7 +54,7 @@
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, channel_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties, connected_at]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -189,10 +188,10 @@ server_capabilities(_) ->
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
socket_error(Reason) when is_atom(Reason) ->
- log(error, "error on AMQP connection ~p: ~s~n",
+ log(error, "Error on AMQP connection ~p: ~s~n",
[self(), rabbit_misc:format_inet_error(Reason)]);
socket_error(Reason) ->
- log(error, "error on AMQP connection ~p:~n~p~n", [self(), Reason]).
+ log(error, "Error on AMQP connection ~p:~n~p~n", [self(), Reason]).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -216,8 +215,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
exit(normal)
end,
log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
+ {ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
ClientSock = socket_op(Sock, SockTransform),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
+ erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
?store_proc_name(list_to_binary(Name)),
@@ -231,13 +231,14 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
peer_port = PeerPort,
protocol = none,
user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
+ timeout_sec = (HandshakeTimeout / 1000),
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
capabilities = [],
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ connected_at = rabbit_misc:now_to_ms(os:timestamp())},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
@@ -410,7 +411,7 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
rabbit_event:notify(
connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)], Ref),
- State;
+ rabbit_event:init_stats_timer(State, #v1.stats_timer);
handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
State;
@@ -548,21 +549,27 @@ wait_for_channel_termination(0, TimerRef, State) ->
end;
_ -> State
end;
-wait_for_channel_termination(N, TimerRef, State) ->
+wait_for_channel_termination(N, TimerRef,
+ State = #v1{connection_state = CS,
+ connection = #connection{
+ name = ConnName,
+ user = User,
+ vhost = VHost}}) ->
receive
{'DOWN', _MRef, process, ChPid, Reason} ->
{Channel, State1} = channel_cleanup(ChPid, State),
case {Channel, termination_kind(Reason)} of
- {undefined, _} -> exit({abnormal_dependent_exit,
- ChPid, Reason});
- {_, controlled} -> wait_for_channel_termination(
- N-1, TimerRef, State1);
- {_, uncontrolled} -> log(error,
- "AMQP connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
- wait_for_channel_termination(
- N-1, TimerRef, State1)
+ {undefined, _} ->
+ exit({abnormal_dependent_exit, ChPid, Reason});
+ {_, controlled} ->
+ wait_for_channel_termination(N-1, TimerRef, State1);
+ {_, uncontrolled} ->
+ log(error, "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:"
+ "error while terminating:~n~p~n",
+ [self(), ConnName, VHost, User#user.username,
+ CS, Channel, Reason]),
+ wait_for_channel_termination(N-1, TimerRef, State1)
end;
cancel_wait ->
exit(channel_termination_timeout)
@@ -581,16 +588,24 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+log_hard_error(#v1{connection_state = CS,
+ connection = #connection{
+ name = ConnName,
+ user = User,
+ vhost = VHost}}, Channel, Reason) ->
+ log(error,
+ "Error on AMQP connection ~p (~s, vhost: '~s',"
+ " user: '~s', state: ~p), channel ~p:~n~p~n",
+ [self(), ConnName, VHost, User#user.username, CS, Channel, Reason]).
+
handle_exception(State = #v1{connection_state = closed}, Channel, Reason) ->
- log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), closed, Channel, Reason]),
+ log_hard_error(State, Channel, Reason),
State;
handle_exception(State = #v1{connection = #connection{protocol = Protocol},
connection_state = CS},
Channel, Reason)
when ?IS_RUNNING(State) orelse CS =:= closing ->
- log(error, "AMQP connection ~p (~p), channel ~p - error:~n~p~n",
- [self(), CS, Channel, Reason]),
+ log_hard_error(State, Channel, Reason),
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
State1 = close_connection(terminate_channels(State)),
@@ -1130,6 +1145,7 @@ ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
+ic(connected_at, #connection{connected_at = T}) -> T;
ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index e23e3e6b..9f837cce 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -23,11 +23,14 @@
-export([start/0, stop/0, store/2, read/1, clear/0]).
--export([upgrade_recovery_terms/0, start_link/0]).
+-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-export([upgrade_recovery_terms/0, persistent_bytes/0]).
+
-rabbit_upgrade({upgrade_recovery_terms, local, []}).
+-rabbit_upgrade({persistent_bytes, local, [upgrade_recovery_terms]}).
%%----------------------------------------------------------------------------
@@ -61,6 +64,8 @@ clear() ->
ok = dets:delete_all_objects(?MODULE),
flush().
+start_link() -> gen_server:start_link(?MODULE, [], []).
+
%%----------------------------------------------------------------------------
upgrade_recovery_terms() ->
@@ -84,7 +89,20 @@ upgrade_recovery_terms() ->
close_table()
end.
-start_link() -> gen_server:start_link(?MODULE, [], []).
+persistent_bytes() -> dets_upgrade(fun persistent_bytes/1).
+persistent_bytes(Props) -> Props ++ [{persistent_bytes, 0}].
+
+dets_upgrade(Fun)->
+ open_table(),
+ try
+ ok = dets:foldl(fun ({DirBaseName, Terms}, Acc) ->
+ store(DirBaseName, Fun(Terms)),
+ Acc
+ end, ok, ?MODULE),
+ ok
+ after
+ close_table()
+ end.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index da75932d..fe2c3b58 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -70,7 +70,13 @@ wait_for_replicated() ->
not lists:member({local_content, true}, TabDef)]).
wait(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
+ %% We might be in ctl here for offline ops, in which case we can't
+ %% get_env() for the rabbit app.
+ Timeout = case application:get_env(rabbit, mnesia_table_loading_timeout) of
+ {ok, T} -> T;
+ undefined -> 30000
+ end,
+ case mnesia:wait_for_tables(TableNames, Timeout) of
ok ->
ok;
{timeout, BadTabs} ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index da6938bd..a186fb7a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -37,7 +37,7 @@ all_tests() ->
ok = supervisor2_tests:test_all(),
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
- application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
+ application:set_env(rabbit, file_handles_high_watermark, 10),
ok = file_handle_cache:set_limit(10),
passed = test_version_equivalance(),
passed = test_file_handle_cache(),
@@ -1870,22 +1870,20 @@ test_backing_queue() ->
{ok, rabbit_variable_queue} ->
{ok, FileSizeLimit} =
application:get_env(rabbit, msg_store_file_size_limit),
- application:set_env(rabbit, msg_store_file_size_limit, 512,
- infinity),
+ application:set_env(rabbit, msg_store_file_size_limit, 512),
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
- application:set_env(rabbit, queue_index_max_journal_entries, 128,
- infinity),
+ application:set_env(rabbit, queue_index_max_journal_entries, 128),
passed = test_msg_store(),
application:set_env(rabbit, msg_store_file_size_limit,
- FileSizeLimit, infinity),
+ FileSizeLimit),
passed = test_queue_index(),
passed = test_queue_index_props(),
passed = test_variable_queue(),
passed = test_variable_queue_delete_msg_store_files_callback(),
passed = test_queue_recover(),
application:set_env(rabbit, queue_index_max_journal_entries,
- MaxJournal, infinity),
+ MaxJournal),
%% We will have restarted the message store, and thus changed
%% the order of the children of rabbit_sup. This will cause
%% problems if there are subsequent failures - see bug 24262.
@@ -2210,13 +2208,13 @@ restart_test_queue(Qi) ->
empty_test_queue() ->
ok = rabbit_variable_queue:stop(),
{ok, _} = rabbit_variable_queue:start([]),
- {0, Qi} = init_test_queue(),
+ {0, 0, Qi} = init_test_queue(),
_ = rabbit_queue_index:delete_and_terminate(Qi),
ok.
with_empty_test_queue(Fun) ->
ok = empty_test_queue(),
- {0, Qi} = init_test_queue(),
+ {0, 0, Qi} = init_test_queue(),
rabbit_queue_index:delete_and_terminate(Fun(Qi)).
restart_app() ->
@@ -2235,7 +2233,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
MsgId = rabbit_guid:gen(),
QiM = rabbit_queue_index:publish(
- MsgId, SeqId, #message_properties{}, Persistent, QiN),
+ MsgId, SeqId, #message_properties{size = 10},
+ Persistent, QiN),
ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
{QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
end, {Qi, []}, SeqIds),
@@ -2257,7 +2256,7 @@ test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
MsgId = rabbit_guid:gen(),
- Props = #message_properties{expiry=12345},
+ Props = #message_properties{expiry=12345, size = 10},
Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
{[{MsgId, 1, Props, _, _}], Qi2} =
rabbit_queue_index:read(1, 2, Qi1),
@@ -2287,7 +2286,7 @@ test_queue_index() ->
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsMsgIdsA)),
%% should get length back as 0, as all the msgs were transient
- {0, Qi6} = restart_test_queue(Qi4),
+ {0, 0, Qi6} = restart_test_queue(Qi4),
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
@@ -2296,7 +2295,8 @@ test_queue_index() ->
lists:reverse(SeqIdsMsgIdsB)),
%% should get length back as MostOfASegment
LenB = length(SeqIdsB),
- {LenB, Qi12} = restart_test_queue(Qi10),
+ BytesB = LenB * 10,
+ {LenB, BytesB, Qi12} = restart_test_queue(Qi10),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
{ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
@@ -2308,7 +2308,7 @@ test_queue_index() ->
{0, 0, Qi18} = rabbit_queue_index:bounds(Qi17),
%% should get length back as 0 because all persistent
%% msgs have been acked
- {0, Qi19} = restart_test_queue(Qi18),
+ {0, 0, Qi19} = restart_test_queue(Qi18),
Qi19
end),
@@ -2380,11 +2380,11 @@ test_queue_index() ->
true, Qi0),
Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1),
Qi3 = rabbit_queue_index:ack([0], Qi2),
- {5, Qi4} = restart_test_queue(Qi3),
+ {5, 50, Qi4} = restart_test_queue(Qi3),
{Qi5, _SeqIdsMsgIdsF} = queue_index_publish([3,6,8], true, Qi4),
Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5),
Qi7 = rabbit_queue_index:ack([1,2,3], Qi6),
- {5, Qi8} = restart_test_queue(Qi7),
+ {5, 50, Qi8} = restart_test_queue(Qi7),
Qi8
end),
@@ -2419,7 +2419,8 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
false -> 1
end},
PayloadFun(N)),
- PropFun(N, #message_properties{}), false, self(), VQN)
+ PropFun(N, #message_properties{size = 10}),
+ false, self(), VQN)
end, VQ, lists:seq(Start, Start + Count - 1))).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -2454,7 +2455,7 @@ with_fresh_variable_queue(Fun) ->
spawn_link(fun() ->
ok = empty_test_queue(),
VQ = variable_queue_init(test_amqqueue(true), false),
- S0 = rabbit_variable_queue:status(VQ),
+ S0 = variable_queue_status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta,
{delta, undefined, 0, undefined}},
@@ -2579,7 +2580,7 @@ variable_queue_with_holes(VQ0) ->
{delta, _, 0, _} -> true;
0 -> true;
_ -> false
- end || {K, V} <- rabbit_variable_queue:status(VQ8),
+ end || {K, V} <- variable_queue_status(VQ8),
lists:member(K, [q1, delta, q3])],
Depth = Count + Interval,
Depth = rabbit_variable_queue:depth(VQ8),
@@ -2649,17 +2650,20 @@ test_variable_queue_ack_limiting(VQ0) ->
%% fetch half the messages
{VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3),
- VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2},
- {ram_ack_count, Len div 2},
- {ram_msg_count, Len div 2}]),
+ VQ5 = check_variable_queue_status(
+ VQ4, [{len, Len div 2},
+ {messages_unacknowledged_ram, Len div 2},
+ {messages_ready_ram, Len div 2},
+ {messages_ram, Len}]),
%% ensure all acks go to disk on 0 duration target
VQ6 = check_variable_queue_status(
variable_queue_set_ram_duration_target(0, VQ5),
- [{len, Len div 2},
- {target_ram_count, 0},
- {ram_msg_count, 0},
- {ram_ack_count, 0}]),
+ [{len, Len div 2},
+ {target_ram_count, 0},
+ {messages_unacknowledged_ram, 0},
+ {messages_ready_ram, 0},
+ {messages_ram, 0}]),
VQ6.
@@ -2760,7 +2764,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
fun (Duration1, VQ4) ->
{_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4),
io:format("~p:~n~p~n",
- [Duration1, rabbit_variable_queue:status(VQ5)]),
+ [Duration1, variable_queue_status(VQ5)]),
VQ6 = variable_queue_set_ram_duration_target(
Duration1, VQ5),
publish_fetch_and_ack(Churn, Len, VQ6)
@@ -2820,11 +2824,16 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
check_variable_queue_status(VQ0, Props) ->
VQ1 = variable_queue_wait_for_shuffling_end(VQ0),
- S = rabbit_variable_queue:status(VQ1),
+ S = variable_queue_status(VQ1),
io:format("~p~n", [S]),
assert_props(S, Props),
VQ1.
+variable_queue_status(VQ) ->
+ Keys = rabbit_backing_queue:info_keys() -- [backing_queue_status],
+ [{K, rabbit_variable_queue:info(K, VQ)} || K <- Keys] ++
+ rabbit_variable_queue:info(backing_queue_status, VQ).
+
variable_queue_wait_for_shuffling_end(VQ) ->
case credit_flow:blocked() of
false -> VQ;
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index aafd81df..dbc2856d 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,7 +16,7 @@
-module(rabbit_trace).
--export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]).
+-export([init/1, enabled/1, tap_in/5, tap_out/5, start/1, stop/1]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -32,8 +32,12 @@
-spec(init/1 :: (rabbit_types:vhost()) -> state()).
-spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
--spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
+-spec(tap_in/5 :: (rabbit_types:basic_message(), binary(),
+ rabbit_channel:channel_number(),
+ rabbit_types:username(), state()) -> 'ok').
+-spec(tap_out/5 :: (rabbit_amqqueue:qmsg(), binary(),
+ rabbit_channel:channel_number(),
+ rabbit_types:username(), state()) -> 'ok').
-spec(start/1 :: (rabbit_types:vhost()) -> 'ok').
-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok').
@@ -54,15 +58,27 @@ enabled(VHost) ->
{ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
lists:member(VHost, VHosts).
-tap_in(_Msg, none) -> ok;
-tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) ->
- trace(TraceX, Msg, <<"publish">>, XName, []).
-
-tap_out(_Msg, none) -> ok;
-tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) ->
+tap_in(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok;
+tap_in(Msg = #basic_message{exchange_name = #resource{name = XName,
+ virtual_host = VHost}},
+ ConnName, ChannelNum, Username, TraceX) ->
+ trace(TraceX, Msg, <<"publish">>, XName,
+ [{<<"vhost">>, longstr, VHost},
+ {<<"connection">>, longstr, ConnName},
+ {<<"channel">>, signedint, ChannelNum},
+ {<<"user">>, longstr, Username}]).
+
+tap_out(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok;
+tap_out({#resource{name = QName, virtual_host = VHost},
+ _QPid, _QMsgId, Redelivered, Msg},
+ ConnName, ChannelNum, Username, TraceX) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
trace(TraceX, Msg, <<"deliver">>, QName,
- [{<<"redelivered">>, signedint, RedeliveredNum}]).
+ [{<<"redelivered">>, signedint, RedeliveredNum},
+ {<<"vhost">>, longstr, VHost},
+ {<<"connection">>, longstr, ConnName},
+ {<<"channel">>, signedint, ChannelNum},
+ {<<"user">>, longstr, Username}]).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 8ab35a89..72bf7855 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -190,7 +190,7 @@ die(Msg, Args) ->
%% We don't throw or exit here since that gets thrown
%% straight out into do_boot, generating an erl_crash.dump
%% and displaying any error message in a confusing way.
- error_logger:error_msg(Msg, Args),
+ rabbit_log:error(Msg, Args),
Str = rabbit_misc:format(
"~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args),
io:format(Str),
@@ -281,6 +281,4 @@ node_type_legacy() ->
false -> ram
end.
-%% NB: we cannot use rabbit_log here since it may not have been
-%% started yet
-info(Msg, Args) -> error_logger:info_msg(Msg, Args).
+info(Msg, Args) -> rabbit_log:info(Msg, Args).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b6d37852..1104f373 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -48,6 +48,7 @@
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
+-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
%% -------------------------------------------------------------------
@@ -77,6 +78,8 @@
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
-spec(internal_system_x/0 :: () -> 'ok').
+-spec(cluster_name/0 :: () -> 'ok').
+-spec(down_slave_nodes/0 :: () -> 'ok').
-endif.
@@ -382,6 +385,21 @@ cluster_name_tx() ->
[mnesia:delete(T, K, write) || K <- Ks],
ok.
+down_slave_nodes() ->
+ ok = down_slave_nodes(rabbit_queue),
+ ok = down_slave_nodes(rabbit_durable_queue).
+
+down_slave_nodes(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids, Decorators}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, [], Policy, GmPids, Decorators}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ede69748..e97ed491 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
- status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
+ info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -254,13 +254,17 @@
durable,
transient_threshold,
- len,
- persistent_count,
+ len, %% w/o unacked
+ bytes, %% w/o unacked
+ unacked_bytes,
+ persistent_count, %% w unacked
+ persistent_bytes, %% w unacked
target_ram_count,
- ram_msg_count,
+ ram_msg_count, %% w/o unacked
ram_msg_count_prev,
ram_ack_count_prev,
+ ram_bytes, %% w unacked
out_counter,
in_counter,
rates,
@@ -343,11 +347,17 @@
transient_threshold :: non_neg_integer(),
len :: non_neg_integer(),
+ bytes :: non_neg_integer(),
+ unacked_bytes :: non_neg_integer(),
+
persistent_count :: non_neg_integer(),
+ persistent_bytes :: non_neg_integer(),
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
+ ram_ack_count_prev :: non_neg_integer(),
+ ram_bytes :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
@@ -425,7 +435,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [],
+ init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -440,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms,
MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
- {DeltaCount, IndexState} =
+ {DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
@@ -448,7 +458,7 @@ init(#amqqueue { name = QueueName, durable = true }, Terms,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, RecoveryTerms,
+ init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
process_recovery_terms(Terms=non_clean_shutdown) ->
@@ -461,6 +471,7 @@ process_recovery_terms(Terms) ->
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
+ persistent_bytes = PBytes,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack(true, State),
@@ -470,7 +481,9 @@ terminate(_Reason, State) ->
rabbit_msg_store:client_ref(MSCStateP)
end,
ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
- Terms = [{persistent_ref, PRef}, {persistent_count, PCount}],
+ Terms = [{persistent_ref, PRef},
+ {persistent_count, PCount},
+ {persistent_bytes, PBytes}],
a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
Terms, IndexState),
msg_store_clients = undefined }).
@@ -498,28 +511,35 @@ purge(State = #vqstate { q4 = Q4,
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- persistent_count = PCount }) ->
+ ram_bytes = RamBytes,
+ persistent_count = PCount,
+ persistent_bytes = PBytes }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- {LensByStore, IndexState1} = remove_queue_entries(
- fun ?QUEUE:foldl/3, Q4,
- orddict:new(), IndexState, MSCState),
- {LensByStore1, State1 = #vqstate { q1 = Q1,
- index_state = IndexState2,
- msg_store_clients = MSCState1 }} =
- purge_betas_and_deltas(LensByStore,
- State #vqstate { q4 = ?QUEUE:new(),
- index_state = IndexState1 }),
- {LensByStore2, IndexState3} = remove_queue_entries(
- fun ?QUEUE:foldl/3, Q1,
- LensByStore1, IndexState2, MSCState1),
- PCount1 = PCount - find_persistent_count(LensByStore2),
+ Stats = {RamBytes, PCount, PBytes},
+ {Stats1, IndexState1} =
+ remove_queue_entries(Q4, Stats, IndexState, MSCState),
+
+ {Stats2, State1 = #vqstate { q1 = Q1,
+ index_state = IndexState2,
+ msg_store_clients = MSCState1 }} =
+
+ purge_betas_and_deltas(
+ Stats1, State #vqstate { q4 = ?QUEUE:new(),
+ index_state = IndexState1 }),
+
+ {{RamBytes3, PCount3, PBytes3}, IndexState3} =
+ remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
+
{Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
index_state = IndexState3,
len = 0,
+ bytes = 0,
ram_msg_count = 0,
- persistent_count = PCount1 })}.
+ ram_bytes = RamBytes3,
+ persistent_count = PCount3,
+ persistent_bytes = PBytes3 })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -542,11 +562,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
InCount1 = InCount + 1,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount1,
- persistent_count = PCount1,
- unconfirmed = UC1 }),
+ State3 = upd_bytes(
+ 1, 0, MsgStatus1,
+ inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 })),
a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
@@ -565,11 +587,12 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 },
+ State3 = upd_bytes(0, 1, MsgStatus,
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }),
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
discard(_MsgId, _ChPid, State) -> State.
@@ -638,9 +661,8 @@ ack([SeqId], State) ->
index_on_disk = IndexOnDisk },
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount,
ack_out_counter = AckOutCount }} =
- remove_pending_ack(SeqId, State),
+ remove_pending_ack(true, SeqId, State),
IndexState1 = case IndexOnDisk of
true -> rabbit_queue_index:ack([SeqId], IndexState);
false -> IndexState
@@ -649,30 +671,24 @@ ack([SeqId], State) ->
true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
false -> ok
end,
- PCount1 = PCount - one_if(IsPersistent),
{[MsgId],
a(State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
ack_out_counter = AckOutCount + 1 })};
ack(AckTags, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount,
ack_out_counter = AckOutCount }} =
lists:foldl(
fun (SeqId, {Acc, State2}) ->
- {MsgStatus, State3} = remove_pending_ack(SeqId, State2),
+ {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2),
{accumulate_ack(MsgStatus, Acc), State3}
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
- PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
- orddict:new(), MsgIdsByStore)),
{lists:reverse(AllMsgIds),
a(State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
requeue(AckTags, #vqstate { delta = Delta,
@@ -821,15 +837,30 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
out = AvgEgressRate } }) ->
{AvgIngressRate, AvgEgressRate}.
-status(#vqstate {
+info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
+ RamMsgCount;
+info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) ->
+ gb_trees:size(RPA);
+info(messages_ram, State) ->
+ info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
+info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
+ PersistentCount;
+info(message_bytes, #vqstate{bytes = Bytes,
+ unacked_bytes = UBytes}) ->
+ Bytes + UBytes;
+info(message_bytes_ready, #vqstate{bytes = Bytes}) ->
+ Bytes;
+info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) ->
+ UBytes;
+info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
+ RamBytes;
+info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
+ PersistentBytes;
+info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
- ram_pending_ack = RPA,
- disk_pending_ack = DPA,
target_ram_count = TargetRamCount,
- ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
- persistent_count = PersistentCount,
rates = #rates { in = AvgIngressRate,
out = AvgEgressRate,
ack_in = AvgAckIngressRate,
@@ -841,16 +872,14 @@ status(#vqstate {
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
{len , Len},
- {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)},
{target_ram_count , TargetRamCount},
- {ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RPA)},
{next_seq_id , NextSeqId},
- {persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
{avg_egress_rate , AvgEgressRate},
{avg_ack_ingress_rate, AvgAckIngressRate},
- {avg_ack_egress_rate , AvgAckEgressRate} ].
+ {avg_ack_egress_rate , AvgAckEgressRate} ];
+info(Item, _) ->
+ throw({bad_argument, Item}).
invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
invoke( _, _, State) -> State.
@@ -863,8 +892,12 @@ is_duplicate(_Msg, State) -> {false, State}.
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
+ bytes = Bytes,
+ unacked_bytes = UnackedBytes,
persistent_count = PersistentCount,
- ram_msg_count = RamMsgCount }) ->
+ persistent_bytes = PersistentBytes,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes}) ->
E1 = ?QUEUE:is_empty(Q1),
E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
@@ -878,9 +911,14 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = LZ == (E3 and E4),
true = Len >= 0,
+ true = Bytes >= 0,
+ true = UnackedBytes >= 0,
true = PersistentCount >= 0,
+ true = PersistentBytes >= 0,
true = RamMsgCount >= 0,
true = RamMsgCount =< Len,
+ true = RamBytes >= 0,
+ true = RamBytes =< Bytes + UnackedBytes,
State.
@@ -1028,15 +1066,17 @@ expand_delta(_SeqId, #delta { count = Count } = Delta) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms,
+init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
- DeltaCount1 =
+ {DeltaCount1, DeltaBytes1} =
case Terms of
- non_clean_shutdown -> DeltaCount;
- _ -> proplists:get_value(persistent_count,
- Terms, DeltaCount)
+ non_clean_shutdown -> {DeltaCount, DeltaBytes};
+ _ -> {proplists:get_value(persistent_count,
+ Terms, DeltaCount),
+ proplists:get_value(persistent_bytes,
+ Terms, DeltaBytes)}
end,
Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
true -> ?BLANK_DELTA;
@@ -1061,11 +1101,15 @@ init(IsDurable, IndexState, DeltaCount, Terms,
len = DeltaCount1,
persistent_count = DeltaCount1,
+ bytes = DeltaBytes1,
+ persistent_bytes = DeltaBytes1,
target_ram_count = infinity,
ram_msg_count = 0,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
+ ram_bytes = 0,
+ unacked_bytes = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rates(Now),
@@ -1090,9 +1134,11 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- inc_ram_msg_count(
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) })
+ upd_ram_bytes(
+ 1, MsgStatus,
+ inc_ram_msg_count(
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) }))
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1123,6 +1169,28 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
State#vqstate{ram_msg_count = RamMsgCount + 1}.
+upd_bytes(SignReady, SignUnacked,
+ MsgStatus = #msg_status{msg = undefined}, State) ->
+ upd_bytes0(SignReady, SignUnacked, MsgStatus, State);
+upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) ->
+ upd_ram_bytes(SignReady + SignUnacked, MsgStatus,
+ upd_bytes0(SignReady, SignUnacked, MsgStatus, State)).
+
+upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP},
+ State = #vqstate{bytes = Bytes,
+ unacked_bytes = UBytes,
+ persistent_bytes = PBytes}) ->
+ S = msg_size(MsgStatus),
+ SignTotal = SignReady + SignUnacked,
+ State#vqstate{bytes = Bytes + SignReady * S,
+ unacked_bytes = UBytes + SignUnacked * S,
+ persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}.
+
+upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
+ State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
+
+msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
+
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
@@ -1164,58 +1232,64 @@ remove(AckRequired, MsgStatus = #msg_status {
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
-
+ State2 = case AckRequired of
+ false -> upd_bytes(-1, 0, MsgStatus, State1);
+ true -> upd_bytes(-1, 1, MsgStatus, State1)
+ end,
{AckTag, maybe_update_rates(
- State1 #vqstate {ram_msg_count = RamMsgCount1,
+ State2 #vqstate {ram_msg_count = RamMsgCount1,
out_counter = OutCount + 1,
index_state = IndexState2,
len = Len - 1,
persistent_count = PCount1})}.
-purge_betas_and_deltas(LensByStore,
+purge_betas_and_deltas(Stats,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
case ?QUEUE:is_empty(Q3) of
- true -> {LensByStore, State};
- false -> {LensByStore1, IndexState1} =
- remove_queue_entries(fun ?QUEUE:foldl/3, Q3,
- LensByStore, IndexState, MSCState),
- purge_betas_and_deltas(LensByStore1,
+ true -> {Stats, State};
+ false -> {Stats1, IndexState1} = remove_queue_entries(
+ Q3, Stats, IndexState, MSCState),
+ purge_betas_and_deltas(Stats1,
maybe_deltas_to_betas(
State #vqstate {
q3 = ?QUEUE:new(),
index_state = IndexState1 }))
end.
-remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) ->
- {MsgIdsByStore, Delivers, Acks} =
- Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q),
+remove_queue_entries(Q, {RamBytes, PCount, PBytes},
+ IndexState, MSCState) ->
+ {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} =
+ ?QUEUE:foldl(fun remove_queue_entries1/2,
+ {orddict:new(), RamBytes, PBytes, [], []}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore),
+ {{RamBytes1,
+ PCount - case orddict:find(true, MsgIdsByStore) of
+ error -> 0;
+ {ok, Ids} -> length(Ids)
+ end,
+ PBytes1},
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId,
+ #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {MsgIdsByStore, Delivers, Acks}) ->
+ index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
+ msg_props = #message_properties { size = Size } },
+ {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) ->
{case MsgOnDisk of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
+ RamBytes - Size * one_if(Msg =/= undefined),
+ PBytes - Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
cons_if(IndexOnDisk, SeqId, Acks)}.
-sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
- orddict:fold(
- fun (IsPersistent, MsgIds, LensByStore1) ->
- orddict:update_counter(IsPersistent, length(MsgIds), LensByStore1)
- end, LensByStore, MsgIdsByStore).
-
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1287,8 +1361,15 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
none -> gb_trees:get(SeqId, DPA)
end.
-remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+%% First parameter = UpdatePersistentCount
+remove_pending_ack(true, SeqId, State) ->
+ {MsgStatus, State1 = #vqstate { persistent_count = PCount }} =
+ remove_pending_ack(false, SeqId, State),
+ PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
+ {MsgStatus, upd_bytes(0, -1, MsgStatus,
+ State1 # vqstate{ persistent_count = PCount1 })};
+remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
{V, State #vqstate { ram_pending_ack = RPA1 }};
@@ -1338,12 +1419,6 @@ accumulate_ack(#msg_status { seq_id = SeqId,
end,
[MsgId | AllMsgIds]}.
-find_persistent_count(LensByStore) ->
- case orddict:find(true, LensByStore) of
- error -> 0;
- {ok, Len} -> Len
- end.
-
%%----------------------------------------------------------------------------
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
@@ -1391,9 +1466,15 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
- {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)};
+ {MsgStatus#msg_status { msg = Msg },
+ upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1]
publish_alpha(MsgStatus, State) ->
{MsgStatus, inc_ram_msg_count(State)}.
+%% [1] We increase the ram_bytes here because we paged the message in
+%% to requeue it, not purely because we requeued it. Hence in the
+%% second head it's already accounted for as already in memory. OTOH
+%% ram_msg_count does not include unacked messages, so it needs
+%% incrementing in both heads.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
@@ -1419,7 +1500,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, State2)
+ Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2))
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -1433,13 +1514,14 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
+ upd_bytes(1, -1, MsgStatus, State2)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
msg_from_pending_ack(SeqId, State) ->
{#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
- remove_pending_ack(SeqId, State),
+ remove_pending_ack(false, SeqId, State),
{MsgStatus #msg_status {
msg_props = MsgProps #message_properties { needs_confirming = false } },
State1}.
@@ -1591,8 +1673,10 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
maybe_write_to_disk(true, false, MsgStatus, State),
DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
limit_ram_acks(Quota - 1,
- State1 #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1 })
+ upd_ram_bytes(
+ -1, MsgStatus1,
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 }))
end.
permitted_beta_count(#vqstate { len = 0 }) ->
@@ -1728,9 +1812,12 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
State1 = #vqstate { ram_msg_count = RamMsgCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = Consumer(MsgStatus2, Qa,
- State1 #vqstate {
- ram_msg_count = RamMsgCount - 1 }),
+ State2 = Consumer(
+ MsgStatus2, Qa,
+ upd_ram_bytes(
+ -1, MsgStatus2,
+ State1 #vqstate {
+ ram_msg_count = RamMsgCount - 1})),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
Qa, State2)
end
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index d943b599..3a041508 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -114,8 +114,8 @@ upgrades_required(Scope) ->
with_upgrade_graph(Fun, Scope) ->
case rabbit_misc:build_acyclic_graph(
- fun (Module, Steps) -> vertices(Module, Steps, Scope) end,
- fun (Module, Steps) -> edges(Module, Steps, Scope) end,
+ fun ({_App, Module, Steps}) -> vertices(Module, Steps, Scope) end,
+ fun ({_App, Module, Steps}) -> edges(Module, Steps, Scope) end,
rabbit_misc:all_module_attributes(rabbit_upgrade)) of
{ok, G} -> try
Fun(G)
@@ -161,7 +161,7 @@ heads(G) ->
categorise_by_scope(Version) when is_list(Version) ->
Categorised =
- [{Scope, Name} || {_Module, Attributes} <-
+ [{Scope, Name} || {_App, _Module, Attributes} <-
rabbit_misc:all_module_attributes(rabbit_upgrade),
{Name, Scope, _Requires} <- Attributes,
lists:member(Name, Version)],