summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-09-18 15:51:09 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-09-18 15:51:09 +0100
commit527c72cc285638bdd784f8780099b80b6937e3eb (patch)
treefaae41dd8ca19bb5cb371823d24d3c0cd713c3b8
parent6f5870b868366876d9db51a6ae353d9c021e3378 (diff)
parent4ac537045776c03e790946f3ca0e4543fcd63c06 (diff)
downloadrabbitmq-server-527c72cc285638bdd784f8780099b80b6937e3eb.tar.gz
Merge bug24997
-rw-r--r--LICENSE3
-rw-r--r--LICENSE-MIT-Mochi9
-rw-r--r--docs/rabbitmq-server.1.xml3
-rw-r--r--docs/rabbitmqctl.1.xml201
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--packaging/macports/Portfile.in2
-rwxr-xr-xscripts/rabbitmq-server17
-rw-r--r--src/gm.erl33
-rw-r--r--src/mirrored_supervisor.erl2
-rw-r--r--src/mochijson2.erl893
-rw-r--r--src/mochinum.erl358
-rw-r--r--src/rabbit.erl16
-rw-r--r--src/rabbit_amqqueue.erl45
-rw-r--r--src/rabbit_amqqueue_process.erl137
-rw-r--r--src/rabbit_backing_queue.erl16
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_binding.erl58
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_control_main.erl90
-rw-r--r--src/rabbit_direct.erl28
-rw-r--r--src/rabbit_disk_monitor.erl12
-rw-r--r--src/rabbit_exchange.erl7
-rw-r--r--src/rabbit_heartbeat.erl45
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl44
-rw-r--r--src/rabbit_mirror_queue_master.erl28
-rw-r--r--src/rabbit_mirror_queue_misc.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl302
-rw-r--r--src/rabbit_misc.erl53
-rw-r--r--src/rabbit_mnesia.erl1289
-rw-r--r--src/rabbit_msg_store.erl10
-rw-r--r--src/rabbit_net.erl33
-rw-r--r--src/rabbit_networking.erl12
-rw-r--r--src/rabbit_node_monitor.erl253
-rw-r--r--src/rabbit_parameter_validation.erl16
-rw-r--r--src/rabbit_policy.erl60
-rw-r--r--src/rabbit_queue_index.erl8
-rw-r--r--src/rabbit_reader.erl19
-rw-r--r--src/rabbit_runtime_parameter.erl18
-rw-r--r--src/rabbit_runtime_parameters.erl193
-rw-r--r--src/rabbit_runtime_parameters_test.erl18
-rw-r--r--src/rabbit_tests.erl292
-rw-r--r--src/rabbit_types.erl2
-rw-r--r--src/rabbit_upgrade.erl18
-rw-r--r--src/rabbit_variable_queue.erl15
-rw-r--r--src/rabbit_vhost.erl13
-rw-r--r--src/supervisor2.erl135
-rw-r--r--src/supervisor2_tests.erl70
47 files changed, 3413 insertions, 1483 deletions
diff --git a/LICENSE b/LICENSE
index 89640485..9feeceac 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,5 +1,8 @@
This package, the RabbitMQ server is licensed under the MPL. For the
MPL, please see LICENSE-MPL-RabbitMQ.
+The files `mochijson2.erl' and `mochinum.erl' are (c) 2007 Mochi Media, Inc and
+licensed under a MIT license, see LICENSE-MIT-Mochi.
+
If you have any questions regarding licensing, please contact us at
info@rabbitmq.com.
diff --git a/LICENSE-MIT-Mochi b/LICENSE-MIT-Mochi
new file mode 100644
index 00000000..c85b65a4
--- /dev/null
+++ b/LICENSE-MIT-Mochi
@@ -0,0 +1,9 @@
+This is the MIT license.
+
+Copyright (c) 2007 Mochi Media, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index ca63927c..32ae842c 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -109,7 +109,8 @@ Defaults to 5672.
<term>-detached</term>
<listitem>
<para>
- start the server process in the background
+ Start the server process in the background. Note that this will
+ cause the pid not to be written to the pid file.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmq-server -detached</screen>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 2d25edee..11d85e9e 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -288,105 +288,161 @@
<title>Cluster management</title>
<variablelist>
- <varlistentry id="cluster">
- <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <varlistentry id="join_cluster">
+ <term><cmdsynopsis><command>join_cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg><arg choice="opt"><replaceable>--ram</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
<term>clusternode</term>
- <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem>
+ <listitem><para>Node to cluster with.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><arg choice="opt">--ram</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ If provided, the node will join the cluster as a RAM node.
+ </para>
+ </listitem>
</varlistentry>
</variablelist>
<para>
- Instruct the node to become member of a cluster with the
- specified nodes. To cluster with currently offline nodes,
- use <link linkend="force_cluster"><command>force_cluster</command></link>.
+ Instruct the node to become a member of the cluster that the
+ specified node is in. Before clustering, the node is reset, so be
+ careful when using this command. For this command to succeed the
+ RabbitMQ application must have been stopped, e.g. with <link
+ linkend="stop_app"><command>stop_app</command></link>.
</para>
<para>
- Cluster nodes can be of two types: disk or ram. Disk nodes
- replicate data in ram and on disk, thus providing
- redundancy in the event of node failure and recovery from
- global events such as power failure across all nodes. Ram
- nodes replicate data in ram only and are mainly used for
- scalability. A cluster must always have at least one disk node.
+ Cluster nodes can be of two types: disk or RAM. Disk nodes
+ replicate data in RAM and on disk, thus providing redundancy in
+ the event of node failure and recovery from global events such
+ as power failure across all nodes. RAM nodes replicate data in
+ RAM only (with the exception of queue contents, which can reside
+ on disc if the queue is persistent or too big to fit in memory)
+ and are mainly used for scalability. RAM nodes are more
+ performant only when managing resources (e.g. adding/removing
+ queues, exchanges, or bindings). A cluster must always have at
+ least one disk node, and usually should have more than one.
</para>
<para>
- If the current node is to become a disk node it needs to
- appear in the cluster node list. Otherwise it becomes a
- ram node. If the node list is empty or only contains the
- current node then the node becomes a standalone,
- i.e. non-clustered, (disk) node.
+ The node will be a disk node by default. If you wish to
+ create a RAM node, provide the <command>--ram</command> flag.
</para>
<para>
After executing the <command>cluster</command> command, whenever
- the RabbitMQ application is started on the current node it
- will attempt to connect to the specified nodes, thus
- becoming an active node in the cluster comprising those
- nodes (and possibly others).
+ the RabbitMQ application is started on the current node it will
+ attempt to connect to the nodes that were in the cluster when the
+ node went down.
</para>
<para>
- The list of nodes does not have to contain all the
- cluster's nodes; a subset is sufficient. Also, clustering
- generally succeeds as long as at least one of the
- specified nodes is active. Hence adjustments to the list
- are only necessary if the cluster configuration is to be
- altered radically.
+ To leave a cluster, <command>reset</command> the node. You can
+ also remove nodes remotely with the
+ <command>forget_cluster_node</command> command.
</para>
<para>
- For this command to succeed the RabbitMQ application must
- have been stopped, e.g. with <link linkend="stop_app"><command>stop_app</command></link>. Furthermore,
- turning a standalone node into a clustered node requires
- the node be <link linkend="reset"><command>reset</command></link> first,
- in order to avoid accidental destruction of data with the
- <command>cluster</command> command.
+ For more details see the <ulink
+ url="http://www.rabbitmq.com/clustering.html">clustering
+ guide</ulink>.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl join_cluster hare@elena --ram</screen>
+ <para role="example">
+ This command instructs the RabbitMQ node to join the cluster that
+ <command>hare@elena</command> is part of, as a ram node.
</para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>cluster_status</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Displays all the nodes in the cluster grouped by node type,
+ together with the currently running nodes.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl cluster_status</screen>
+ <para role="example">
+ This command displays the nodes in the cluster.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>change_cluster_node_type</command> <arg choice="req">disk | ram</arg></cmdsynopsis>
+ </term>
+ <listitem>
<para>
- For more details see the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>.
+ Changes the type of the cluster node. The node must be stopped for
+ this operation to succeed, and when turning a node into a RAM node
+ the node must not be the only disk node in the cluster.
</para>
<para role="example-prefix">For example:</para>
- <screen role="example">rabbitmqctl cluster rabbit@tanto hare@elena</screen>
+ <screen role="example">rabbitmqctl change_cluster_node_type disk</screen>
<para role="example">
- This command instructs the RabbitMQ node to join the
- cluster with nodes <command>rabbit@tanto</command> and
- <command>hare@elena</command>. If the node is one of these then
- it becomes a disk node, otherwise a ram node.
+ This command will turn a RAM node into a disk node.
</para>
</listitem>
</varlistentry>
- <varlistentry id="force_cluster">
- <term><cmdsynopsis><command>force_cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <varlistentry>
+ <term><cmdsynopsis><command>forget_cluster_node</command> <arg choice="opt">--offline</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
- <term>clusternode</term>
- <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem>
+ <term><cmdsynopsis><arg choice="opt">--offline</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Enables node removal from an offline node. This is only
+ useful in the situation where all the nodes are offline and
+ the last node to go down cannot be brought online, thus
+ preventing the whole cluster from starting. It should not be
+ used in any other circumstances since it can lead to
+ inconsistencies.
+ </para>
+ </listitem>
</varlistentry>
</variablelist>
<para>
- Instruct the node to become member of a cluster with the
- specified nodes. This will succeed even if the specified nodes
- are offline. For a more detailed description, see
- <link linkend="cluster"><command>cluster</command>.</link>
+ Removes a cluster node remotely. The node that is being removed
+ must be offline, while the node we are removing from must be
+ online, except when using the <command>--offline</command> flag.
</para>
- <para>
- Note that this variant of the cluster command just
- ignores the current status of the specified nodes.
- Clustering may still fail for a variety of other
- reasons.
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen>
+ <para role="example">
+ This command will remove the node
+ <command>rabbit@stringer</command> from the node
+ <command>hare@mcnulty</command>.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>cluster_status</command></cmdsynopsis></term>
+ <term><cmdsynopsis><command>update_cluster_nodes</command> <arg choice="req">clusternode</arg></cmdsynopsis>
+ </term>
<listitem>
+ <variablelist>
+ <varlistentry>
+ <term>clusternode</term>
+ <listitem>
+ <para>
+ The node to consult for up to date information.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
<para>
- Displays all the nodes in the cluster grouped by node type,
- together with the currently running nodes.
+ Instructs an already clustered node to contact
+ <command>clusternode</command> to cluster when waking up. This is
+ different from <command>join_cluster</command> since it does not
+ join any cluster - it checks that the node is already in a cluster
+ with <command>clusternode</command>.
</para>
- <para role="example-prefix">For example:</para>
- <screen role="example">rabbitmqctl cluster_status</screen>
- <para role="example">
- This command displays the nodes in the cluster.
+ <para>
+ The need for this command is motivated by the fact that clusters
+ can change while a node is offline. Consider the situation in
+ which node A and B are clustered. A goes down, C clusters with B,
+ and then B leaves the cluster. When A wakes up, it'll try to
+ contact B, but this will fail since B is not in the cluster
+ anymore. <command>update_cluster_nodes -n A C</command> will solve
+ this situation.
</para>
</listitem>
</varlistentry>
@@ -581,7 +637,7 @@
</para>
<para>
Deleting a virtual host deletes all its exchanges,
- queues, user mappings and associated permissions.
+ queues, bindings, user permissions and parameters.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl delete_vhost test</screen>
@@ -750,15 +806,16 @@
Certain features of RabbitMQ (such as the federation plugin)
are controlled by dynamic,
cluster-wide <emphasis>parameters</emphasis>. Each parameter
- consists of a component name, a key and a value. The
- component name and key are strings, and the value is an
- Erlang term. Parameters can be set, cleared and listed. In
- general you should refer to the documentation for the feature
- in question to see how to set parameters.
+ consists of a component name, a key and a value, and is
+ associated with a virtual host. The component name and key are
+ strings, and the value is an Erlang term. Parameters can be
+ set, cleared and listed. In general you should refer to the
+ documentation for the feature in question to see how to set
+ parameters.
</para>
<variablelist>
<varlistentry>
- <term><cmdsynopsis><command>set_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>set_parameter</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
Sets a parameter.
@@ -789,12 +846,12 @@
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl set_parameter federation local_username '&lt;&lt;"guest">>'</screen>
<para role="example">
- This command sets the parameter <command>local_username</command> for the <command>federation</command> component to the Erlang term <command>&lt;&lt;"guest">></command>.
+ This command sets the parameter <command>local_username</command> for the <command>federation</command> component in the default virtual host to the Erlang term <command>&lt;&lt;"guest">></command>.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>clear_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>clear_parameter</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
Clears a parameter.
@@ -817,20 +874,20 @@
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl clear_parameter federation local_username</screen>
<para role="example">
- This command clears the parameter <command>local_username</command> for the <command>federation</command> component.
+ This command clears the parameter <command>local_username</command> for the <command>federation</command> component in the default virtual host.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>list_parameters</command></cmdsynopsis></term>
+ <term><cmdsynopsis><command>list_parameters</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
- Lists all parameters.
+ Lists all parameters for a virtual host.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl list_parameters</screen>
<para role="example">
- This command lists all parameters.
+ This command lists all parameters in the default virtual host.
</para>
</listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 087c62a9..78842281 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -33,7 +33,7 @@
{default_user_tags, [administrator]},
{default_vhost, <<"/">>},
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
- {cluster_nodes, []},
+ {cluster_nodes, {[], true}},
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index e461e49e..82c1fb0c 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -59,7 +59,7 @@ set mandest ${destroot}${prefix}/share/man
use_configure no
-use_parallel_build yes
+use_parallel_build no
build.env-append HOME=${workpath}
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 34915b3d..e1686627 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -65,9 +65,20 @@ case "$(uname -s)" in
CYGWIN*) # we make no attempt to record the cygwin pid; rabbitmqctl wait
# will not be able to make sense of it anyway
;;
- *) mkdir -p $(dirname ${RABBITMQ_PID_FILE});
- echo $$ > ${RABBITMQ_PID_FILE}
- ;;
+ *) # When -detached is passed, we don't write the pid, since it'd be the
+ # wrong one
+ detached=""
+ for opt in "$@"; do
+ if [ "$opt" = "-detached" ]; then
+ detached="true"
+ fi
+ done
+ if [ $detached ]; then
+ echo "Warning: PID file not written; -detached was passed." 1>&2
+ else
+ mkdir -p $(dirname ${RABBITMQ_PID_FILE});
+ echo $$ > ${RABBITMQ_PID_FILE}
+ fi
esac
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
diff --git a/src/gm.erl b/src/gm.erl
index f88ed18f..90433e84 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -77,9 +77,13 @@
%% confirmed_broadcast/2 directly from the callback module otherwise
%% you will deadlock the entire group.
%%
-%% group_members/1
-%% Provide the Pid. Returns a list of the current group members.
+%% info/1
+%% Provide the Pid. Returns a proplist with various facts, including
+%% the group name and the current group members.
%%
+%% forget_group/1
+%% Provide the group name. Removes its mnesia record. Makes no attempt
+%% to ensure the group is empty.
%%
%% Implementation Overview
%% -----------------------
@@ -373,7 +377,7 @@
-behaviour(gen_server2).
-export([create_tables/0, start_link/3, leave/1, broadcast/2,
- confirmed_broadcast/2, group_members/1]).
+ confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/2]).
@@ -431,7 +435,8 @@
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
--spec(group_members/1 :: (pid()) -> [pid()]).
+-spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(forget_group/1 :: (group_name()) -> 'ok').
%% The joined, members_changed and handle_msg callbacks can all return
%% any of the following terms:
@@ -514,9 +519,15 @@ broadcast(Server, Msg) ->
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
-group_members(Server) ->
- gen_server2:call(Server, group_members, infinity).
+info(Server) ->
+ gen_server2:call(Server, info, infinity).
+forget_group(GroupName) ->
+ {atomic, ok} = mnesia:sync_transaction(
+ fun () ->
+ mnesia:delete({?GROUP_TABLE, GroupName})
+ end),
+ ok.
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
@@ -553,12 +564,16 @@ handle_call({confirmed_broadcast, Msg}, _From,
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
-handle_call(group_members, _From,
+handle_call(info, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
-handle_call(group_members, _From, State = #state { view = View }) ->
- reply(get_pids(alive_view_members(View)), State);
+handle_call(info, _From, State = #state { group_name = GroupName,
+ module = Module,
+ view = View }) ->
+ reply([{group_name, GroupName},
+ {module, Module},
+ {group_members, get_pids(alive_view_members(View))}], State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 4fc488b8..24c3ebd0 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -174,7 +174,7 @@
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
ChildSpecs :: [supervisor2:child_spec()],
- Result :: supervisor2:startlink_ret().
+ Result :: {'ok', pid()} | {'error', term()}.
-spec create_tables() -> Result when
Result :: 'ok'.
diff --git a/src/mochijson2.erl b/src/mochijson2.erl
new file mode 100644
index 00000000..bddb52cc
--- /dev/null
+++ b/src/mochijson2.erl
@@ -0,0 +1,893 @@
+%% This file is a copy of `mochijson2.erl' from mochiweb, revision
+%% d541e9a0f36c00dcadc2e589f20e47fbf46fc76f. For the license, see
+%% `LICENSE-MIT-Mochi'.
+
+%% @author Bob Ippolito <bob@mochimedia.com>
+%% @copyright 2007 Mochi Media, Inc.
+
+%% @doc Yet another JSON (RFC 4627) library for Erlang. mochijson2 works
+%% with binaries as strings, arrays as lists (without an {array, _})
+%% wrapper and it only knows how to decode UTF-8 (and ASCII).
+%%
+%% JSON terms are decoded as follows (javascript -> erlang):
+%% <ul>
+%% <li>{"key": "value"} ->
+%% {struct, [{&lt;&lt;"key">>, &lt;&lt;"value">>}]}</li>
+%% <li>["array", 123, 12.34, true, false, null] ->
+%% [&lt;&lt;"array">>, 123, 12.34, true, false, null]
+%% </li>
+%% </ul>
+%% <ul>
+%% <li>Strings in JSON decode to UTF-8 binaries in Erlang</li>
+%% <li>Objects decode to {struct, PropList}</li>
+%% <li>Numbers decode to integer or float</li>
+%% <li>true, false, null decode to their respective terms.</li>
+%% </ul>
+%% The encoder will accept the same format that the decoder will produce,
+%% but will also allow additional cases for leniency:
+%% <ul>
+%% <li>atoms other than true, false, null will be considered UTF-8
+%% strings (even as a proplist key)
+%% </li>
+%% <li>{json, IoList} will insert IoList directly into the output
+%% with no validation
+%% </li>
+%% <li>{array, Array} will be encoded as Array
+%% (legacy mochijson style)
+%% </li>
+%% <li>A non-empty raw proplist will be encoded as an object as long
+%% as the first pair does not have an atom key of json, struct,
+%% or array
+%% </li>
+%% </ul>
+
+-module(mochijson2).
+-author('bob@mochimedia.com').
+-export([encoder/1, encode/1]).
+-export([decoder/1, decode/1, decode/2]).
+
+%% This is a macro to placate syntax highlighters..
+-define(Q, $\").
+-define(ADV_COL(S, N), S#decoder{offset=N+S#decoder.offset,
+ column=N+S#decoder.column}).
+-define(INC_COL(S), S#decoder{offset=1+S#decoder.offset,
+ column=1+S#decoder.column}).
+-define(INC_LINE(S), S#decoder{offset=1+S#decoder.offset,
+ column=1,
+ line=1+S#decoder.line}).
+-define(INC_CHAR(S, C),
+ case C of
+ $\n ->
+ S#decoder{column=1,
+ line=1+S#decoder.line,
+ offset=1+S#decoder.offset};
+ _ ->
+ S#decoder{column=1+S#decoder.column,
+ offset=1+S#decoder.offset}
+ end).
+-define(IS_WHITESPACE(C),
+ (C =:= $\s orelse C =:= $\t orelse C =:= $\r orelse C =:= $\n)).
+
+%% @type json_string() = atom | binary()
+%% @type json_number() = integer() | float()
+%% @type json_array() = [json_term()]
+%% @type json_object() = {struct, [{json_string(), json_term()}]}
+%% @type json_eep18_object() = {[{json_string(), json_term()}]}
+%% @type json_iolist() = {json, iolist()}
+%% @type json_term() = json_string() | json_number() | json_array() |
+%% json_object() | json_eep18_object() | json_iolist()
+
+-record(encoder, {handler=null,
+ utf8=false}).
+
+-record(decoder, {object_hook=null,
+ offset=0,
+ line=1,
+ column=1,
+ state=null}).
+
+%% @spec encoder([encoder_option()]) -> function()
+%% @doc Create an encoder/1 with the given options.
+%% @type encoder_option() = handler_option() | utf8_option()
+%% @type utf8_option() = boolean(). Emit unicode as utf8 (default - false)
+encoder(Options) ->
+ State = parse_encoder_options(Options, #encoder{}),
+ fun (O) -> json_encode(O, State) end.
+
+%% @spec encode(json_term()) -> iolist()
+%% @doc Encode the given as JSON to an iolist.
+encode(Any) ->
+ json_encode(Any, #encoder{}).
+
+%% @spec decoder([decoder_option()]) -> function()
+%% @doc Create a decoder/1 with the given options.
+decoder(Options) ->
+ State = parse_decoder_options(Options, #decoder{}),
+ fun (O) -> json_decode(O, State) end.
+
+%% @spec decode(iolist(), [{format, proplist | eep18 | struct}]) -> json_term()
+%% @doc Decode the given iolist to Erlang terms using the given object format
+%% for decoding, where proplist returns JSON objects as [{binary(), json_term()}]
+%% proplists, eep18 returns JSON objects as {[binary(), json_term()]}, and struct
+%% returns them as-is.
+decode(S, Options) ->
+ json_decode(S, parse_decoder_options(Options, #decoder{})).
+
+%% @spec decode(iolist()) -> json_term()
+%% @doc Decode the given iolist to Erlang terms.
+decode(S) ->
+ json_decode(S, #decoder{}).
+
+%% Internal API
+
+parse_encoder_options([], State) ->
+ State;
+parse_encoder_options([{handler, Handler} | Rest], State) ->
+ parse_encoder_options(Rest, State#encoder{handler=Handler});
+parse_encoder_options([{utf8, Switch} | Rest], State) ->
+ parse_encoder_options(Rest, State#encoder{utf8=Switch}).
+
+parse_decoder_options([], State) ->
+ State;
+parse_decoder_options([{object_hook, Hook} | Rest], State) ->
+ parse_decoder_options(Rest, State#decoder{object_hook=Hook});
+parse_decoder_options([{format, Format} | Rest], State)
+ when Format =:= struct orelse Format =:= eep18 orelse Format =:= proplist ->
+ parse_decoder_options(Rest, State#decoder{object_hook=Format}).
+
+json_encode(true, _State) ->
+ <<"true">>;
+json_encode(false, _State) ->
+ <<"false">>;
+json_encode(null, _State) ->
+ <<"null">>;
+json_encode(I, _State) when is_integer(I) ->
+ integer_to_list(I);
+json_encode(F, _State) when is_float(F) ->
+ mochinum:digits(F);
+json_encode(S, State) when is_binary(S); is_atom(S) ->
+ json_encode_string(S, State);
+json_encode([{K, _}|_] = Props, State) when (K =/= struct andalso
+ K =/= array andalso
+ K =/= json) ->
+ json_encode_proplist(Props, State);
+json_encode({struct, Props}, State) when is_list(Props) ->
+ json_encode_proplist(Props, State);
+json_encode({Props}, State) when is_list(Props) ->
+ json_encode_proplist(Props, State);
+json_encode({}, State) ->
+ json_encode_proplist([], State);
+json_encode(Array, State) when is_list(Array) ->
+ json_encode_array(Array, State);
+json_encode({array, Array}, State) when is_list(Array) ->
+ json_encode_array(Array, State);
+json_encode({json, IoList}, _State) ->
+ IoList;
+json_encode(Bad, #encoder{handler=null}) ->
+ exit({json_encode, {bad_term, Bad}});
+json_encode(Bad, State=#encoder{handler=Handler}) ->
+ json_encode(Handler(Bad), State).
+
+json_encode_array([], _State) ->
+ <<"[]">>;
+json_encode_array(L, State) ->
+ F = fun (O, Acc) ->
+ [$,, json_encode(O, State) | Acc]
+ end,
+ [$, | Acc1] = lists:foldl(F, "[", L),
+ lists:reverse([$\] | Acc1]).
+
+json_encode_proplist([], _State) ->
+ <<"{}">>;
+json_encode_proplist(Props, State) ->
+ F = fun ({K, V}, Acc) ->
+ KS = json_encode_string(K, State),
+ VS = json_encode(V, State),
+ [$,, VS, $:, KS | Acc]
+ end,
+ [$, | Acc1] = lists:foldl(F, "{", Props),
+ lists:reverse([$\} | Acc1]).
+
+json_encode_string(A, State) when is_atom(A) ->
+ L = atom_to_list(A),
+ case json_string_is_safe(L) of
+ true ->
+ [?Q, L, ?Q];
+ false ->
+ json_encode_string_unicode(xmerl_ucs:from_utf8(L), State, [?Q])
+ end;
+json_encode_string(B, State) when is_binary(B) ->
+ case json_bin_is_safe(B) of
+ true ->
+ [?Q, B, ?Q];
+ false ->
+ json_encode_string_unicode(xmerl_ucs:from_utf8(B), State, [?Q])
+ end;
+json_encode_string(I, _State) when is_integer(I) ->
+ [?Q, integer_to_list(I), ?Q];
+json_encode_string(L, State) when is_list(L) ->
+ case json_string_is_safe(L) of
+ true ->
+ [?Q, L, ?Q];
+ false ->
+ json_encode_string_unicode(L, State, [?Q])
+ end.
+
+json_string_is_safe([]) ->
+ true;
+json_string_is_safe([C | Rest]) ->
+ case C of
+ ?Q ->
+ false;
+ $\\ ->
+ false;
+ $\b ->
+ false;
+ $\f ->
+ false;
+ $\n ->
+ false;
+ $\r ->
+ false;
+ $\t ->
+ false;
+ C when C >= 0, C < $\s; C >= 16#7f, C =< 16#10FFFF ->
+ false;
+ C when C < 16#7f ->
+ json_string_is_safe(Rest);
+ _ ->
+ false
+ end.
+
+json_bin_is_safe(<<>>) ->
+ true;
+json_bin_is_safe(<<C, Rest/binary>>) ->
+ case C of
+ ?Q ->
+ false;
+ $\\ ->
+ false;
+ $\b ->
+ false;
+ $\f ->
+ false;
+ $\n ->
+ false;
+ $\r ->
+ false;
+ $\t ->
+ false;
+ C when C >= 0, C < $\s; C >= 16#7f ->
+ false;
+ C when C < 16#7f ->
+ json_bin_is_safe(Rest)
+ end.
+
+json_encode_string_unicode([], _State, Acc) ->
+ lists:reverse([$\" | Acc]);
+json_encode_string_unicode([C | Cs], State, Acc) ->
+ Acc1 = case C of
+ ?Q ->
+ [?Q, $\\ | Acc];
+ %% Escaping solidus is only useful when trying to protect
+ %% against "</script>" injection attacks which are only
+ %% possible when JSON is inserted into a HTML document
+ %% in-line. mochijson2 does not protect you from this, so
+ %% if you do insert directly into HTML then you need to
+ %% uncomment the following case or escape the output of encode.
+ %%
+ %% $/ ->
+ %% [$/, $\\ | Acc];
+ %%
+ $\\ ->
+ [$\\, $\\ | Acc];
+ $\b ->
+ [$b, $\\ | Acc];
+ $\f ->
+ [$f, $\\ | Acc];
+ $\n ->
+ [$n, $\\ | Acc];
+ $\r ->
+ [$r, $\\ | Acc];
+ $\t ->
+ [$t, $\\ | Acc];
+ C when C >= 0, C < $\s ->
+ [unihex(C) | Acc];
+ C when C >= 16#7f, C =< 16#10FFFF, State#encoder.utf8 ->
+ [xmerl_ucs:to_utf8(C) | Acc];
+ C when C >= 16#7f, C =< 16#10FFFF, not State#encoder.utf8 ->
+ [unihex(C) | Acc];
+ C when C < 16#7f ->
+ [C | Acc];
+ _ ->
+ exit({json_encode, {bad_char, C}})
+ end,
+ json_encode_string_unicode(Cs, State, Acc1).
+
+hexdigit(C) when C >= 0, C =< 9 ->
+ C + $0;
+hexdigit(C) when C =< 15 ->
+ C + $a - 10.
+
+unihex(C) when C < 16#10000 ->
+ <<D3:4, D2:4, D1:4, D0:4>> = <<C:16>>,
+ Digits = [hexdigit(D) || D <- [D3, D2, D1, D0]],
+ [$\\, $u | Digits];
+unihex(C) when C =< 16#10FFFF ->
+ N = C - 16#10000,
+ S1 = 16#d800 bor ((N bsr 10) band 16#3ff),
+ S2 = 16#dc00 bor (N band 16#3ff),
+ [unihex(S1), unihex(S2)].
+
+json_decode(L, S) when is_list(L) ->
+ json_decode(iolist_to_binary(L), S);
+json_decode(B, S) ->
+ {Res, S1} = decode1(B, S),
+ {eof, _} = tokenize(B, S1#decoder{state=trim}),
+ Res.
+
+decode1(B, S=#decoder{state=null}) ->
+ case tokenize(B, S#decoder{state=any}) of
+ {{const, C}, S1} ->
+ {C, S1};
+ {start_array, S1} ->
+ decode_array(B, S1);
+ {start_object, S1} ->
+ decode_object(B, S1)
+ end.
+
+make_object(V, #decoder{object_hook=N}) when N =:= null orelse N =:= struct ->
+ V;
+make_object({struct, P}, #decoder{object_hook=eep18}) ->
+ {P};
+make_object({struct, P}, #decoder{object_hook=proplist}) ->
+ P;
+make_object(V, #decoder{object_hook=Hook}) ->
+ Hook(V).
+
+decode_object(B, S) ->
+ decode_object(B, S#decoder{state=key}, []).
+
+decode_object(B, S=#decoder{state=key}, Acc) ->
+ case tokenize(B, S) of
+ {end_object, S1} ->
+ V = make_object({struct, lists:reverse(Acc)}, S1),
+ {V, S1#decoder{state=null}};
+ {{const, K}, S1} ->
+ {colon, S2} = tokenize(B, S1),
+ {V, S3} = decode1(B, S2#decoder{state=null}),
+ decode_object(B, S3#decoder{state=comma}, [{K, V} | Acc])
+ end;
+decode_object(B, S=#decoder{state=comma}, Acc) ->
+ case tokenize(B, S) of
+ {end_object, S1} ->
+ V = make_object({struct, lists:reverse(Acc)}, S1),
+ {V, S1#decoder{state=null}};
+ {comma, S1} ->
+ decode_object(B, S1#decoder{state=key}, Acc)
+ end.
+
+decode_array(B, S) ->
+ decode_array(B, S#decoder{state=any}, []).
+
+decode_array(B, S=#decoder{state=any}, Acc) ->
+ case tokenize(B, S) of
+ {end_array, S1} ->
+ {lists:reverse(Acc), S1#decoder{state=null}};
+ {start_array, S1} ->
+ {Array, S2} = decode_array(B, S1),
+ decode_array(B, S2#decoder{state=comma}, [Array | Acc]);
+ {start_object, S1} ->
+ {Array, S2} = decode_object(B, S1),
+ decode_array(B, S2#decoder{state=comma}, [Array | Acc]);
+ {{const, Const}, S1} ->
+ decode_array(B, S1#decoder{state=comma}, [Const | Acc])
+ end;
+decode_array(B, S=#decoder{state=comma}, Acc) ->
+ case tokenize(B, S) of
+ {end_array, S1} ->
+ {lists:reverse(Acc), S1#decoder{state=null}};
+ {comma, S1} ->
+ decode_array(B, S1#decoder{state=any}, Acc)
+ end.
+
+tokenize_string(B, S=#decoder{offset=O}) ->
+ case tokenize_string_fast(B, O) of
+ {escape, O1} ->
+ Length = O1 - O,
+ S1 = ?ADV_COL(S, Length),
+ <<_:O/binary, Head:Length/binary, _/binary>> = B,
+ tokenize_string(B, S1, lists:reverse(binary_to_list(Head)));
+ O1 ->
+ Length = O1 - O,
+ <<_:O/binary, String:Length/binary, ?Q, _/binary>> = B,
+ {{const, String}, ?ADV_COL(S, Length + 1)}
+ end.
+
+tokenize_string_fast(B, O) ->
+ case B of
+ <<_:O/binary, ?Q, _/binary>> ->
+ O;
+ <<_:O/binary, $\\, _/binary>> ->
+ {escape, O};
+ <<_:O/binary, C1, _/binary>> when C1 < 128 ->
+ tokenize_string_fast(B, 1 + O);
+ <<_:O/binary, C1, C2, _/binary>> when C1 >= 194, C1 =< 223,
+ C2 >= 128, C2 =< 191 ->
+ tokenize_string_fast(B, 2 + O);
+ <<_:O/binary, C1, C2, C3, _/binary>> when C1 >= 224, C1 =< 239,
+ C2 >= 128, C2 =< 191,
+ C3 >= 128, C3 =< 191 ->
+ tokenize_string_fast(B, 3 + O);
+ <<_:O/binary, C1, C2, C3, C4, _/binary>> when C1 >= 240, C1 =< 244,
+ C2 >= 128, C2 =< 191,
+ C3 >= 128, C3 =< 191,
+ C4 >= 128, C4 =< 191 ->
+ tokenize_string_fast(B, 4 + O);
+ _ ->
+ throw(invalid_utf8)
+ end.
+
+tokenize_string(B, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, ?Q, _/binary>> ->
+ {{const, iolist_to_binary(lists:reverse(Acc))}, ?INC_COL(S)};
+ <<_:O/binary, "\\\"", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\" | Acc]);
+ <<_:O/binary, "\\\\", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\\ | Acc]);
+ <<_:O/binary, "\\/", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$/ | Acc]);
+ <<_:O/binary, "\\b", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\b | Acc]);
+ <<_:O/binary, "\\f", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\f | Acc]);
+ <<_:O/binary, "\\n", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\n | Acc]);
+ <<_:O/binary, "\\r", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\r | Acc]);
+ <<_:O/binary, "\\t", _/binary>> ->
+ tokenize_string(B, ?ADV_COL(S, 2), [$\t | Acc]);
+ <<_:O/binary, "\\u", C3, C2, C1, C0, Rest/binary>> ->
+ C = erlang:list_to_integer([C3, C2, C1, C0], 16),
+ if C > 16#D7FF, C < 16#DC00 ->
+ %% coalesce UTF-16 surrogate pair
+ <<"\\u", D3, D2, D1, D0, _/binary>> = Rest,
+ D = erlang:list_to_integer([D3,D2,D1,D0], 16),
+ [CodePoint] = xmerl_ucs:from_utf16be(<<C:16/big-unsigned-integer,
+ D:16/big-unsigned-integer>>),
+ Acc1 = lists:reverse(xmerl_ucs:to_utf8(CodePoint), Acc),
+ tokenize_string(B, ?ADV_COL(S, 12), Acc1);
+ true ->
+ Acc1 = lists:reverse(xmerl_ucs:to_utf8(C), Acc),
+ tokenize_string(B, ?ADV_COL(S, 6), Acc1)
+ end;
+ <<_:O/binary, C1, _/binary>> when C1 < 128 ->
+ tokenize_string(B, ?INC_CHAR(S, C1), [C1 | Acc]);
+ <<_:O/binary, C1, C2, _/binary>> when C1 >= 194, C1 =< 223,
+ C2 >= 128, C2 =< 191 ->
+ tokenize_string(B, ?ADV_COL(S, 2), [C2, C1 | Acc]);
+ <<_:O/binary, C1, C2, C3, _/binary>> when C1 >= 224, C1 =< 239,
+ C2 >= 128, C2 =< 191,
+ C3 >= 128, C3 =< 191 ->
+ tokenize_string(B, ?ADV_COL(S, 3), [C3, C2, C1 | Acc]);
+ <<_:O/binary, C1, C2, C3, C4, _/binary>> when C1 >= 240, C1 =< 244,
+ C2 >= 128, C2 =< 191,
+ C3 >= 128, C3 =< 191,
+ C4 >= 128, C4 =< 191 ->
+ tokenize_string(B, ?ADV_COL(S, 4), [C4, C3, C2, C1 | Acc]);
+ _ ->
+ throw(invalid_utf8)
+ end.
+
+tokenize_number(B, S) ->
+ case tokenize_number(B, sign, S, []) of
+ {{int, Int}, S1} ->
+ {{const, list_to_integer(Int)}, S1};
+ {{float, Float}, S1} ->
+ {{const, list_to_float(Float)}, S1}
+ end.
+
+tokenize_number(B, sign, S=#decoder{offset=O}, []) ->
+ case B of
+ <<_:O/binary, $-, _/binary>> ->
+ tokenize_number(B, int, ?INC_COL(S), [$-]);
+ _ ->
+ tokenize_number(B, int, S, [])
+ end;
+tokenize_number(B, int, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, $0, _/binary>> ->
+ tokenize_number(B, frac, ?INC_COL(S), [$0 | Acc]);
+ <<_:O/binary, C, _/binary>> when C >= $1 andalso C =< $9 ->
+ tokenize_number(B, int1, ?INC_COL(S), [C | Acc])
+ end;
+tokenize_number(B, int1, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when C >= $0 andalso C =< $9 ->
+ tokenize_number(B, int1, ?INC_COL(S), [C | Acc]);
+ _ ->
+ tokenize_number(B, frac, S, Acc)
+ end;
+tokenize_number(B, frac, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, $., C, _/binary>> when C >= $0, C =< $9 ->
+ tokenize_number(B, frac1, ?ADV_COL(S, 2), [C, $. | Acc]);
+ <<_:O/binary, E, _/binary>> when E =:= $e orelse E =:= $E ->
+ tokenize_number(B, esign, ?INC_COL(S), [$e, $0, $. | Acc]);
+ _ ->
+ {{int, lists:reverse(Acc)}, S}
+ end;
+tokenize_number(B, frac1, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when C >= $0 andalso C =< $9 ->
+ tokenize_number(B, frac1, ?INC_COL(S), [C | Acc]);
+ <<_:O/binary, E, _/binary>> when E =:= $e orelse E =:= $E ->
+ tokenize_number(B, esign, ?INC_COL(S), [$e | Acc]);
+ _ ->
+ {{float, lists:reverse(Acc)}, S}
+ end;
+tokenize_number(B, esign, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when C =:= $- orelse C=:= $+ ->
+ tokenize_number(B, eint, ?INC_COL(S), [C | Acc]);
+ _ ->
+ tokenize_number(B, eint, S, Acc)
+ end;
+tokenize_number(B, eint, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when C >= $0 andalso C =< $9 ->
+ tokenize_number(B, eint1, ?INC_COL(S), [C | Acc])
+ end;
+tokenize_number(B, eint1, S=#decoder{offset=O}, Acc) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when C >= $0 andalso C =< $9 ->
+ tokenize_number(B, eint1, ?INC_COL(S), [C | Acc]);
+ _ ->
+ {{float, lists:reverse(Acc)}, S}
+ end.
+
+tokenize(B, S=#decoder{offset=O}) ->
+ case B of
+ <<_:O/binary, C, _/binary>> when ?IS_WHITESPACE(C) ->
+ tokenize(B, ?INC_CHAR(S, C));
+ <<_:O/binary, "{", _/binary>> ->
+ {start_object, ?INC_COL(S)};
+ <<_:O/binary, "}", _/binary>> ->
+ {end_object, ?INC_COL(S)};
+ <<_:O/binary, "[", _/binary>> ->
+ {start_array, ?INC_COL(S)};
+ <<_:O/binary, "]", _/binary>> ->
+ {end_array, ?INC_COL(S)};
+ <<_:O/binary, ",", _/binary>> ->
+ {comma, ?INC_COL(S)};
+ <<_:O/binary, ":", _/binary>> ->
+ {colon, ?INC_COL(S)};
+ <<_:O/binary, "null", _/binary>> ->
+ {{const, null}, ?ADV_COL(S, 4)};
+ <<_:O/binary, "true", _/binary>> ->
+ {{const, true}, ?ADV_COL(S, 4)};
+ <<_:O/binary, "false", _/binary>> ->
+ {{const, false}, ?ADV_COL(S, 5)};
+ <<_:O/binary, "\"", _/binary>> ->
+ tokenize_string(B, ?INC_COL(S));
+ <<_:O/binary, C, _/binary>> when (C >= $0 andalso C =< $9)
+ orelse C =:= $- ->
+ tokenize_number(B, S);
+ <<_:O/binary>> ->
+ trim = S#decoder.state,
+ {eof, S}
+ end.
+%%
+%% Tests
+%%
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+
+%% testing constructs borrowed from the Yaws JSON implementation.
+
+%% Create an object from a list of Key/Value pairs.
+
+obj_new() ->
+ {struct, []}.
+
+is_obj({struct, Props}) ->
+ F = fun ({K, _}) when is_binary(K) -> true end,
+ lists:all(F, Props).
+
+obj_from_list(Props) ->
+ Obj = {struct, Props},
+ ?assert(is_obj(Obj)),
+ Obj.
+
+%% Test for equivalence of Erlang terms.
+%% Due to arbitrary order of construction, equivalent objects might
+%% compare unequal as erlang terms, so we need to carefully recurse
+%% through aggregates (tuples and objects).
+
+equiv({struct, Props1}, {struct, Props2}) ->
+ equiv_object(Props1, Props2);
+equiv(L1, L2) when is_list(L1), is_list(L2) ->
+ equiv_list(L1, L2);
+equiv(N1, N2) when is_number(N1), is_number(N2) -> N1 == N2;
+equiv(B1, B2) when is_binary(B1), is_binary(B2) -> B1 == B2;
+equiv(A, A) when A =:= true orelse A =:= false orelse A =:= null -> true.
+
+%% Object representation and traversal order is unknown.
+%% Use the sledgehammer and sort property lists.
+
+equiv_object(Props1, Props2) ->
+ L1 = lists:keysort(1, Props1),
+ L2 = lists:keysort(1, Props2),
+ Pairs = lists:zip(L1, L2),
+ true = lists:all(fun({{K1, V1}, {K2, V2}}) ->
+ equiv(K1, K2) and equiv(V1, V2)
+ end, Pairs).
+
+%% Recursively compare tuple elements for equivalence.
+
+equiv_list([], []) ->
+ true;
+equiv_list([V1 | L1], [V2 | L2]) ->
+ equiv(V1, V2) andalso equiv_list(L1, L2).
+
+decode_test() ->
+ [1199344435545.0, 1] = decode(<<"[1199344435545.0,1]">>),
+ <<16#F0,16#9D,16#9C,16#95>> = decode([34,"\\ud835","\\udf15",34]).
+
+e2j_vec_test() ->
+ test_one(e2j_test_vec(utf8), 1).
+
+test_one([], _N) ->
+ %% io:format("~p tests passed~n", [N-1]),
+ ok;
+test_one([{E, J} | Rest], N) ->
+ %% io:format("[~p] ~p ~p~n", [N, E, J]),
+ true = equiv(E, decode(J)),
+ true = equiv(E, decode(encode(E))),
+ test_one(Rest, 1+N).
+
+e2j_test_vec(utf8) ->
+ [
+ {1, "1"},
+ {3.1416, "3.14160"}, %% text representation may truncate, trail zeroes
+ {-1, "-1"},
+ {-3.1416, "-3.14160"},
+ {12.0e10, "1.20000e+11"},
+ {1.234E+10, "1.23400e+10"},
+ {-1.234E-10, "-1.23400e-10"},
+ {10.0, "1.0e+01"},
+ {123.456, "1.23456E+2"},
+ {10.0, "1e1"},
+ {<<"foo">>, "\"foo\""},
+ {<<"foo", 5, "bar">>, "\"foo\\u0005bar\""},
+ {<<"">>, "\"\""},
+ {<<"\n\n\n">>, "\"\\n\\n\\n\""},
+ {<<"\" \b\f\r\n\t\"">>, "\"\\\" \\b\\f\\r\\n\\t\\\"\""},
+ {obj_new(), "{}"},
+ {obj_from_list([{<<"foo">>, <<"bar">>}]), "{\"foo\":\"bar\"}"},
+ {obj_from_list([{<<"foo">>, <<"bar">>}, {<<"baz">>, 123}]),
+ "{\"foo\":\"bar\",\"baz\":123}"},
+ {[], "[]"},
+ {[[]], "[[]]"},
+ {[1, <<"foo">>], "[1,\"foo\"]"},
+
+ %% json array in a json object
+ {obj_from_list([{<<"foo">>, [123]}]),
+ "{\"foo\":[123]}"},
+
+ %% json object in a json object
+ {obj_from_list([{<<"foo">>, obj_from_list([{<<"bar">>, true}])}]),
+ "{\"foo\":{\"bar\":true}}"},
+
+ %% fold evaluation order
+ {obj_from_list([{<<"foo">>, []},
+ {<<"bar">>, obj_from_list([{<<"baz">>, true}])},
+ {<<"alice">>, <<"bob">>}]),
+ "{\"foo\":[],\"bar\":{\"baz\":true},\"alice\":\"bob\"}"},
+
+ %% json object in a json array
+ {[-123, <<"foo">>, obj_from_list([{<<"bar">>, []}]), null],
+ "[-123,\"foo\",{\"bar\":[]},null]"}
+ ].
+
+%% test utf8 encoding
+encoder_utf8_test() ->
+ %% safe conversion case (default)
+ [34,"\\u0001","\\u0442","\\u0435","\\u0441","\\u0442",34] =
+ encode(<<1,"\321\202\320\265\321\201\321\202">>),
+
+ %% raw utf8 output (optional)
+ Enc = mochijson2:encoder([{utf8, true}]),
+ [34,"\\u0001",[209,130],[208,181],[209,129],[209,130],34] =
+ Enc(<<1,"\321\202\320\265\321\201\321\202">>).
+
+input_validation_test() ->
+ Good = [
+ {16#00A3, <<?Q, 16#C2, 16#A3, ?Q>>}, %% pound
+ {16#20AC, <<?Q, 16#E2, 16#82, 16#AC, ?Q>>}, %% euro
+ {16#10196, <<?Q, 16#F0, 16#90, 16#86, 16#96, ?Q>>} %% denarius
+ ],
+ lists:foreach(fun({CodePoint, UTF8}) ->
+ Expect = list_to_binary(xmerl_ucs:to_utf8(CodePoint)),
+ Expect = decode(UTF8)
+ end, Good),
+
+ Bad = [
+ %% 2nd, 3rd, or 4th byte of a multi-byte sequence w/o leading byte
+ <<?Q, 16#80, ?Q>>,
+ %% missing continuations, last byte in each should be 80-BF
+ <<?Q, 16#C2, 16#7F, ?Q>>,
+ <<?Q, 16#E0, 16#80,16#7F, ?Q>>,
+ <<?Q, 16#F0, 16#80, 16#80, 16#7F, ?Q>>,
+ %% we don't support code points > 10FFFF per RFC 3629
+ <<?Q, 16#F5, 16#80, 16#80, 16#80, ?Q>>,
+ %% escape characters trigger a different code path
+ <<?Q, $\\, $\n, 16#80, ?Q>>
+ ],
+ lists:foreach(
+ fun(X) ->
+ ok = try decode(X) catch invalid_utf8 -> ok end,
+ %% could be {ucs,{bad_utf8_character_code}} or
+ %% {json_encode,{bad_char,_}}
+ {'EXIT', _} = (catch encode(X))
+ end, Bad).
+
+inline_json_test() ->
+ ?assertEqual(<<"\"iodata iodata\"">>,
+ iolist_to_binary(
+ encode({json, [<<"\"iodata">>, " iodata\""]}))),
+ ?assertEqual({struct, [{<<"key">>, <<"iodata iodata">>}]},
+ decode(
+ encode({struct,
+ [{key, {json, [<<"\"iodata">>, " iodata\""]}}]}))),
+ ok.
+
+big_unicode_test() ->
+ UTF8Seq = list_to_binary(xmerl_ucs:to_utf8(16#0001d120)),
+ ?assertEqual(
+ <<"\"\\ud834\\udd20\"">>,
+ iolist_to_binary(encode(UTF8Seq))),
+ ?assertEqual(
+ UTF8Seq,
+ decode(iolist_to_binary(encode(UTF8Seq)))),
+ ok.
+
+custom_decoder_test() ->
+ ?assertEqual(
+ {struct, [{<<"key">>, <<"value">>}]},
+ (decoder([]))("{\"key\": \"value\"}")),
+ F = fun ({struct, [{<<"key">>, <<"value">>}]}) -> win end,
+ ?assertEqual(
+ win,
+ (decoder([{object_hook, F}]))("{\"key\": \"value\"}")),
+ ok.
+
+atom_test() ->
+ %% JSON native atoms
+ [begin
+ ?assertEqual(A, decode(atom_to_list(A))),
+ ?assertEqual(iolist_to_binary(atom_to_list(A)),
+ iolist_to_binary(encode(A)))
+ end || A <- [true, false, null]],
+ %% Atom to string
+ ?assertEqual(
+ <<"\"foo\"">>,
+ iolist_to_binary(encode(foo))),
+ ?assertEqual(
+ <<"\"\\ud834\\udd20\"">>,
+ iolist_to_binary(encode(list_to_atom(xmerl_ucs:to_utf8(16#0001d120))))),
+ ok.
+
+key_encode_test() ->
+ %% Some forms are accepted as keys that would not be strings in other
+ %% cases
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode({struct, [{foo, 1}]}))),
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode({struct, [{<<"foo">>, 1}]}))),
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode({struct, [{"foo", 1}]}))),
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode([{foo, 1}]))),
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode([{<<"foo">>, 1}]))),
+ ?assertEqual(
+ <<"{\"foo\":1}">>,
+ iolist_to_binary(encode([{"foo", 1}]))),
+ ?assertEqual(
+ <<"{\"\\ud834\\udd20\":1}">>,
+ iolist_to_binary(
+ encode({struct, [{[16#0001d120], 1}]}))),
+ ?assertEqual(
+ <<"{\"1\":1}">>,
+ iolist_to_binary(encode({struct, [{1, 1}]}))),
+ ok.
+
+unsafe_chars_test() ->
+ Chars = "\"\\\b\f\n\r\t",
+ [begin
+ ?assertEqual(false, json_string_is_safe([C])),
+ ?assertEqual(false, json_bin_is_safe(<<C>>)),
+ ?assertEqual(<<C>>, decode(encode(<<C>>)))
+ end || C <- Chars],
+ ?assertEqual(
+ false,
+ json_string_is_safe([16#0001d120])),
+ ?assertEqual(
+ false,
+ json_bin_is_safe(list_to_binary(xmerl_ucs:to_utf8(16#0001d120)))),
+ ?assertEqual(
+ [16#0001d120],
+ xmerl_ucs:from_utf8(
+ binary_to_list(
+ decode(encode(list_to_atom(xmerl_ucs:to_utf8(16#0001d120))))))),
+ ?assertEqual(
+ false,
+ json_string_is_safe([16#110000])),
+ ?assertEqual(
+ false,
+ json_bin_is_safe(list_to_binary(xmerl_ucs:to_utf8([16#110000])))),
+ %% solidus can be escaped but isn't unsafe by default
+ ?assertEqual(
+ <<"/">>,
+ decode(<<"\"\\/\"">>)),
+ ok.
+
+int_test() ->
+ ?assertEqual(0, decode("0")),
+ ?assertEqual(1, decode("1")),
+ ?assertEqual(11, decode("11")),
+ ok.
+
+large_int_test() ->
+ ?assertEqual(<<"-2147483649214748364921474836492147483649">>,
+ iolist_to_binary(encode(-2147483649214748364921474836492147483649))),
+ ?assertEqual(<<"2147483649214748364921474836492147483649">>,
+ iolist_to_binary(encode(2147483649214748364921474836492147483649))),
+ ok.
+
+float_test() ->
+ ?assertEqual(<<"-2147483649.0">>, iolist_to_binary(encode(-2147483649.0))),
+ ?assertEqual(<<"2147483648.0">>, iolist_to_binary(encode(2147483648.0))),
+ ok.
+
+handler_test() ->
+ ?assertEqual(
+ {'EXIT',{json_encode,{bad_term,{x,y}}}},
+ catch encode({x,y})),
+ F = fun ({x,y}) -> [] end,
+ ?assertEqual(
+ <<"[]">>,
+ iolist_to_binary((encoder([{handler, F}]))({x, y}))),
+ ok.
+
+encode_empty_test_() ->
+ [{A, ?_assertEqual(<<"{}">>, iolist_to_binary(encode(B)))}
+ || {A, B} <- [{"eep18 {}", {}},
+ {"eep18 {[]}", {[]}},
+ {"{struct, []}", {struct, []}}]].
+
+encode_test_() ->
+ P = [{<<"k">>, <<"v">>}],
+ JSON = iolist_to_binary(encode({struct, P})),
+ [{atom_to_list(F),
+ ?_assertEqual(JSON, iolist_to_binary(encode(decode(JSON, [{format, F}]))))}
+ || F <- [struct, eep18, proplist]].
+
+format_test_() ->
+ P = [{<<"k">>, <<"v">>}],
+ JSON = iolist_to_binary(encode({struct, P})),
+ [{atom_to_list(F),
+ ?_assertEqual(A, decode(JSON, [{format, F}]))}
+ || {F, A} <- [{struct, {struct, P}},
+ {eep18, {P}},
+ {proplist, P}]].
+
+-endif.
diff --git a/src/mochinum.erl b/src/mochinum.erl
new file mode 100644
index 00000000..4ea7a22a
--- /dev/null
+++ b/src/mochinum.erl
@@ -0,0 +1,358 @@
+%% This file is a copy of `mochijson2.erl' from mochiweb, revision
+%% d541e9a0f36c00dcadc2e589f20e47fbf46fc76f. For the license, see
+%% `LICENSE-MIT-Mochi'.
+
+%% @copyright 2007 Mochi Media, Inc.
+%% @author Bob Ippolito <bob@mochimedia.com>
+
+%% @doc Useful numeric algorithms for floats that cover some deficiencies
+%% in the math module. More interesting is digits/1, which implements
+%% the algorithm from:
+%% http://www.cs.indiana.edu/~burger/fp/index.html
+%% See also "Printing Floating-Point Numbers Quickly and Accurately"
+%% in Proceedings of the SIGPLAN '96 Conference on Programming Language
+%% Design and Implementation.
+
+-module(mochinum).
+-author("Bob Ippolito <bob@mochimedia.com>").
+-export([digits/1, frexp/1, int_pow/2, int_ceil/1]).
+
+%% IEEE 754 Float exponent bias
+-define(FLOAT_BIAS, 1022).
+-define(MIN_EXP, -1074).
+-define(BIG_POW, 4503599627370496).
+
+%% External API
+
+%% @spec digits(number()) -> string()
+%% @doc Returns a string that accurately represents the given integer or float
+%% using a conservative amount of digits. Great for generating
+%% human-readable output, or compact ASCII serializations for floats.
+digits(N) when is_integer(N) ->
+ integer_to_list(N);
+digits(0.0) ->
+ "0.0";
+digits(Float) ->
+ {Frac1, Exp1} = frexp_int(Float),
+ [Place0 | Digits0] = digits1(Float, Exp1, Frac1),
+ {Place, Digits} = transform_digits(Place0, Digits0),
+ R = insert_decimal(Place, Digits),
+ case Float < 0 of
+ true ->
+ [$- | R];
+ _ ->
+ R
+ end.
+
+%% @spec frexp(F::float()) -> {Frac::float(), Exp::float()}
+%% @doc Return the fractional and exponent part of an IEEE 754 double,
+%% equivalent to the libc function of the same name.
+%% F = Frac * pow(2, Exp).
+frexp(F) ->
+ frexp1(unpack(F)).
+
+%% @spec int_pow(X::integer(), N::integer()) -> Y::integer()
+%% @doc Moderately efficient way to exponentiate integers.
+%% int_pow(10, 2) = 100.
+int_pow(_X, 0) ->
+ 1;
+int_pow(X, N) when N > 0 ->
+ int_pow(X, N, 1).
+
+%% @spec int_ceil(F::float()) -> integer()
+%% @doc Return the ceiling of F as an integer. The ceiling is defined as
+%% F when F == trunc(F);
+%% trunc(F) when F &lt; 0;
+%% trunc(F) + 1 when F &gt; 0.
+int_ceil(X) ->
+ T = trunc(X),
+ case (X - T) of
+ Pos when Pos > 0 -> T + 1;
+ _ -> T
+ end.
+
+
+%% Internal API
+
+int_pow(X, N, R) when N < 2 ->
+ R * X;
+int_pow(X, N, R) ->
+ int_pow(X * X, N bsr 1, case N band 1 of 1 -> R * X; 0 -> R end).
+
+insert_decimal(0, S) ->
+ "0." ++ S;
+insert_decimal(Place, S) when Place > 0 ->
+ L = length(S),
+ case Place - L of
+ 0 ->
+ S ++ ".0";
+ N when N < 0 ->
+ {S0, S1} = lists:split(L + N, S),
+ S0 ++ "." ++ S1;
+ N when N < 6 ->
+ %% More places than digits
+ S ++ lists:duplicate(N, $0) ++ ".0";
+ _ ->
+ insert_decimal_exp(Place, S)
+ end;
+insert_decimal(Place, S) when Place > -6 ->
+ "0." ++ lists:duplicate(abs(Place), $0) ++ S;
+insert_decimal(Place, S) ->
+ insert_decimal_exp(Place, S).
+
+insert_decimal_exp(Place, S) ->
+ [C | S0] = S,
+ S1 = case S0 of
+ [] ->
+ "0";
+ _ ->
+ S0
+ end,
+ Exp = case Place < 0 of
+ true ->
+ "e-";
+ false ->
+ "e+"
+ end,
+ [C] ++ "." ++ S1 ++ Exp ++ integer_to_list(abs(Place - 1)).
+
+
+digits1(Float, Exp, Frac) ->
+ Round = ((Frac band 1) =:= 0),
+ case Exp >= 0 of
+ true ->
+ BExp = 1 bsl Exp,
+ case (Frac =/= ?BIG_POW) of
+ true ->
+ scale((Frac * BExp * 2), 2, BExp, BExp,
+ Round, Round, Float);
+ false ->
+ scale((Frac * BExp * 4), 4, (BExp * 2), BExp,
+ Round, Round, Float)
+ end;
+ false ->
+ case (Exp =:= ?MIN_EXP) orelse (Frac =/= ?BIG_POW) of
+ true ->
+ scale((Frac * 2), 1 bsl (1 - Exp), 1, 1,
+ Round, Round, Float);
+ false ->
+ scale((Frac * 4), 1 bsl (2 - Exp), 2, 1,
+ Round, Round, Float)
+ end
+ end.
+
+scale(R, S, MPlus, MMinus, LowOk, HighOk, Float) ->
+ Est = int_ceil(math:log10(abs(Float)) - 1.0e-10),
+ %% Note that the scheme implementation uses a 326 element look-up table
+ %% for int_pow(10, N) where we do not.
+ case Est >= 0 of
+ true ->
+ fixup(R, S * int_pow(10, Est), MPlus, MMinus, Est,
+ LowOk, HighOk);
+ false ->
+ Scale = int_pow(10, -Est),
+ fixup(R * Scale, S, MPlus * Scale, MMinus * Scale, Est,
+ LowOk, HighOk)
+ end.
+
+fixup(R, S, MPlus, MMinus, K, LowOk, HighOk) ->
+ TooLow = case HighOk of
+ true ->
+ (R + MPlus) >= S;
+ false ->
+ (R + MPlus) > S
+ end,
+ case TooLow of
+ true ->
+ [(K + 1) | generate(R, S, MPlus, MMinus, LowOk, HighOk)];
+ false ->
+ [K | generate(R * 10, S, MPlus * 10, MMinus * 10, LowOk, HighOk)]
+ end.
+
+generate(R0, S, MPlus, MMinus, LowOk, HighOk) ->
+ D = R0 div S,
+ R = R0 rem S,
+ TC1 = case LowOk of
+ true ->
+ R =< MMinus;
+ false ->
+ R < MMinus
+ end,
+ TC2 = case HighOk of
+ true ->
+ (R + MPlus) >= S;
+ false ->
+ (R + MPlus) > S
+ end,
+ case TC1 of
+ false ->
+ case TC2 of
+ false ->
+ [D | generate(R * 10, S, MPlus * 10, MMinus * 10,
+ LowOk, HighOk)];
+ true ->
+ [D + 1]
+ end;
+ true ->
+ case TC2 of
+ false ->
+ [D];
+ true ->
+ case R * 2 < S of
+ true ->
+ [D];
+ false ->
+ [D + 1]
+ end
+ end
+ end.
+
+unpack(Float) ->
+ <<Sign:1, Exp:11, Frac:52>> = <<Float:64/float>>,
+ {Sign, Exp, Frac}.
+
+frexp1({_Sign, 0, 0}) ->
+ {0.0, 0};
+frexp1({Sign, 0, Frac}) ->
+ Exp = log2floor(Frac),
+ <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, (Frac-1):52>>,
+ {Frac1, -(?FLOAT_BIAS) - 52 + Exp};
+frexp1({Sign, Exp, Frac}) ->
+ <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, Frac:52>>,
+ {Frac1, Exp - ?FLOAT_BIAS}.
+
+log2floor(Int) ->
+ log2floor(Int, 0).
+
+log2floor(0, N) ->
+ N;
+log2floor(Int, N) ->
+ log2floor(Int bsr 1, 1 + N).
+
+
+transform_digits(Place, [0 | Rest]) ->
+ transform_digits(Place, Rest);
+transform_digits(Place, Digits) ->
+ {Place, [$0 + D || D <- Digits]}.
+
+
+frexp_int(F) ->
+ case unpack(F) of
+ {_Sign, 0, Frac} ->
+ {Frac, ?MIN_EXP};
+ {_Sign, Exp, Frac} ->
+ {Frac + (1 bsl 52), Exp - 53 - ?FLOAT_BIAS}
+ end.
+
+%%
+%% Tests
+%%
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+int_ceil_test() ->
+ ?assertEqual(1, int_ceil(0.0001)),
+ ?assertEqual(0, int_ceil(0.0)),
+ ?assertEqual(1, int_ceil(0.99)),
+ ?assertEqual(1, int_ceil(1.0)),
+ ?assertEqual(-1, int_ceil(-1.5)),
+ ?assertEqual(-2, int_ceil(-2.0)),
+ ok.
+
+int_pow_test() ->
+ ?assertEqual(1, int_pow(1, 1)),
+ ?assertEqual(1, int_pow(1, 0)),
+ ?assertEqual(1, int_pow(10, 0)),
+ ?assertEqual(10, int_pow(10, 1)),
+ ?assertEqual(100, int_pow(10, 2)),
+ ?assertEqual(1000, int_pow(10, 3)),
+ ok.
+
+digits_test() ->
+ ?assertEqual("0",
+ digits(0)),
+ ?assertEqual("0.0",
+ digits(0.0)),
+ ?assertEqual("1.0",
+ digits(1.0)),
+ ?assertEqual("-1.0",
+ digits(-1.0)),
+ ?assertEqual("0.1",
+ digits(0.1)),
+ ?assertEqual("0.01",
+ digits(0.01)),
+ ?assertEqual("0.001",
+ digits(0.001)),
+ ?assertEqual("1.0e+6",
+ digits(1000000.0)),
+ ?assertEqual("0.5",
+ digits(0.5)),
+ ?assertEqual("4503599627370496.0",
+ digits(4503599627370496.0)),
+ %% small denormalized number
+ %% 4.94065645841246544177e-324 =:= 5.0e-324
+ <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>,
+ ?assertEqual("5.0e-324",
+ digits(SmallDenorm)),
+ ?assertEqual(SmallDenorm,
+ list_to_float(digits(SmallDenorm))),
+ %% large denormalized number
+ %% 2.22507385850720088902e-308
+ <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>,
+ ?assertEqual("2.225073858507201e-308",
+ digits(BigDenorm)),
+ ?assertEqual(BigDenorm,
+ list_to_float(digits(BigDenorm))),
+ %% small normalized number
+ %% 2.22507385850720138309e-308
+ <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>,
+ ?assertEqual("2.2250738585072014e-308",
+ digits(SmallNorm)),
+ ?assertEqual(SmallNorm,
+ list_to_float(digits(SmallNorm))),
+ %% large normalized number
+ %% 1.79769313486231570815e+308
+ <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>,
+ ?assertEqual("1.7976931348623157e+308",
+ digits(LargeNorm)),
+ ?assertEqual(LargeNorm,
+ list_to_float(digits(LargeNorm))),
+ %% issue #10 - mochinum:frexp(math:pow(2, -1074)).
+ ?assertEqual("5.0e-324",
+ digits(math:pow(2, -1074))),
+ ok.
+
+frexp_test() ->
+ %% zero
+ ?assertEqual({0.0, 0}, frexp(0.0)),
+ %% one
+ ?assertEqual({0.5, 1}, frexp(1.0)),
+ %% negative one
+ ?assertEqual({-0.5, 1}, frexp(-1.0)),
+ %% small denormalized number
+ %% 4.94065645841246544177e-324
+ <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>,
+ ?assertEqual({0.5, -1073}, frexp(SmallDenorm)),
+ %% large denormalized number
+ %% 2.22507385850720088902e-308
+ <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>,
+ ?assertEqual(
+ {0.99999999999999978, -1022},
+ frexp(BigDenorm)),
+ %% small normalized number
+ %% 2.22507385850720138309e-308
+ <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>,
+ ?assertEqual({0.5, -1021}, frexp(SmallNorm)),
+ %% large normalized number
+ %% 1.79769313486231570815e+308
+ <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>,
+ ?assertEqual(
+ {0.99999999999999989, 1024},
+ frexp(LargeNorm)),
+ %% issue #10 - mochinum:frexp(math:pow(2, -1074)).
+ ?assertEqual(
+ {0.5, -1073},
+ frexp(math:pow(2, -1074))),
+ ok.
+
+-endif.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ed258c71..7a021e37 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -176,7 +176,7 @@
-rabbit_boot_step({notify_cluster,
[{description, "notify cluster nodes"},
- {mfa, {rabbit_node_monitor, notify_cluster, []}},
+ {mfa, {rabbit_node_monitor, notify_node_up, []}},
{requires, networking}]}).
%%---------------------------------------------------------------------------
@@ -300,6 +300,8 @@ start() ->
%% We do not want to HiPE compile or upgrade
%% mnesia after just restarting the app
ok = ensure_application_loaded(),
+ ok = rabbit_node_monitor:prepare_cluster_status_files(),
+ ok = rabbit_mnesia:check_cluster_consistency(),
ok = ensure_working_log_handlers(),
ok = app_utils:start_applications(app_startup_order()),
ok = print_plugin_info(rabbit_plugins:active())
@@ -309,8 +311,13 @@ boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
maybe_hipe_compile(),
+ ok = rabbit_node_monitor:prepare_cluster_status_files(),
ok = ensure_working_log_handlers(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
+ %% It's important that the consistency check happens after
+ %% the upgrade, since if we are a secondary node the
+ %% primary node will have forgotten us
+ ok = rabbit_mnesia:check_cluster_consistency(),
Plugins = rabbit_plugins:setup(),
ToBeLoaded = Plugins ++ ?APPS,
ok = app_utils:load_applications(ToBeLoaded),
@@ -408,7 +415,6 @@ start(normal, []) ->
end.
stop(_State) ->
- ok = rabbit_mnesia:record_running_nodes(),
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
@@ -505,12 +511,12 @@ sort_boot_steps(UnsortedSteps) ->
end.
boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
{Err, Nodes} =
- case rabbit_mnesia:read_previously_running_nodes() of
+ case AllNodes -- [node()] of
[] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
" shut down forcefully~nit cannot determine which nodes"
- " are timing out. Details on all nodes will~nfollow.~n",
- rabbit_mnesia:all_clustered_nodes() -- [node()]};
+ " are timing out.~n", []};
Ns -> {rabbit_misc:format(
"Timeout contacting cluster nodes: ~p.~n", [Ns]),
Ns}
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a5f227bc..b2473f91 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -40,6 +40,8 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
+-define(MAX_EXPIRY_TIMER, 4294967295).
+
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
-define(FAILOVER_WAIT_MILLIS, 100).
@@ -311,8 +313,17 @@ with(Name, F, E) ->
case lookup(Name) of
{ok, Q = #amqqueue{slave_pids = []}} ->
rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
- {ok, Q} ->
- E1 = fun () -> timer:sleep(25), with(Name, F, E) end,
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ %% We check is_process_alive(QPid) in case we receive a
+ %% nodedown (for example) in F() that has nothing to do
+ %% with the QPid.
+ E1 = fun () ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> E();
+ false -> timer:sleep(25),
+ with(Name, F, E)
+ end
+ end,
rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end);
{error, not_found} ->
E()
@@ -388,16 +399,18 @@ check_int_arg({Type, _}, _) ->
check_positive_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val > 0 -> ok;
- ok -> {error, {value_zero_or_less, Val}};
- Error -> Error
+ ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
+ ok when Val > 0 -> ok;
+ ok -> {error, {value_zero_or_less, Val}};
+ Error -> Error
end.
check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
- ok when Val >= 0 -> ok;
- ok -> {error, {value_less_than_zero, Val}};
- Error -> Error
+ ok when Val > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, Val}};
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_less_than_zero, Val}};
+ Error -> Error
end.
check_dlxrk_arg({longstr, _}, Args) ->
@@ -565,7 +578,12 @@ flush_all(QPids, ChPid) ->
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_queue, QueueName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
+ end,
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
@@ -681,11 +699,10 @@ safe_delegate_call_ok(F, Pids) ->
fun () -> ok end,
fun () -> F(Pid) end)
end),
- case lists:filter(fun ({_Pid, {exit, {R, _}, _}}) ->
- rabbit_misc:is_abnormal_exit(R);
- ({_Pid, _}) ->
- false
- end, Bads) of
+ case lists:filter(
+ fun ({_Pid, {exit, {R, _}, _}}) -> rabbit_misc:is_abnormal_exit(R);
+ ({_Pid, _}) -> false
+ end, Bads) of
[] -> ok;
Bads1 -> {error, Bads1}
end.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b4071627..e647627c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -47,6 +47,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ ttl_timer_expiry,
senders,
publish_seqno,
unconfirmed,
@@ -107,7 +108,7 @@
]).
-define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
+ ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
%%----------------------------------------------------------------------------
@@ -559,24 +560,22 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- msg_seq_no = MsgSeqNo,
sender = SenderPid}, State) ->
Confirm = should_confirm_message(Delivery, State),
case attempt_delivery(Delivery, Confirm, State) of
{true, State1} ->
maybe_record_confirm_message(Confirm, State1);
- %% the next two are optimisations
+ %% the next one is an optimisations
+ %% TODO: optimise the Confirm =/= never case too
{false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
discard_delivery(Delivery, State1);
- {false, State1 = #q{ttl = 0, dlx = undefined}} ->
- rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
- discard_delivery(Delivery, State1);
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
Props = message_properties(Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ ensure_ttl_timer(Props#message_properties.expiry,
+ State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -716,28 +715,41 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
- ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- case DLXFun of
- undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
- BQS1;
- _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
- lists:foreach(
- fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
- BQS1
- end,
- ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
-
-ensure_ttl_timer(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL,
- ttl_timer_ref = undefined})
- when TTL =/= undefined ->
- case BQ:is_empty(BQS) of
- true -> State;
- false -> TRef = erlang:send_after(TTL, self(), drop_expired),
- State#q{ttl_timer_ref = TRef}
+ ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
+ {Props, BQS1} =
+ case DLXFun of
+ undefined ->
+ {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ ->
+ {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
+ DLXFun(Msgs),
+ {Next, BQS2}
+ end,
+ ensure_ttl_timer(case Props of
+ undefined -> undefined;
+ #message_properties{expiry = Exp} -> Exp
+ end, State#q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(undefined, State) ->
+ State;
+ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
+ State;
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+ After = (case Expiry - now_micros() of
+ V when V > 0 -> V + 999; %% always fire later
+ _ -> 0
+ end) div 1000,
+ TRef = erlang:send_after(After, self(), drop_expired),
+ State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
+ ttl_timer_expiry = TExpiry})
+ when Expiry + 1000 < TExpiry ->
+ case erlang:cancel_timer(TRef) of
+ false -> State;
+ _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
end;
-ensure_ttl_timer(State) ->
+ensure_ttl_timer(_Expiry, State) ->
State.
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
@@ -751,37 +763,17 @@ ack_if_no_dlx(_AckTags, State) ->
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
undefined;
dead_letter_fun(Reason, _State) ->
- fun(Msg, AckTag) ->
- gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
- end.
-
-dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
- DLMsg = #basic_message{exchange_name = XName} =
- make_dead_letter_msg(Reason, Msg, State),
- case rabbit_exchange:lookup(XName) of
- {ok, X} ->
- Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
- {Queues, Cycles} = detect_dead_letter_cycles(
- DLMsg, rabbit_exchange:route(X, Delivery)),
- lists:foreach(fun log_cycle_once/1, Cycles),
- QPids = rabbit_amqqueue:lookup(Queues),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
- DeliveredQPids;
- {error, not_found} ->
- []
- end.
-
-dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC}) ->
- QPids = dead_letter_publish(Msg, Reason, State),
- State1 = State#q{queue_monitors = pmon:monitor_all(
- QPids, State#q.queue_monitors),
- publish_seqno = MsgSeqNo + 1},
- case QPids of
- [] -> cleanup_after_confirm([AckTag], State1);
- _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
- noreply(State1#q{unconfirmed = UC1})
- end.
+ fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
+
+dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
+ DLMsg = make_dead_letter_msg(Reason, Msg, State),
+ Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ {Queues, Cycles} = detect_dead_letter_cycles(
+ DLMsg, rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(Queues),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
@@ -1228,7 +1220,12 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
false -> fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- Fun = dead_letter_fun(rejected, State1),
+ Fun =
+ case dead_letter_fun(rejected, State1) of
+ undefined -> undefined;
+ F -> fun(M, A) -> F([{M, A}])
+ end
+ end,
BQS1 = BQ:fold(Fun, BQS, AckTags),
ack_if_no_dlx(
AckTags,
@@ -1280,8 +1277,24 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
- dead_letter_msg(Msg, AckTag, Reason, State);
+handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} ->
+ noreply(lists:foldl(
+ fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo,
+ unconfirmed = UC,
+ queue_monitors = QMon}) ->
+ QPids = dead_letter_publish(Msg, Reason, X,
+ State1),
+ UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
+ QMons = pmon:monitor_all(QPids, QMon),
+ State1#q{queue_monitors = QMons,
+ publish_seqno = SeqNo + 1,
+ unconfirmed = UC1}
+ end, State, Msgs));
+ {error, not_found} ->
+ cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
+ end;
handle_cast(wake_up, State) ->
noreply(State).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 95523bed..d69a6c3b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -124,9 +124,11 @@
%% necessitate an ack or not. If they do, the function returns a list of
%% messages with the respective acktags.
-callback dropwhile(msg_pred(), true, state())
- -> {[{rabbit_types:basic_message(), ack()}], state()};
+ -> {rabbit_types:message_properties() | undefined,
+ [{rabbit_types:basic_message(), ack()}], state()};
(msg_pred(), false, state())
- -> {undefined, state()}.
+ -> {rabbit_types:message_properties() | undefined,
+ undefined, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
@@ -150,6 +152,9 @@
%% Is my queue empty?
-callback is_empty(state()) -> boolean().
+%% What's the queue depth, where depth = length + number of pending acks
+-callback depth(state()) -> non_neg_integer().
+
%% For the next three functions, the assumption is that you're
%% monitoring something like the ingress and egress rates of the
%% queue. The RAM duration is thus the length of time represented by
@@ -210,9 +215,10 @@ behaviour_info(callbacks) ->
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
- {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1},
- {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1},
- {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}];
+ {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
+ {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
+ {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2},
+ {discard, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index a84800c0..e40d9b29 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -268,7 +268,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [2, Res]},
+ BQ = {call, erlang, element, [3, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index f0ea514d..2e462354 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -277,21 +277,15 @@ has_for_source(SrcName) ->
remove_for_source(SrcName) ->
lock_route_tables(),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
- Routes = lists:usort(
- mnesia:match_object(rabbit_route, Match, write) ++
- mnesia:match_object(rabbit_durable_route, Match, write)),
- [begin
- sync_route(Route, fun mnesia:delete_object/3),
- Route#route.binding
- end || Route <- Routes].
+ remove_routes(
+ lists:usort(mnesia:match_object(rabbit_route, Match, write) ++
+ mnesia:match_object(rabbit_durable_route, Match, write))).
-remove_for_destination(Dst) ->
- remove_for_destination(
- Dst, fun (R) -> sync_route(R, fun mnesia:delete_object/3) end).
+remove_for_destination(DstName) ->
+ remove_for_destination(DstName, fun remove_routes/1).
-remove_transient_for_destination(Dst) ->
- remove_for_destination(
- Dst, fun (R) -> sync_transient_route(R, fun mnesia:delete_object/3) end).
+remove_transient_for_destination(DstName) ->
+ remove_for_destination(DstName, fun remove_transient_routes/1).
%%----------------------------------------------------------------------------
@@ -308,6 +302,14 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
+delete_object(Tab, Record, LockKind) ->
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:match_object(Tab, Record, LockKind) of
+ [] -> ok;
+ [_] -> mnesia:delete_object(Tab, Record, LockKind)
+ end.
+
sync_route(R, Fun) -> sync_route(R, true, true, Fun).
sync_route(Route, true, true, Fun) ->
@@ -370,16 +372,32 @@ lock_route_tables() ->
rabbit_semi_durable_route,
rabbit_durable_route]].
-remove_for_destination(DstName, DeleteFun) ->
+remove_routes(Routes) ->
+ %% This partitioning allows us to suppress unnecessary delete
+ %% operations on disk tables, which require an fsync.
+ {TransientRoutes, DurableRoutes} =
+ lists:partition(fun (R) -> mnesia:match_object(
+ rabbit_durable_route, R, write) == [] end,
+ Routes),
+ [ok = sync_transient_route(R, fun mnesia:delete_object/3) ||
+ R <- TransientRoutes],
+ [ok = sync_route(R, fun mnesia:delete_object/3) ||
+ R <- DurableRoutes],
+ [R#route.binding || R <- Routes].
+
+remove_transient_routes(Routes) ->
+ [begin
+ ok = sync_transient_route(R, fun delete_object/3),
+ R#route.binding
+ end || R <- Routes].
+
+remove_for_destination(DstName, Fun) ->
lock_route_tables(),
Match = reverse_route(
#route{binding = #binding{destination = DstName, _ = '_'}}),
- ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
- Bindings = [begin
- Route = reverse_route(ReverseRoute),
- ok = DeleteFun(Route),
- Route#route.binding
- end || ReverseRoute <- ReverseRoutes],
+ Routes = [reverse_route(R) || R <- mnesia:match_object(
+ rabbit_reverse_route, Match, write)],
+ Bindings = Fun(Routes),
group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
lists:keysort(#binding.source, Bindings)).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 69fe0edc..e50e823c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -465,10 +465,14 @@ check_user_id_header(#'P_basic'{user_id = Username},
#ch{user = #user{username = Username}}) ->
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
- #ch{user = #user{username = Actual}}) ->
- precondition_failed(
- "user_id property set to '~s' but authenticated user was '~s'",
- [Claimed, Actual]).
+ #ch{user = #user{username = Actual,
+ tags = Tags}}) ->
+ case lists:member(impersonator, Tags) of
+ true -> ok;
+ false -> precondition_failed(
+ "user_id property set to '~s' but authenticated user was "
+ "'~s'", [Claimed, Actual])
+ end.
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 0dda32f1..bd01a1b1 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -25,10 +25,14 @@
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
+-define(RAM_OPT, "--ram").
+-define(OFFLINE_OPT, "--offline").
-define(QUIET_DEF, {?QUIET_OPT, flag}).
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+-define(RAM_DEF, {?RAM_OPT, flag}).
+-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
@@ -41,8 +45,10 @@
force_reset,
rotate_logs,
- cluster,
- force_cluster,
+ {join_cluster, [?RAM_DEF]},
+ change_cluster_node_type,
+ update_cluster_nodes,
+ {forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
add_user,
@@ -60,9 +66,9 @@
{list_permissions, [?VHOST_DEF]},
list_user_permissions,
- set_parameter,
- clear_parameter,
- list_parameters,
+ {set_parameter, [?VHOST_DEF]},
+ {clear_parameter, [?VHOST_DEF]},
+ {list_parameters, [?VHOST_DEF]},
{list_queues, [?VHOST_DEF]},
{list_exchanges, [?VHOST_DEF]},
@@ -164,8 +170,8 @@ start() ->
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
- {error_string, Reason} ->
- print_error("~s", [Reason]),
+ {parse_error, {_Line, Mod, Err}} ->
+ print_error("~s", [lists:flatten(Mod:format_error(Err))]),
rabbit_misc:quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
@@ -239,17 +245,31 @@ action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, force_reset, []});
-action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- Inform("Clustering node ~p with ~p",
- [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
-
-action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
- [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ DiscNode = not proplists:get_bool(?RAM_OPT, Opts),
+ Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]);
+
+action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
+ Inform("Turning ~p into a ram node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]);
+action(change_cluster_node_type, Node, [Type], _Opts, Inform)
+ when Type =:= "disc" orelse Type =:= "disk" ->
+ Inform("Turning ~p into a disc node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]);
+
+action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]);
+
+action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts),
+ Inform("Removing node ~p from cluster", [ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, forget_cluster_node,
+ [ClusterNode, RemoveWhenOffline]);
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
@@ -414,21 +434,25 @@ action(list_permissions, Node, [], Opts, Inform) ->
list_vhost_permissions, [VHost]}),
rabbit_auth_backend_internal:vhost_perms_info_keys());
-action(set_parameter, Node, [Component, Key, Value], _Opts, Inform) ->
+action(set_parameter, Node, [Component, Key, Value], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Setting runtime parameter ~p for component ~p to ~p",
[Key, Component, Value]),
rpc_call(Node, rabbit_runtime_parameters, parse_set,
- [list_to_binary(Component), list_to_binary(Key), Value]);
+ [VHostArg, list_to_binary(Component), list_to_binary(Key), Value]);
-action(clear_parameter, Node, [Component, Key], _Opts, Inform) ->
+action(clear_parameter, Node, [Component, Key], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]),
- rpc_call(Node, rabbit_runtime_parameters, clear, [list_to_binary(Component),
+ rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg,
+ list_to_binary(Component),
list_to_binary(Key)]);
-action(list_parameters, Node, Args = [], _Opts, Inform) ->
+action(list_parameters, Node, [], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing runtime parameters", []),
display_info_list(
- rpc_call(Node, rabbit_runtime_parameters, list_formatted, Args),
+ rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
rabbit_runtime_parameters:info_keys());
action(report, Node, _Args, _Opts, Inform) ->
@@ -445,16 +469,15 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
case erl_scan:string(Expr) of
{ok, Scanned, _} ->
case erl_parse:parse_exprs(Scanned) of
- {ok, Parsed} ->
- {value, Value, _} = unsafe_rpc(
- Node, erl_eval, exprs, [Parsed, []]),
- io:format("~p~n", [Value]),
- ok;
- {error, E} ->
- {error_string, format_parse_error(E)}
+ {ok, Parsed} -> {value, Value, _} =
+ unsafe_rpc(
+ Node, erl_eval, exprs, [Parsed, []]),
+ io:format("~p~n", [Value]),
+ ok;
+ {error, E} -> {parse_error, E}
end;
{error, E, _} ->
- {error_string, format_parse_error(E)}
+ {parse_error, E}
end.
%%----------------------------------------------------------------------------
@@ -543,9 +566,6 @@ exit_loop(Port) ->
{Port, _} -> exit_loop(Port)
end.
-format_parse_error({_Line, Mod, Err}) ->
- lists:flatten(Mod:format_error(Err)).
-
%%----------------------------------------------------------------------------
default_if_empty(List, Default) when is_list(List) ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index c87b1dc1..a669a2b3 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -31,8 +31,8 @@
-spec(force_event_refresh/0 :: () -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
--spec(connect/5 :: (rabbit_types:username(), rabbit_types:vhost(),
- rabbit_types:protocol(), pid(),
+-spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user()),
+ rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
@@ -64,22 +64,22 @@ list() ->
%%----------------------------------------------------------------------------
+connect(User = #user{}, VHost, Protocol, Pid, Infos) ->
+ try rabbit_access_control:check_vhost_access(User, VHost) of
+ ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_event:notify(connection_created, Infos),
+ {ok, {User, rabbit_reader:server_properties(Protocol)}}
+ catch
+ exit:#amqp_error{name = access_refused} ->
+ {error, access_refused}
+ end;
+
connect(Username, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
true ->
case rabbit_access_control:check_user_login(Username, []) of
- {ok, User} ->
- try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> ok = pg_local:join(rabbit_direct, Pid),
- rabbit_event:notify(connection_created, Infos),
- {ok, {User,
- rabbit_reader:server_properties(Protocol)}}
- catch
- exit:#amqp_error{name = access_refused} ->
- {error, access_refused}
- end;
- {refused, _Msg, _Args} ->
- {error, auth_failure}
+ {ok, User} -> connect(User, VHost, Protocol, Pid, Infos);
+ {refused, _M, _A} -> {error, auth_failure}
end;
false ->
{error, broker_not_found_on_node}
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index e72181c0..6330d555 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -137,7 +137,7 @@ dir() -> rabbit_mnesia:dir().
set_disk_limits(State, Limit) ->
State1 = State#state { limit = Limit },
rabbit_log:info("Disk free limit set to ~pMB~n",
- [trunc(interpret_limit(Limit) / 1048576)]),
+ [trunc(interpret_limit(Limit) / 1000000)]),
internal_update(State1).
internal_update(State = #state { limit = Limit,
@@ -148,10 +148,10 @@ internal_update(State = #state { limit = Limit,
NewAlarmed = CurrentFreeBytes < LimitBytes,
case {Alarmed, NewAlarmed} of
{false, true} ->
- emit_update_info("exceeded", CurrentFreeBytes, LimitBytes),
+ emit_update_info("insufficient", CurrentFreeBytes, LimitBytes),
rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []});
{true, false} ->
- emit_update_info("below limit", CurrentFreeBytes, LimitBytes),
+ emit_update_info("sufficient", CurrentFreeBytes, LimitBytes),
rabbit_alarm:clear_alarm({resource_limit, disk, node()});
_ ->
ok
@@ -187,10 +187,10 @@ interpret_limit({mem_relative, R}) ->
interpret_limit(L) ->
L.
-emit_update_info(State, CurrentFree, Limit) ->
+emit_update_info(StateStr, CurrentFree, Limit) ->
rabbit_log:info(
- "Disk free space limit now ~s. Free bytes:~p Limit:~p~n",
- [State, CurrentFree, Limit]).
+ "Disk free space ~s. Free bytes:~p Limit:~p~n",
+ [StateStr, CurrentFree, Limit]).
start_timer(Timeout) ->
{ok, TRef} = timer:send_interval(Timeout, update),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 57c571f1..4cc96ef5 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -402,7 +402,12 @@ conditional_delete(X = #exchange{name = XName}) ->
end.
unconditional_delete(X = #exchange{name = XName}) ->
- ok = mnesia:delete({rabbit_durable_exchange, XName}),
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_exchange, XName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_exchange, XName})
+ end,
ok = mnesia:delete({rabbit_exchange, XName}),
ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 80b4e768..05aad8c9 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -59,21 +59,15 @@ start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- heartbeater(
- {Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () ->
- SendFun(),
- continue
- end}).
+ heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
+ fun () -> SendFun(), continue end}).
start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
- ReceiveFun(),
- stop
- end}).
+ heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
+ fun () -> ReceiveFun(), stop end}).
start_heartbeat_fun(SupPid) ->
fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
@@ -88,17 +82,11 @@ start_heartbeat_fun(SupPid) ->
{Sender, Receiver}
end.
-pause_monitor({_Sender, none}) ->
- ok;
-pause_monitor({_Sender, Receiver}) ->
- Receiver ! pause,
- ok.
+pause_monitor({_Sender, none}) -> ok;
+pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
-resume_monitor({_Sender, none}) ->
- ok;
-resume_monitor({_Sender, Receiver}) ->
- Receiver ! resume,
- ok.
+resume_monitor({_Sender, none}) -> ok;
+resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok.
%%----------------------------------------------------------------------------
start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
@@ -106,8 +94,7 @@ start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
supervisor2:start_child(
SupPid, {Name,
- {rabbit_heartbeat, Callback,
- [Sock, TimeoutSec, TimeoutFun]},
+ {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
heartbeater(Params) ->
@@ -117,15 +104,11 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
{StatVal, SameCount}) ->
Recurse = fun (V) -> heartbeater(Params, V) end,
receive
- pause ->
- receive
- resume ->
- Recurse({0, 0});
- Other ->
- exit({unexpected_message, Other})
- end;
- Other ->
- exit({unexpected_message, Other})
+ pause -> receive
+ resume -> Recurse({0, 0});
+ Other -> exit({unexpected_message, Other})
+ end;
+ Other -> exit({unexpected_message, Other})
after TimeoutMillisec ->
case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 10debb0b..4455b441 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -132,25 +132,31 @@
%% gm should be processed as normal, but fetches which are for
%% messages the slave has never seen should be ignored. Similarly,
%% acks for messages the slave never fetched should be
-%% ignored. Eventually, as the master is consumed from, the messages
-%% at the head of the queue which were there before the slave joined
-%% will disappear, and the slave will become fully synced with the
-%% state of the master. The detection of the sync-status of a slave is
-%% done entirely based on length: if the slave and the master both
-%% agree on the length of the queue after the fetch of the head of the
-%% queue (or a 'set_length' results in a slave having to drop some
-%% messages from the head of its queue), then the queues must be in
-%% sync. The only other possibility is that the slave's queue is
-%% shorter, and thus the fetch should be ignored. In case slaves are
-%% joined to an empty queue which only goes on to receive publishes,
-%% they start by asking the master to broadcast its length. This is
-%% enough for slaves to always be able to work out when their head
-%% does not differ from the master (and is much simpler and cheaper
-%% than getting the master to hang on to the guid of the msg at the
-%% head of its queue). When a slave is promoted to a master, it
-%% unilaterally broadcasts its length, in order to solve the problem
-%% of length requests from new slaves being unanswered by a dead
-%% master.
+%% ignored. Similarly, we don't republish rejected messages that we
+%% haven't seen. Eventually, as the master is consumed from, the
+%% messages at the head of the queue which were there before the slave
+%% joined will disappear, and the slave will become fully synced with
+%% the state of the master.
+%%
+%% The detection of the sync-status is based on the depth of the BQs,
+%% where the depth is defined as the sum of the length of the BQ (as
+%% per BQ:len) and the messages pending an acknowledgement. When the
+%% depth of the slave is equal to the master's, then the slave is
+%% synchronised. We only store the difference between the two for
+%% simplicity. Comparing the length is not enough since we need to
+%% take into account rejected messages which will make it back into
+%% the master queue but can't go back in the slave, since we don't
+%% want "holes" in the slave queue. Note that the depth, and the
+%% length likewise, must always be shorter on the slave - we assert
+%% that in various places. In case slaves are joined to an empty queue
+%% which only goes on to receive publishes, they start by asking the
+%% master to broadcast its depth. This is enough for slaves to always
+%% be able to work out when their head does not differ from the master
+%% (and is much simpler and cheaper than getting the master to hang on
+%% to the guid of the msg at the head of its queue). When a slave is
+%% promoted to a master, it unilaterally broadcasts its length, in
+%% order to solve the problem of length requests from new slaves being
+%% unanswered by a dead master.
%%
%% Obviously, due to the async nature of communication across gm, the
%% slaves can fall behind. This does not matter from a sync pov: if
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 750bcd56..c11a8ff7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,8 +18,8 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
- set_ram_duration_target/2, ram_duration/1,
+ requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
@@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -127,10 +127,13 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ Info = gm:info(GM),
+ Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
+ node(Pid) =/= node()],
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
monitor_wait(MRefs),
+ ok = gm:forget_group(proplists:get_value(group_name, Info)),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
@@ -145,7 +148,7 @@ monitor_wait([MRef | MRefs]) ->
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {set_length, 0, false}),
+ ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
@@ -185,13 +188,13 @@ dropwhile(Pred, AckRequired,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
+ {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
- ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
+ ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, Msgs, State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -274,6 +277,9 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:is_empty(BQS).
+depth(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:depth(BQS).
+
set_ram_duration_target(Target, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state =
@@ -372,7 +378,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {length, Len}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -403,7 +409,7 @@ length_fun() ->
fun (?MODULE, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State
end)
end.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 29e2d29f..89e334dd 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -64,9 +64,7 @@ remove_from_queue(QueueName, DeadPids) ->
slave_pids = SPids }] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid),
- DeadNodes) orelse
- rabbit_misc:is_process_alive(Pid)],
+ not lists:member(node(Pid), DeadNodes)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, []};
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e4d78c45..3e45f026 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -19,17 +19,8 @@
%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator
%%
-%% We join the GM group before we add ourselves to the amqqueue
-%% record. As a result:
-%% 1. We can receive msgs from GM that correspond to messages we will
-%% never receive from publishers.
-%% 2. When we receive a message from publishers, we must receive a
-%% message from the GM group for it.
-%% 3. However, that instruction from the GM group can arrive either
-%% before or after the actual message. We need to be able to
-%% distinguish between GM instructions arriving early, and case (1)
-%% above.
-%%
+%% We receive messages from GM and from publishers, and the gm
+%% messages can arrive either before or after the 'actual' message.
%% All instructions from the GM group must be processed in the order
%% in which they're received.
@@ -86,31 +77,37 @@
msg_id_status,
known_senders,
- synchronised
+ %% Master depth - local depth
+ depth_delta
}).
-start_link(Q) ->
- gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-info(QPid) ->
- gen_server2:call(QPid, info, infinity).
+info(QPid) -> gen_server2:call(QPid, info, infinity).
init(#amqqueue { name = QueueName } = Q) ->
+ %% We join the GM group before we add ourselves to the amqqueue
+ %% record. As a result:
+ %% 1. We can receive msgs from GM that correspond to messages we will
+ %% never receive from publishers.
+ %% 2. When we receive a message from publishers, we must receive a
+ %% message from the GM group for it.
+ %% 3. However, that instruction from the GM group can arrive either
+ %% before or after the actual message. We need to be able to
+ %% distinguish between GM instructions arriving early, and case (1)
+ %% above.
+ %%
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(fun() ->
- init_it(Self, Node,
- QueueName)
- end) of
+ case rabbit_misc:execute_mnesia_transaction(
+ fun() -> init_it(Self, Node, QueueName) end) of
{new, MPid} ->
- process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
- receive {joined, GM} ->
- ok
- end,
erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
@@ -133,7 +130,7 @@ init(#amqqueue { name = QueueName } = Q) ->
msg_id_status = dict:new(),
known_senders = pmon:new(),
- synchronised = false
+ depth_delta = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -153,24 +150,21 @@ init_it(Self, Node, QueueName) ->
[Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
mnesia:read({rabbit_queue, QueueName}),
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] ->
- MPids1 = MPids ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
- {new, QPid};
- [QPid] ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> duplicate_live_master;
- false -> {stale, QPid}
- end;
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true -> existing;
- false -> MPids1 = (MPids -- [SPid]) ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
- {new, QPid}
- end
+ [] -> MPids1 = MPids ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid};
+ [QPid] -> case rabbit_misc:is_process_alive(QPid) of
+ true -> duplicate_live_master;
+ false -> {stale, QPid}
+ end;
+ [SPid] -> case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> MPids1 = (MPids -- [SPid]) ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid}
+ end
end.
handle_call({deliver, Delivery = #delivery { immediate = true }},
@@ -356,14 +350,10 @@ prioritise_info(Msg, _State) ->
%% GM
%% ---------------------------------------------------------------------------
-joined([SPid], _Members) ->
- SPid ! {joined, self()},
- ok.
+joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, []) ->
- ok;
-members_changed([SPid], _Births, Deaths) ->
- inform_deaths(SPid, Deaths).
+members_changed([_SPid], _Births, []) -> ok;
+members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths).
handle_msg([_SPid], _From, master_changed) ->
ok;
@@ -396,7 +386,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
i(master_pid, #state { master_pid = MPid }) -> MPid;
-i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
bq_init(BQ, Q, Recover) ->
@@ -584,9 +574,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
confirm_messages(MsgIds, State #state {
backing_queue_state = BQS1 })),
case BQ:needs_timeout(BQS1) of
- false -> {stop_sync_timer(State1), hibernate};
- idle -> {stop_sync_timer(State1), 0 };
- timed -> {ensure_sync_timer(State1), 0 }
+ false -> {stop_sync_timer(State1), hibernate };
+ idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
+ timed -> {ensure_sync_timer(State1), 0 }
end.
backing_queue_timeout(State = #state { backing_queue = BQ }) ->
@@ -680,26 +670,24 @@ maybe_enqueue_message(
%% msg_seq_no was at the time. We do now!
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { sender_queues = SQ1,
- msg_id_status = dict:erase(MsgId, MS) };
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 };
{ok, {published, ChPid}} ->
%% It was published to the BQ and we didn't know the
%% msg_seq_no so couldn't confirm it at the time.
- case needs_confirming(Delivery, State1) of
- never ->
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- eventually ->
- State1 #state {
- msg_id_status =
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 }
- end;
+ {MS1, SQ1} =
+ case needs_confirming(Delivery, State1) of
+ never -> {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)};
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ {dict:store(MsgId, MMS, MS), SQ};
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)}
+ end,
+ State1 #state { msg_id_status = MS1,
+ sender_queues = SQ1 };
{ok, discarded} ->
%% We've already heard from GM that the msg is to be
%% discarded. We won't see this again.
@@ -748,18 +736,17 @@ process_instruction(
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } },
_EnqueueOnPromotion}}, MQ2} ->
- %% We received the msg from the channel first. Thus we
- %% need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never ->
- {MQ2, PendingCh, MS};
- eventually ->
- {MQ2, PendingCh,
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- {MQ2, PendingCh, MS}
- end;
+ {MQ2, PendingCh,
+ %% We received the msg from the channel first. Thus
+ %% we need to deal with confirms here.
+ case needs_confirming(Delivery, State1) of
+ never -> MS;
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ dict:store(MsgId, MMS , MS);
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ MS
+ end};
{{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
@@ -814,43 +801,45 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length, AckRequired},
+process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- ToDrop = QLen - Length,
- {ok,
- case ToDrop >= 0 of
- true ->
- State1 =
- lists:foldl(
- fun (const, StateN = #state {backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _IsDelivered, AckTag,
- _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
- maybe_store_ack(
- AckRequired, MsgId, AckTag,
- StateN #state { backing_queue_state = BQSN1 })
- end, State, lists:duplicate(ToDrop, const)),
- set_synchronised(true, State1);
- false ->
- State
- end};
+ ToDrop = case QLen - Length of
+ N when N > 0 -> N;
+ _ -> 0
+ end,
+ State1 = lists:foldl(
+ fun (const, StateN = #state{backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
+ BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ {ok, case AckRequired of
+ true -> State1;
+ false -> set_synchronised(ToDrop - Dropped, State1)
+ end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- Other when Other + 1 =:= Remaining ->
- set_synchronised(true, State);
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
- end};
+ {State1, Delta} =
+ case QLen - 1 of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ {maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 }),
+ 0};
+ _ when QLen =< Remaining ->
+ {State, case AckRequired of
+ true -> 0;
+ false -> -1
+ end}
+ end,
+ {ok, set_synchronised(Delta, State1)};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -858,27 +847,17 @@ process_instruction({ack, MsgIds},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 }};
+ {ok, set_synchronised(length(MsgIds1) - length(MsgIds),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {ok, case length(AckTags) =:= length(MsgIds) of
- true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 };
- false ->
- %% The only thing we can safely do is nuke out our BQ
- %% and MA. The interaction between this and confirms
- %% doesn't really bear thinking about...
- {_Count, BQS1} = BQ:purge(BQS),
- {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
- State #state { msg_id_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ {ok, State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -896,10 +875,11 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({length, Length},
- State = #state { backing_queue = BQ,
+process_instruction({depth, Depth},
+ State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
+ {ok, set_synchronised(
+ 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -918,9 +898,6 @@ msg_ids_to_acktags(MsgIds, MA) ->
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
-ack_all(BQ, MA, BQS) ->
- BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
-
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
@@ -928,23 +905,38 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-%% We intentionally leave out the head where a slave becomes
-%% unsynchronised: we assert that can never happen.
-set_synchronised(true, State = #state { q = #amqqueue { name = QName },
- synchronised = false }) ->
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
- Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
- rabbit_mirror_queue_misc:store_updated_slaves(Q2)
- end
- end),
- State #state { synchronised = true };
-set_synchronised(true, State) ->
+set_synchronised(Delta, State) ->
+ set_synchronised(Delta, false, State).
+
+set_synchronised(_Delta, _AddAnyway,
+ State = #state { depth_delta = undefined }) ->
State;
-set_synchronised(false, State = #state { synchronised = false }) ->
- State.
+set_synchronised(Delta, AddAnyway,
+ State = #state { depth_delta = DepthDelta,
+ q = #amqqueue { name = QName }}) ->
+ DepthDelta1 = DepthDelta + Delta,
+ %% We intentionally leave out the head where a slave becomes
+ %% unsynchronised: we assert that can never happen.
+ %% The `AddAnyway' param is there since in the `depth' instruction we
+ %% receive the master depth for the first time, and we want to set the sync
+ %% state anyway if we are synced.
+ case DepthDelta1 =:= 0 of
+ true when not (DepthDelta =:= 0) orelse AddAnyway ->
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
+ %% We might be there already, in the `AddAnyway'
+ %% case
+ SSPids1 = SSPids -- [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{sync_slave_pids = [Self | SSPids1]})
+ end
+ end);
+ _ when DepthDelta1 >= 0 ->
+ ok
+ end,
+ State #state { depth_delta = DepthDelta1 }.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 8f6a9bcf..a0536a50 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,9 @@
-export([multi_call/2]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
+-export([version/0]).
+-export([sequence_error/1]).
+-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -217,6 +220,13 @@
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
+-spec(version/0 :: () -> string()).
+-spec(sequence_error/1 :: ([({'error', any()} | any())])
+ -> {'error', any()} | any()).
+-spec(json_encode/1 :: (any()) -> {'ok', string()} | {'error', any()}).
+-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
+-spec(json_to_term/1 :: (any()) -> any()).
+-spec(term_to_json/1 :: (any()) -> any()).
-endif.
@@ -934,3 +944,46 @@ os_cmd(Command) ->
gb_sets_difference(S1, S2) ->
gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
+
+version() ->
+ {ok, VSN} = application:get_key(rabbit, vsn),
+ VSN.
+
+sequence_error([T]) -> T;
+sequence_error([{error, _} = Error | _]) -> Error;
+sequence_error([_ | Rest]) -> sequence_error(Rest).
+
+json_encode(Term) ->
+ try
+ {ok, mochijson2:encode(Term)}
+ catch
+ exit:{json_encode, E} ->
+ {error, E}
+ end.
+
+json_decode(Term) ->
+ try
+ {ok, mochijson2:decode(Term)}
+ catch
+ %% Sadly `mochijson2:decode/1' does not offer a nice way to catch
+ %% decoding errors...
+ error:_ -> error
+ end.
+
+json_to_term({struct, L}) ->
+ [{K, json_to_term(V)} || {K, V} <- L];
+json_to_term(L) when is_list(L) ->
+ [json_to_term(I) || I <- L];
+json_to_term(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
+ V =:= true orelse V =:= false ->
+ V.
+
+%% This has the flaw that empty lists will never be JSON objects, so use with
+%% care.
+term_to_json([{_, _}|_] = L) ->
+ {struct, [{K, term_to_json(V)} || {K, V} <- L]};
+term_to_json(L) when is_list(L) ->
+ [term_to_json(I) || I <- L];
+term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
+ V =:= true orelse V =:= false ->
+ V.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 5a971246..f19046a0 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -17,16 +17,42 @@
-module(rabbit_mnesia).
--export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
- cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3,
- is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
- create_cluster_nodes_config/1, read_cluster_nodes_config/0,
- record_running_nodes/0, read_previously_running_nodes/0,
- running_nodes_filename/0, is_disc_node/0, on_node_down/1,
- on_node_up/1]).
-
--export([table_names/0]).
+-export([init/0,
+ join_cluster/2,
+ reset/0,
+ force_reset/0,
+ update_cluster_nodes/1,
+ change_cluster_node_type/1,
+ forget_cluster_node/2,
+
+ status/0,
+ is_db_empty/0,
+ is_clustered/0,
+ all_clustered_nodes/0,
+ clustered_disc_nodes/0,
+ running_clustered_nodes/0,
+ is_disc_node/0,
+ dir/0,
+ table_names/0,
+ wait_for_tables/1,
+ cluster_status_from_mnesia/0,
+
+ init_db/3,
+ empty_ram_only_tables/0,
+ copy_db/1,
+ wait_for_tables/0,
+ check_cluster_consistency/0,
+ ensure_mnesia_dir/0,
+
+ on_node_up/1,
+ on_node_down/1
+ ]).
+
+%% Used internally in rpc calls
+-export([node_info/0,
+ remove_node_if_mnesia_running/1,
+ is_running_remote/0
+ ]).
%% create_tables/0 exported for helping embed RabbitMQ in or alongside
%% other mnesia-using Erlang applications, such as ejabberd
@@ -38,147 +64,138 @@
-ifdef(use_specs).
--export_type([node_type/0]).
+-export_type([node_type/0, cluster_status/0]).
--type(node_type() :: disc_only | disc | ram | unknown).
--spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
--spec(dir/0 :: () -> file:filename()).
--spec(ensure_mnesia_dir/0 :: () -> 'ok').
+-type(node_type() :: disc | ram).
+-type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()),
+ ordsets:ordset(node())}).
+
+%% Main interface
-spec(init/0 :: () -> 'ok').
--spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok').
--spec(is_db_empty/0 :: () -> boolean()).
--spec(cluster/1 :: ([node()]) -> 'ok').
--spec(force_cluster/1 :: ([node()]) -> 'ok').
--spec(cluster/2 :: ([node()], boolean()) -> 'ok').
+-spec(join_cluster/2 :: ([node()], boolean()) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
+-spec(update_cluster_nodes/1 :: (node()) -> 'ok').
+-spec(change_cluster_node_type/1 :: (node_type()) -> 'ok').
+-spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok').
+
+%% Various queries to get the status of the db
+-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
+ {'running_nodes', [node()]}]).
+-spec(is_db_empty/0 :: () -> boolean()).
-spec(is_clustered/0 :: () -> boolean()).
--spec(running_clustered_nodes/0 :: () -> [node()]).
-spec(all_clustered_nodes/0 :: () -> [node()]).
+-spec(clustered_disc_nodes/0 :: () -> [node()]).
+-spec(running_clustered_nodes/0 :: () -> [node()]).
+-spec(is_disc_node/0 :: () -> boolean()).
+-spec(dir/0 :: () -> file:filename()).
+-spec(table_names/0 :: () -> [atom()]).
+-spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} |
+ {'error', any()}).
+
+%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
+-spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok').
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
--spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok').
--spec(read_cluster_nodes_config/0 :: () -> [node()]).
--spec(record_running_nodes/0 :: () -> 'ok').
--spec(read_previously_running_nodes/0 :: () -> [node()]).
--spec(running_nodes_filename/0 :: () -> file:filename()).
--spec(is_disc_node/0 :: () -> boolean()).
+-spec(check_cluster_consistency/0 :: () -> 'ok').
+-spec(ensure_mnesia_dir/0 :: () -> 'ok').
+
+%% Hooks used in `rabbit_node_monitor'
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(table_names/0 :: () -> [atom()]).
+%% Functions used in internal rpc calls
+-spec(node_info/0 :: () -> {string(), string(),
+ ({'ok', cluster_status()} | 'error')}).
+-spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' |
+ {'error', term()}).
-endif.
%%----------------------------------------------------------------------------
-
-status() ->
- [{nodes, case mnesia:system_info(is_running) of
- yes -> [{Key, Nodes} ||
- {Key, CopyType} <- [{disc_only, disc_only_copies},
- {disc, disc_copies},
- {ram, ram_copies}],
- begin
- Nodes = nodes_of_type(CopyType),
- Nodes =/= []
- end];
- no -> case all_clustered_nodes() of
- [] -> [];
- Nodes -> [{unknown, Nodes}]
- end;
- Reason when Reason =:= starting; Reason =:= stopping ->
- exit({rabbit_busy, try_again_later})
- end},
- {running_nodes, running_clustered_nodes()}].
+%% Main interface
+%%----------------------------------------------------------------------------
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
- Nodes = read_cluster_nodes_config(),
- ok = init_db(Nodes, should_be_disc_node(Nodes)),
+ case is_virgin_node() of
+ true -> init_from_config();
+ false -> init(is_disc_node(), all_clustered_nodes())
+ end,
%% We intuitively expect the global name server to be synced when
- %% Mnesia is up. In fact that's not guaranteed to be the case - let's
- %% make it so.
+ %% Mnesia is up. In fact that's not guaranteed to be the case -
+ %% let's make it so.
ok = global:sync(),
- ok = delete_previously_running_nodes(),
ok.
-is_db_empty() ->
- lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
- table_names()).
+init(WantDiscNode, AllNodes) ->
+ init_db_and_upgrade(AllNodes, WantDiscNode, WantDiscNode).
+
+init_from_config() ->
+ {ok, {TryNodes, WantDiscNode}} =
+ application:get_env(rabbit, cluster_nodes),
+ case find_good_node(TryNodes -- [node()]) of
+ {ok, Node} ->
+ rabbit_log:info("Node '~p' selected for clustering from "
+ "configuration~n", [Node]),
+ {ok, {_, DiscNodes, _}} = discover_cluster(Node),
+ init_db_and_upgrade(DiscNodes, WantDiscNode, false),
+ rabbit_node_monitor:notify_joined_cluster();
+ none ->
+ rabbit_log:warning("Could not find any suitable node amongst the "
+ "ones provided in the configuration: ~p~n",
+ [TryNodes]),
+ init(true, [node()])
+ end.
+
+%% Make the node join a cluster. The node will be reset automatically
+%% before we actually cluster it. The nodes provided will be used to
+%% find out about the nodes in the cluster.
+%%
+%% This function will fail if:
+%%
+%% * The node is currently the only disc node of its cluster
+%% * We can't connect to any of the nodes provided
+%% * The node is currently already clustered with the cluster of the nodes
+%% provided
+%%
+%% Note that we make no attempt to verify that the nodes provided are
+%% all in the same cluster, we simply pick the first online node and
+%% we cluster to its cluster.
+join_cluster(DiscoveryNode, WantDiscNode) ->
+ case is_disc_and_clustered() andalso [node()] =:= clustered_disc_nodes() of
+ true -> e(clustering_only_disc_node);
+ _ -> ok
+ end,
-cluster(ClusterNodes) ->
- cluster(ClusterNodes, false).
-force_cluster(ClusterNodes) ->
- cluster(ClusterNodes, true).
-
-%% Alter which disk nodes this node is clustered with. This can be a
-%% subset of all the disk nodes in the cluster but can (and should)
-%% include the node itself if it is to be a disk rather than a ram
-%% node. If Force is false, only connections to online nodes are
-%% allowed.
-cluster(ClusterNodes, Force) ->
- rabbit_misc:local_info_msg("Clustering with ~p~s~n",
- [ClusterNodes, if Force -> " forcefully";
- true -> ""
- end]),
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
- case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false) andalso
- not should_be_disc_node(ClusterNodes)
- of
- true -> log_both("last running disc node leaving cluster");
- _ -> ok
- end,
+ {ClusterNodes, _, _} = case discover_cluster(DiscoveryNode) of
+ {ok, Res} -> Res;
+ E = {error, _} -> throw(E)
+ end,
- %% Wipe mnesia if we're changing type from disc to ram
- case {is_disc_node(), should_be_disc_node(ClusterNodes)} of
- {true, false} -> rabbit_misc:with_local_io(
- fun () -> error_logger:warning_msg(
- "changing node type; wiping "
- "mnesia...~n~n")
- end),
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema);
- _ -> ok
+ case lists:member(node(), ClusterNodes) of
+ true -> e(already_clustered);
+ false -> ok
end,
- %% Pre-emptively leave the cluster
- %%
- %% We're trying to handle the following two cases:
- %% 1. We have a two-node cluster, where both nodes are disc nodes.
- %% One node is re-clustered as a ram node. When it tries to
- %% re-join the cluster, but before it has time to update its
- %% tables definitions, the other node will order it to re-create
- %% its disc tables. So, we need to leave the cluster before we
- %% can join it again.
- %% 2. We have a two-node cluster, where both nodes are disc nodes.
- %% One node is forcefully reset (so, the other node thinks its
- %% still a part of the cluster). The reset node is re-clustered
- %% as a ram node. Same as above, we need to leave the cluster
- %% before we can join it. But, since we don't know if we're in a
- %% cluster or not, we just pre-emptively leave it before joining.
- ProperClusterNodes = ClusterNodes -- [node()],
- try
- ok = leave_cluster(ProperClusterNodes, ProperClusterNodes)
- catch
- {error, {no_running_cluster_nodes, _, _}} when Force ->
- ok
- end,
+ %% reset the node. this simplifies things and it will be needed in
+ %% this case - we're joining a new cluster with new nodes which
+ %% are not in synch with the current node. I also lifts the burden
+ %% of reseting the node from the user.
+ reset(false),
+
+ rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]),
%% Join the cluster
- start_mnesia(),
- try
- ok = init_db(ClusterNodes, Force),
- ok = create_cluster_nodes_config(ClusterNodes)
- after
- stop_mnesia()
- end,
+ ok = init_db_with_mnesia(ClusterNodes, WantDiscNode, false),
+
+ rabbit_node_monitor:notify_joined_cluster(),
ok.
@@ -188,15 +205,399 @@ cluster(ClusterNodes, Force) ->
reset() -> reset(false).
force_reset() -> reset(true).
+reset(Force) ->
+ rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
+ [if Force -> " forcefully";
+ true -> ""
+ end]),
+ ensure_mnesia_not_running(),
+ Node = node(),
+ case Force of
+ true ->
+ disconnect_nodes(nodes());
+ false ->
+ AllNodes = all_clustered_nodes(),
+ %% Reconnecting so that we will get an up to date nodes.
+ %% We don't need to check for consistency because we are
+ %% resetting. Force=true here so that reset still works
+ %% when clustered with a node which is down.
+ init_db_with_mnesia(AllNodes, is_disc_node(), false, true),
+ case is_disc_and_clustered() andalso
+ [node()] =:= clustered_disc_nodes()
+ of
+ true -> e(resetting_only_disc_node);
+ false -> ok
+ end,
+ leave_cluster(),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
+ cannot_delete_schema),
+ disconnect_nodes(all_clustered_nodes()),
+ ok
+ end,
+ %% remove persisted messages and any other garbage we find
+ ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
+ ok = rabbit_node_monitor:reset_cluster_status(),
+ ok.
+
+%% We need to make sure that we don't end up in a distributed Erlang
+%% system with nodes while not being in an Mnesia cluster with
+%% them. We don't handle that well.
+disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
+
+change_cluster_node_type(Type) ->
+ ensure_mnesia_dir(),
+ ensure_mnesia_not_running(),
+ case is_clustered() of
+ false -> e(not_clustered);
+ true -> ok
+ end,
+ {_, _, RunningNodes} =
+ case discover_cluster(all_clustered_nodes()) of
+ {ok, Status} -> Status;
+ {error, _Reason} -> e(cannot_connect_to_cluster)
+ end,
+ Node = case RunningNodes of
+ [] -> e(no_online_cluster_nodes);
+ [Node0|_] -> Node0
+ end,
+ ok = reset(false),
+ ok = join_cluster(Node, case Type of
+ ram -> false;
+ disc -> true
+ end).
+
+update_cluster_nodes(DiscoveryNode) ->
+ ensure_mnesia_not_running(),
+ ensure_mnesia_dir(),
+
+ Status = {AllNodes, _, _} =
+ case discover_cluster(DiscoveryNode) of
+ {ok, Status0} -> Status0;
+ {error, _Reason} -> e(cannot_connect_to_node)
+ end,
+ case ordsets:is_element(node(), AllNodes) of
+ true ->
+ %% As in `check_consistency/0', we can safely delete the
+ %% schema here, since it'll be replicated from the other
+ %% nodes
+ mnesia:delete_schema([node()]),
+ rabbit_node_monitor:write_cluster_status(Status),
+ init_db_with_mnesia(AllNodes, is_disc_node(), false);
+ false ->
+ e(inconsistent_cluster)
+ end,
+ ok.
+
+%% We proceed like this: try to remove the node locally. If the node
+%% is offline, we remove the node if:
+%% * This node is a disc node
+%% * All other nodes are offline
+%% * This node was, at the best of our knowledge (see comment below)
+%% the last or second to last after the node we're removing to go
+%% down
+forget_cluster_node(Node, RemoveWhenOffline) ->
+ case ordsets:is_element(Node, all_clustered_nodes()) of
+ true -> ok;
+ false -> e(not_a_cluster_node)
+ end,
+ case {mnesia:system_info(is_running), RemoveWhenOffline} of
+ {yes, true} -> e(online_node_offline_flag);
+ _ -> ok
+ end,
+ case remove_node_if_mnesia_running(Node) of
+ ok ->
+ ok;
+ {error, mnesia_not_running} when RemoveWhenOffline ->
+ remove_node_offline_node(Node);
+ {error, mnesia_not_running} ->
+ e(offline_node_no_offline_flag);
+ Err = {error, _} ->
+ throw(Err)
+ end.
+
+remove_node_offline_node(Node) ->
+ case {ordsets:del_element(Node, running_nodes(all_clustered_nodes())),
+ is_disc_node()} of
+ {[], true} ->
+ %% Note that while we check if the nodes was the last to
+ %% go down, apart from the node we're removing from, this
+ %% is still unsafe. Consider the situation in which A and
+ %% B are clustered. A goes down, and records B as the
+ %% running node. Then B gets clustered with C, C goes down
+ %% and B goes down. In this case, C is the second-to-last,
+ %% but we don't know that and we'll remove B from A
+ %% anyway, even if that will lead to bad things.
+ case ordsets:subtract(running_clustered_nodes(),
+ ordsets:from_list([node(), Node])) of
+ [] -> start_mnesia(),
+ try
+ [mnesia:force_load_table(T) ||
+ T <- rabbit_mnesia:table_names()],
+ forget_cluster_node(Node, false),
+ ensure_mnesia_running()
+ after
+ stop_mnesia()
+ end;
+ _ -> e(not_last_node_to_go_down)
+ end;
+ {_, _} ->
+ e(removing_node_from_offline_node)
+ end.
+
+
+%%----------------------------------------------------------------------------
+%% Queries
+%%----------------------------------------------------------------------------
+
+status() ->
+ IfNonEmpty = fun (_, []) -> [];
+ (Type, Nodes) -> [{Type, Nodes}]
+ end,
+ [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++
+ IfNonEmpty(ram, clustered_ram_nodes()))}] ++
+ case mnesia:system_info(is_running) of
+ yes -> [{running_nodes, running_clustered_nodes()}];
+ no -> []
+ end.
+
+is_db_empty() ->
+ lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
+ table_names()).
+
is_clustered() ->
- RunningNodes = running_clustered_nodes(),
- [node()] /= RunningNodes andalso [] /= RunningNodes.
+ Nodes = all_clustered_nodes(),
+ [node()] =/= Nodes andalso [] =/= Nodes.
+
+is_disc_and_clustered() -> is_disc_node() andalso is_clustered().
+
+%% Functions that retrieve the nodes in the cluster will rely on the
+%% status file if offline.
+
+all_clustered_nodes() -> cluster_status(all).
-all_clustered_nodes() ->
- mnesia:system_info(db_nodes).
+clustered_disc_nodes() -> cluster_status(disc).
-running_clustered_nodes() ->
- mnesia:system_info(running_db_nodes).
+clustered_ram_nodes() -> ordsets:subtract(cluster_status(all),
+ cluster_status(disc)).
+
+running_clustered_nodes() -> cluster_status(running).
+
+running_clustered_disc_nodes() ->
+ {_, DiscNodes, RunningNodes} = cluster_status(),
+ ordsets:intersection(DiscNodes, RunningNodes).
+
+%% This function is the actual source of information, since it gets
+%% the data from mnesia. Obviously it'll work only when mnesia is
+%% running.
+mnesia_nodes() ->
+ case mnesia:system_info(is_running) of
+ no ->
+ {error, mnesia_not_running};
+ yes ->
+ %% If the tables are not present, it means that
+ %% `init_db/3' hasn't been run yet. In other words, either
+ %% we are a virgin node or a restarted RAM node. In both
+ %% cases we're not interested in what mnesia has to say.
+ IsDiscNode = mnesia:system_info(use_dir),
+ Tables = mnesia:system_info(tables),
+ {Table, _} = case table_definitions(case IsDiscNode of
+ true -> disc;
+ false -> ram
+ end) of [T|_] -> T end,
+ case lists:member(Table, Tables) of
+ true ->
+ AllNodes =
+ ordsets:from_list(mnesia:system_info(db_nodes)),
+ DiscCopies = ordsets:from_list(
+ mnesia:table_info(schema, disc_copies)),
+ DiscNodes =
+ case IsDiscNode of
+ true -> ordsets:add_element(node(), DiscCopies);
+ false -> DiscCopies
+ end,
+ {ok, {AllNodes, DiscNodes}};
+ false ->
+ {error, tables_not_present}
+ end
+ end.
+
+cluster_status(WhichNodes, ForceMnesia) ->
+ %% I don't want to call `running_nodes/1' unless if necessary,
+ %% since it can deadlock when stopping applications.
+ Nodes = case mnesia_nodes() of
+ {ok, {AllNodes, DiscNodes}} ->
+ {ok, {AllNodes, DiscNodes,
+ fun() -> running_nodes(AllNodes) end}};
+ {error, _Reason} when not ForceMnesia ->
+ {AllNodes, DiscNodes, RunningNodes} =
+ rabbit_node_monitor:read_cluster_status(),
+ %% The cluster status file records the status when
+ %% the node is online, but we know for sure that
+ %% the node is offline now, so we can remove it
+ %% from the list of running nodes.
+ {ok,
+ {AllNodes, DiscNodes,
+ fun() -> ordsets:del_element(node(), RunningNodes) end}};
+ Err = {error, _} ->
+ Err
+ end,
+ case Nodes of
+ {ok, {AllNodes1, DiscNodes1, RunningNodesThunk}} ->
+ {ok, case WhichNodes of
+ status -> {AllNodes1, DiscNodes1, RunningNodesThunk()};
+ all -> AllNodes1;
+ disc -> DiscNodes1;
+ running -> RunningNodesThunk()
+ end};
+ Err1 = {error, _} ->
+ Err1
+ end.
+
+cluster_status(WhichNodes) ->
+ {ok, Status} = cluster_status(WhichNodes, false),
+ Status.
+
+cluster_status() -> cluster_status(status).
+
+cluster_status_from_mnesia() -> cluster_status(status, true).
+
+node_info() ->
+ {erlang:system_info(otp_release), rabbit_misc:version(),
+ cluster_status_from_mnesia()}.
+
+is_disc_node() ->
+ DiscNodes = clustered_disc_nodes(),
+ DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes).
+
+dir() -> mnesia:system_info(directory).
+
+table_names() -> [Tab || {Tab, _} <- table_definitions()].
+
+%%----------------------------------------------------------------------------
+%% Operations on the db
+%%----------------------------------------------------------------------------
+
+%% Adds the provided nodes to the mnesia cluster, creating a new
+%% schema if there is the need to and catching up if there are other
+%% nodes in the cluster already. It also updates the cluster status
+%% file.
+init_db(ClusterNodes, WantDiscNode, Force) ->
+ Nodes = change_extra_db_nodes(ClusterNodes, Force),
+ %% Note that we use `system_info' here and not the cluster status
+ %% since when we start rabbit for the first time the cluster
+ %% status will say we are a disc node but the tables won't be
+ %% present yet.
+ WasDiscNode = mnesia:system_info(use_dir),
+ case {Nodes, WasDiscNode, WantDiscNode} of
+ {[], _, false} ->
+ %% Standalone ram node, we don't want that
+ throw({error, cannot_create_standalone_ram_node});
+ {[], false, true} ->
+ %% RAM -> disc, starting from scratch
+ ok = create_schema();
+ {[], true, true} ->
+ %% First disc node up
+ ok;
+ {[AnotherNode | _], _, _} ->
+ %% Subsequent node in cluster, catch up
+ ensure_version_ok(
+ rpc:call(AnotherNode, rabbit_version, recorded, [])),
+ ok = wait_for_replicated_tables(),
+ %% The sequence in which we delete the schema and then the
+ %% other tables is important: if we delete the schema
+ %% first when moving to RAM mnesia will loudly complain
+ %% since it doesn't make much sense to do that. But when
+ %% moving to disc, we need to move the schema first.
+ case WantDiscNode of
+ true -> create_local_table_copy(schema, disc_copies),
+ create_local_table_copies(disc);
+ false -> create_local_table_copies(ram),
+ create_local_table_copy(schema, ram_copies)
+ end
+ end,
+ ensure_schema_integrity(),
+ rabbit_node_monitor:update_cluster_status(),
+ ok.
+
+init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) ->
+ ok = init_db(ClusterNodes, WantDiscNode, Force),
+ ok = case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ok;
+ starting_from_scratch -> rabbit_version:record_desired();
+ version_not_available -> schema_ok_or_move()
+ end,
+ %% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget
+ %% about the cluster
+ case WantDiscNode of
+ false -> start_mnesia(),
+ change_extra_db_nodes(ClusterNodes, true),
+ wait_for_replicated_tables();
+ true -> ok
+ end,
+ ok.
+
+init_db_with_mnesia(ClusterNodes, WantDiscNode, CheckConsistency, Force) ->
+ start_mnesia(CheckConsistency),
+ try
+ init_db_and_upgrade(ClusterNodes, WantDiscNode, Force)
+ after
+ stop_mnesia()
+ end.
+
+init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) ->
+ init_db_with_mnesia(ClusterNodes, WantDiscNode, true, Force).
+
+ensure_mnesia_dir() ->
+ MnesiaDir = dir() ++ "/",
+ case filelib:ensure_dir(MnesiaDir) of
+ {error, Reason} ->
+ throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
+ ok ->
+ ok
+ end.
+
+ensure_mnesia_running() ->
+ case mnesia:system_info(is_running) of
+ yes ->
+ ok;
+ starting ->
+ wait_for(mnesia_running),
+ ensure_mnesia_running();
+ Reason when Reason =:= no; Reason =:= stopping ->
+ throw({error, mnesia_not_running})
+ end.
+
+ensure_mnesia_not_running() ->
+ case mnesia:system_info(is_running) of
+ no ->
+ ok;
+ stopping ->
+ wait_for(mnesia_not_running),
+ ensure_mnesia_not_running();
+ Reason when Reason =:= yes; Reason =:= starting ->
+ throw({error, mnesia_unexpectedly_running})
+ end.
+
+ensure_schema_integrity() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {schema_integrity_check_failed, Reason}})
+ end.
+
+check_schema_integrity() ->
+ Tables = mnesia:system_info(tables),
+ case check_tables(fun (Tab, TabDef) ->
+ case lists:member(Tab, Tables) of
+ false -> {error, {table_missing, Tab}};
+ true -> check_table_attributes(Tab, TabDef)
+ end
+ end) of
+ ok -> ok = wait_for_tables(),
+ check_tables(fun check_table_content/2);
+ Other -> Other
+ end.
empty_ram_only_tables() ->
Node = node(),
@@ -209,13 +610,127 @@ empty_ram_only_tables() ->
end, table_names()),
ok.
+create_tables() -> create_tables(disc).
+
+create_tables(Type) ->
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
+ {atomic, ok} -> ok;
+ {aborted, Reason} ->
+ throw({error, {table_creation_failed,
+ Tab, TabDef1, Reason}})
+ end
+ end,
+ table_definitions(Type)),
+ ok.
+
+copy_db(Destination) ->
+ ok = ensure_mnesia_not_running(),
+ rabbit_file:recursive_copy(dir(), Destination).
+
+wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
+
+wait_for_tables() -> wait_for_tables(table_names()).
+
+wait_for_tables(TableNames) ->
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
+ {error, Reason} ->
+ throw({error, {failed_waiting_for_tables, Reason}})
+ end.
+
+%% This does not guarantee us much, but it avoids some situations that
+%% will definitely end up badly
+check_cluster_consistency() ->
+ %% We want to find 0 or 1 consistent nodes.
+ case lists:foldl(
+ fun (Node, {error, _}) -> check_cluster_consistency(Node);
+ (_Node, {ok, Status}) -> {ok, Status}
+ end, {error, not_found},
+ ordsets:del_element(node(), all_clustered_nodes()))
+ of
+ {ok, Status = {RemoteAllNodes, _, _}} ->
+ case ordsets:is_subset(all_clustered_nodes(), RemoteAllNodes) of
+ true ->
+ ok;
+ false ->
+ %% We delete the schema here since we think we are
+ %% clustered with nodes that are no longer in the
+ %% cluster and there is no other way to remove
+ %% them from our schema. On the other hand, we are
+ %% sure that there is another online node that we
+ %% can use to sync the tables with. There is a
+ %% race here: if between this check and the
+ %% `init_db' invocation the cluster gets
+ %% disbanded, we're left with a node with no
+ %% mnesia data that will try to connect to offline
+ %% nodes.
+ mnesia:delete_schema([node()])
+ end,
+ rabbit_node_monitor:write_cluster_status(Status);
+ {error, not_found} ->
+ ok;
+ E = {error, _} ->
+ throw(E)
+ end.
+
+check_cluster_consistency(Node) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _Reason} ->
+ {error, not_found};
+ {_OTP, _Rabbit, {error, _}} ->
+ {error, not_found};
+ {OTP, Rabbit, {ok, Status}} ->
+ case check_consistency(OTP, Rabbit, Node, Status) of
+ E = {error, _} -> E;
+ {ok, Res} -> {ok, Res}
+ end
+ end.
+
+%%--------------------------------------------------------------------
+%% Hooks for `rabbit_node_monitor'
+%%--------------------------------------------------------------------
+
+on_node_up(Node) ->
+ case running_clustered_disc_nodes() =:= [Node] of
+ true -> rabbit_log:info("cluster contains disc nodes again~n");
+ false -> ok
+ end.
+
+on_node_down(_Node) ->
+ case running_clustered_disc_nodes() =:= [] of
+ true -> rabbit_log:info("only running disc node went down~n");
+ false -> ok
+ end.
+
+%%--------------------------------------------------------------------
+%% Internal helpers
%%--------------------------------------------------------------------
-nodes_of_type(Type) ->
- %% This function should return the nodes of a certain type (ram,
- %% disc or disc_only) in the current cluster. The type of nodes
- %% is determined when the cluster is initially configured.
- mnesia:table_info(schema, Type).
+discover_cluster(Nodes) when is_list(Nodes) ->
+ lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
+ (Node, {error, _}) -> discover_cluster(Node)
+ end, {error, no_nodes_provided}, Nodes);
+discover_cluster(Node) ->
+ OfflineError =
+ {error, {cannot_discover_cluster,
+ "The nodes provided is either offline or not running"}},
+ case Node =:= node() of
+ true ->
+ {error, {cannot_discover_cluster,
+ "You provided the current node as node to cluster with"}};
+ false ->
+ case rpc:call(Node,
+ rabbit_mnesia, cluster_status_from_mnesia, []) of
+ {badrpc, _Reason} -> OfflineError;
+ {error, mnesia_not_running} -> OfflineError;
+ {ok, Res} -> {ok, Res}
+ end
+ end.
%% The tables aren't supposed to be on disk on a ram node
table_definitions(disc) ->
@@ -336,68 +851,11 @@ queue_name_match() ->
resource_match(Kind) ->
#resource{kind = Kind, _='_'}.
-table_names() ->
- [Tab || {Tab, _} <- table_definitions()].
-
replicated_table_names() ->
[Tab || {Tab, TabDef} <- table_definitions(),
not lists:member({local_content, true}, TabDef)
].
-dir() -> mnesia:system_info(directory).
-
-ensure_mnesia_dir() ->
- MnesiaDir = dir() ++ "/",
- case filelib:ensure_dir(MnesiaDir) of
- {error, Reason} ->
- throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
- ok ->
- ok
- end.
-
-ensure_mnesia_running() ->
- case mnesia:system_info(is_running) of
- yes ->
- ok;
- starting ->
- wait_for(mnesia_running),
- ensure_mnesia_running();
- Reason when Reason =:= no; Reason =:= stopping ->
- throw({error, mnesia_not_running})
- end.
-
-ensure_mnesia_not_running() ->
- case mnesia:system_info(is_running) of
- no ->
- ok;
- stopping ->
- wait_for(mnesia_not_running),
- ensure_mnesia_not_running();
- Reason when Reason =:= yes; Reason =:= starting ->
- throw({error, mnesia_unexpectedly_running})
- end.
-
-ensure_schema_integrity() ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- throw({error, {schema_integrity_check_failed, Reason}})
- end.
-
-check_schema_integrity() ->
- Tables = mnesia:system_info(tables),
- case check_tables(fun (Tab, TabDef) ->
- case lists:member(Tab, Tables) of
- false -> {error, {table_missing, Tab}};
- true -> check_table_attributes(Tab, TabDef)
- end
- end) of
- ok -> ok = wait_for_tables(),
- check_tables(fun check_table_content/2);
- Other -> Other
- end.
-
check_table_attributes(Tab, TabDef) ->
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
case mnesia:table_info(Tab, attributes) of
@@ -433,153 +891,6 @@ check_tables(Fun) ->
Errors -> {error, Errors}
end.
-%% The cluster node config file contains some or all of the disk nodes
-%% that are members of the cluster this node is / should be a part of.
-%%
-%% If the file is absent, the list is empty, or only contains the
-%% current node, then the current node is a standalone (disk)
-%% node. Otherwise it is a node that is part of a cluster as either a
-%% disk node, if it appears in the cluster node config, or ram node if
-%% it doesn't.
-
-cluster_nodes_config_filename() ->
- dir() ++ "/cluster_nodes.config".
-
-create_cluster_nodes_config(ClusterNodes) ->
- FileName = cluster_nodes_config_filename(),
- case rabbit_file:write_term_file(FileName, [ClusterNodes]) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_create_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-read_cluster_nodes_config() ->
- FileName = cluster_nodes_config_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [ClusterNodes]} -> ClusterNodes;
- {error, enoent} ->
- {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes),
- ClusterNodes;
- {error, Reason} ->
- throw({error, {cannot_read_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-delete_cluster_nodes_config() ->
- FileName = cluster_nodes_config_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} ->
- throw({error, {cannot_delete_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-running_nodes_filename() ->
- filename:join(dir(), "nodes_running_at_shutdown").
-
-record_running_nodes() ->
- FileName = running_nodes_filename(),
- Nodes = running_clustered_nodes() -- [node()],
- %% Don't check the result: we're shutting down anyway and this is
- %% a best-effort-basis.
- rabbit_file:write_term_file(FileName, [Nodes]),
- ok.
-
-read_previously_running_nodes() ->
- FileName = running_nodes_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [Nodes]} -> Nodes;
- {error, enoent} -> [];
- {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
- FileName, Reason}})
- end.
-
-delete_previously_running_nodes() ->
- FileName = running_nodes_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
- FileName, Reason}})
- end.
-
-init_db(ClusterNodes, Force) ->
- init_db(
- ClusterNodes, Force,
- fun () ->
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ok;
- %% If we're just starting up a new node we won't have a
- %% version
- starting_from_scratch -> ok = rabbit_version:record_desired()
- end
- end).
-
-%% Take a cluster node config and create the right kind of node - a
-%% standalone disk node, or disk or ram node connected to the
-%% specified cluster nodes. If Force is false, don't allow
-%% connections to offline nodes.
-init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
- UClusterNodes = lists:usort(ClusterNodes),
- ProperClusterNodes = UClusterNodes -- [node()],
- case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
- {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
- throw({error, {failed_to_cluster_with, ProperClusterNodes,
- "Mnesia could not connect to any disc nodes."}});
- {ok, Nodes} ->
- WasDiscNode = is_disc_node(),
- WantDiscNode = should_be_disc_node(ClusterNodes),
- %% We create a new db (on disk, or in ram) in the first
- %% two cases and attempt to upgrade the in the other two
- case {Nodes, WasDiscNode, WantDiscNode} of
- {[], _, false} ->
- %% New ram node; start from scratch
- ok = create_schema(ram);
- {[], false, true} ->
- %% Nothing there at all, start from scratch
- ok = create_schema(disc);
- {[], true, true} ->
- %% We're the first node up
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ensure_schema_integrity();
- version_not_available -> ok = schema_ok_or_move()
- end;
- {[AnotherNode|_], _, _} ->
- %% Subsequent node in cluster, catch up
- ensure_version_ok(
- rpc:call(AnotherNode, rabbit_version, recorded, [])),
- {CopyType, CopyTypeAlt} =
- case WantDiscNode of
- true -> {disc, disc_copies};
- false -> {ram, ram_copies}
- end,
- ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, CopyTypeAlt),
- ok = create_local_table_copies(CopyType),
-
- ok = SecondaryPostMnesiaFun(),
- %% We've taken down mnesia, so ram nodes will need
- %% to re-sync
- case is_disc_node() of
- false -> start_mnesia(),
- mnesia:change_config(extra_db_nodes,
- ProperClusterNodes),
- wait_for_replicated_tables();
- true -> ok
- end,
-
- ensure_schema_integrity(),
- ok
- end;
- {error, Reason} ->
- %% one reason we may end up here is if we try to join
- %% nodes together that are currently running standalone or
- %% are members of a different cluster
- throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
- end.
-
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -592,7 +903,7 @@ schema_ok_or_move() ->
"and recreating schema from scratch~n",
[Reason]),
ok = move_db(),
- ok = create_schema(disc)
+ ok = create_schema()
end.
ensure_version_ok({ok, DiscVersion}) ->
@@ -604,25 +915,16 @@ ensure_version_ok({ok, DiscVersion}) ->
ensure_version_ok({error, _}) ->
ok = rabbit_version:record_desired().
-create_schema(Type) ->
+%% We only care about disc nodes since ram nodes are supposed to catch
+%% up only
+create_schema() ->
stop_mnesia(),
- case Type of
- disc -> rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
- cannot_create_schema);
- ram -> %% remove the disc schema since this is a ram node
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema)
- end,
+ rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema),
start_mnesia(),
- ok = create_tables(Type),
+ ok = create_tables(disc),
ensure_schema_integrity(),
ok = rabbit_version:record_desired().
-is_disc_node() -> mnesia:system_info(use_dir).
-
-should_be_disc_node(ClusterNodes) ->
- ClusterNodes == [] orelse lists:member(node(), ClusterNodes).
-
move_db() ->
stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
@@ -644,25 +946,6 @@ move_db() ->
start_mnesia(),
ok.
-copy_db(Destination) ->
- ok = ensure_mnesia_not_running(),
- rabbit_file:recursive_copy(dir(), Destination).
-
-create_tables() -> create_tables(disc).
-
-create_tables(Type) ->
- lists:foreach(fun ({Tab, TabDef}) ->
- TabDef1 = proplists:delete(match, TabDef),
- case mnesia:create_table(Tab, TabDef1) of
- {atomic, ok} -> ok;
- {aborted, Reason} ->
- throw({error, {table_creation_failed,
- Tab, TabDef1, Reason}})
- end
- end,
- table_definitions(Type)),
- ok.
-
copy_type_to_ram(TabDef) ->
[{disc_copies, []}, {ram_copies, [node()]}
| proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))].
@@ -684,13 +967,6 @@ create_local_table_copies(Type) ->
HasDiscOnlyCopies -> disc_only_copies;
true -> ram_copies
end;
-%%% unused code - commented out to keep dialyzer happy
-%%% Type =:= disc_only ->
-%%% if
-%%% HasDiscCopies or HasDiscOnlyCopies ->
-%%% disc_only_copies;
-%%% true -> ram_copies
-%%% end;
Type =:= ram ->
ram_copies
end,
@@ -711,120 +987,187 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
-wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
-
-wait_for_tables() -> wait_for_tables(table_names()).
-
-wait_for_tables(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok ->
- ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
+remove_node_if_mnesia_running(Node) ->
+ case mnesia:system_info(is_running) of
+ yes ->
+ %% Deleting the the schema copy of the node will result in
+ %% the node being removed from the cluster, with that
+ %% change being propagated to all nodes
+ case mnesia:del_table_copy(schema, Node) of
+ {atomic, ok} ->
+ rabbit_node_monitor:notify_left_cluster(Node),
+ ok;
+ {aborted, Reason} ->
+ {error, {failed_to_remove_node, Node, Reason}}
+ end;
+ no ->
+ {error, mnesia_not_running}
end.
-reset(Force) ->
- rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
- [if Force -> " forcefully";
- true -> ""
- end]),
- ensure_mnesia_not_running(),
- case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false)
+leave_cluster() ->
+ case {is_clustered(),
+ running_nodes(ordsets:del_element(node(), all_clustered_nodes()))}
of
- true -> log_both("no other disc nodes running");
- false -> ok
- end,
- Node = node(),
- Nodes = all_clustered_nodes() -- [Node],
- case Force of
- true -> ok;
- false ->
- ensure_mnesia_dir(),
- start_mnesia(),
- RunningNodes =
- try
- %% Force=true here so that reset still works when clustered
- %% with a node which is down
- ok = init_db(read_cluster_nodes_config(), true),
- running_clustered_nodes() -- [Node]
- after
- stop_mnesia()
- end,
- leave_cluster(Nodes, RunningNodes),
- rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
- cannot_delete_schema)
- end,
- %% We need to make sure that we don't end up in a distributed
- %% Erlang system with nodes while not being in an Mnesia cluster
- %% with them. We don't handle that well.
- [erlang:disconnect_node(N) || N <- Nodes],
- ok = delete_cluster_nodes_config(),
- %% remove persisted messages and any other garbage we find
- ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
- ok.
+ {false, []} -> ok;
+ {_, AllNodes} -> case lists:any(fun leave_cluster/1, AllNodes) of
+ true -> ok;
+ false -> e(no_running_cluster_nodes)
+ end
+ end.
-leave_cluster([], _) -> ok;
-leave_cluster(Nodes, RunningNodes) ->
- %% find at least one running cluster node and instruct it to
- %% remove our schema copy which will in turn result in our node
- %% being removed as a cluster node from the schema, with that
- %% change being propagated to all nodes
- case lists:any(
- fun (Node) ->
- case rpc:call(Node, mnesia, del_table_copy,
- [schema, node()]) of
- {atomic, ok} -> true;
- {badrpc, nodedown} -> false;
- {aborted, {node_not_running, _}} -> false;
- {aborted, Reason} ->
- throw({error, {failed_to_leave_cluster,
- Nodes, RunningNodes, Reason}})
- end
- end,
- RunningNodes) of
- true -> ok;
- false -> throw({error, {no_running_cluster_nodes,
- Nodes, RunningNodes}})
+leave_cluster(Node) ->
+ case rpc:call(Node,
+ rabbit_mnesia, remove_node_if_mnesia_running, [node()]) of
+ ok -> true;
+ {error, mnesia_not_running} -> false;
+ {error, Reason} -> throw({error, Reason});
+ {badrpc, nodedown} -> false
end.
wait_for(Condition) ->
error_logger:info_msg("Waiting for ~p...~n", [Condition]),
timer:sleep(1000).
-on_node_up(Node) ->
- case is_only_disc_node(Node, true) of
- true -> rabbit_log:info("cluster contains disc nodes again~n");
+start_mnesia(CheckConsistency) ->
+ case CheckConsistency of
+ true -> check_cluster_consistency();
false -> ok
- end.
-
-on_node_down(Node) ->
- case is_only_disc_node(Node, true) of
- true -> rabbit_log:info("only running disc node went down~n");
- false -> ok
- end.
-
-is_only_disc_node(Node, _MnesiaRunning = true) ->
- RunningSet = sets:from_list(running_clustered_nodes()),
- DiscSet = sets:from_list(nodes_of_type(disc_copies)),
- [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet));
-is_only_disc_node(Node, false) ->
- start_mnesia(),
- Res = is_only_disc_node(Node, true),
- stop_mnesia(),
- Res.
-
-log_both(Warning) ->
- io:format("Warning: ~s~n", [Warning]),
- rabbit_misc:with_local_io(
- fun () -> error_logger:warning_msg("~s~n", [Warning]) end).
-
-start_mnesia() ->
+ end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
ensure_mnesia_running().
+start_mnesia() ->
+ start_mnesia(true).
+
stop_mnesia() ->
stopped = mnesia:stop(),
ensure_mnesia_not_running().
+
+change_extra_db_nodes(ClusterNodes0, Force) ->
+ ClusterNodes = lists:usort(ClusterNodes0) -- [node()],
+ case mnesia:change_config(extra_db_nodes, ClusterNodes) of
+ {ok, []} when not Force andalso ClusterNodes =/= [] ->
+ throw({error, {failed_to_cluster_with, ClusterNodes,
+ "Mnesia could not connect to any nodes."}});
+ {ok, Nodes} ->
+ Nodes
+ end.
+
+%% What we really want is nodes running rabbit, not running
+%% mnesia. Using `mnesia:system_info(running_db_nodes)' will
+%% return false positives when we are actually just doing cluster
+%% operations (e.g. joining the cluster).
+running_nodes(Nodes) ->
+ {Replies, _BadNodes} =
+ rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []),
+ [Node || {Running, Node} <- Replies, Running].
+
+is_running_remote() ->
+ {proplists:is_defined(rabbit, application:which_applications(infinity)),
+ node()}.
+
+check_consistency(OTP, Rabbit) ->
+ rabbit_misc:sequence_error(
+ [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit)]).
+
+check_consistency(OTP, Rabbit, Node, Status) ->
+ rabbit_misc:sequence_error(
+ [check_otp_consistency(OTP),
+ check_rabbit_consistency(Rabbit),
+ check_nodes_consistency(Node, Status)]).
+
+check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
+ ThisNode = node(),
+ case ordsets:is_element(ThisNode, RemoteAllNodes) of
+ true ->
+ {ok, RemoteStatus};
+ false ->
+ {error, {inconsistent_cluster,
+ rabbit_misc:format("Node ~p thinks it's clustered "
+ "with node ~p, but ~p disagrees",
+ [ThisNode, Node, Node])}}
+ end.
+
+check_version_consistency(This, Remote, _) when This =:= Remote ->
+ ok;
+check_version_consistency(This, Remote, Name) ->
+ {error, {inconsistent_cluster,
+ rabbit_misc:format("~s version mismatch: local node is ~s, "
+ "remote node ~s", [Name, This, Remote])}}.
+
+check_otp_consistency(Remote) ->
+ check_version_consistency(erlang:system_info(otp_release), Remote, "OTP").
+
+check_rabbit_consistency(Remote) ->
+ check_version_consistency(rabbit_misc:version(), Remote, "Rabbit").
+
+%% This is fairly tricky. We want to know if the node is in the state
+%% that a `reset' would leave it in. We cannot simply check if the
+%% mnesia tables aren't there because restarted RAM nodes won't have
+%% tables while still being non-virgin. What we do instead is to
+%% check if the mnesia directory is non existant or empty, with the
+%% exception of the cluster status file, which will be there thanks to
+%% `rabbit_node_monitor:prepare_cluster_status_file/0'.
+is_virgin_node() ->
+ case rabbit_file:list_dir(dir()) of
+ {error, enoent} -> true;
+ {ok, []} -> true;
+ {ok, [File]} -> (dir() ++ "/" ++ File) =:=
+ [rabbit_node_monitor:cluster_status_filename(),
+ rabbit_node_monitor:running_nodes_filename()];
+ {ok, _} -> false
+ end.
+
+find_good_node([]) ->
+ none;
+find_good_node([Node | Nodes]) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _Reason} -> find_good_node(Nodes);
+ {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
+ {error, _} -> find_good_node(Nodes);
+ ok -> {ok, Node}
+ end
+ end.
+
+e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
+
+error_description(clustering_only_disc_node) ->
+ "You cannot cluster a node if it is the only disc node in its existing "
+ " cluster. If new nodes joined while this node was offline, use "
+ "\"update_cluster_nodes\" to add them manually.";
+error_description(resetting_only_disc_node) ->
+ "You cannot reset a node when it is the only disc node in a cluster. "
+ "Please convert another node of the cluster to a disc node first.";
+error_description(already_clustered) ->
+ "You are already clustered with the nodes you have selected.";
+error_description(not_clustered) ->
+ "Non-clustered nodes can only be disc nodes.";
+error_description(cannot_connect_to_cluster) ->
+ "Could not connect to the cluster nodes present in this node's "
+ "status file. If the cluster has changed, you can use the "
+ "\"update_cluster_nodes\" command to point to the new cluster nodes.";
+error_description(no_online_cluster_nodes) ->
+ "Could not find any online cluster nodes. If the cluster has changed, "
+ "you can use the 'recluster' command.";
+error_description(cannot_connect_to_node) ->
+ "Could not connect to the cluster node provided.";
+error_description(inconsistent_cluster) ->
+ "The nodes provided do not have this node as part of the cluster.";
+error_description(not_a_cluster_node) ->
+ "The node selected is not in the cluster.";
+error_description(online_node_offline_flag) ->
+ "You set the --offline flag, which is used to remove nodes remotely from "
+ "offline nodes, but this node is online.";
+error_description(offline_node_no_offline_flag) ->
+ "You are trying to remove a node from an offline node. That is dangerous, "
+ "but can be done with the --offline flag. Please consult the manual "
+ "for rabbitmqctl for more information.";
+error_description(not_last_node_to_go_down) ->
+ "The node you're trying to remove from was not the last to go down "
+ "(excluding the node you are removing). Please use the the last node "
+ "to go down to remove nodes when the cluster is offline.";
+error_description(removing_node_from_offline_node) ->
+ "To remove a node remotely from an offline node, the node you're removing "
+ "from must be a disc node and all the other nodes must be offline.";
+error_description(no_running_cluster_nodes) ->
+ "You cannot leave a cluster if no online nodes are present.".
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d69dad1f..c2e55022 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1394,7 +1394,7 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
-list_sorted_file_names(Dir, Ext) ->
+list_sorted_filenames(Dir, Ext) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
filelib:wildcard("*" ++ Ext, Dir)).
@@ -1531,8 +1531,8 @@ count_msg_refs(Gen, Seed, State) ->
end.
recover_crashed_compactions(Dir) ->
- FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION),
- TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
+ FileNames = list_sorted_filenames(Dir, ?FILE_EXTENSION),
+ TmpFileNames = list_sorted_filenames(Dir, ?FILE_EXTENSION_TMP),
lists:foreach(
fun (TmpFileName) ->
NonTmpRelatedFileName =
@@ -1609,7 +1609,7 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
{ok, Pid} = gatherer:start_link(),
case [filename_to_num(FileName) ||
- FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of
+ FileName <- list_sorted_filenames(Dir, ?FILE_EXTENSION)] of
[] -> build_index(Pid, undefined, [State #msstate.current_file],
State);
Files -> {Offset, State1} = build_index(Pid, undefined, Files, State),
@@ -2023,7 +2023,7 @@ transform_dir(BaseDir, Store, TransformFun) ->
CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end,
case filelib:is_dir(TmpDir) of
true -> throw({error, transform_failed_previously});
- false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ false -> FileList = list_sorted_filenames(Dir, ?FILE_EXTENSION),
foreach_file(Dir, TmpDir, TransformFile, FileList),
foreach_file(Dir, fun file:delete/1, FileList),
foreach_file(TmpDir, Dir, CopyFile, FileList),
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index bedf5142..038154c3 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -19,7 +19,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
- close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1,
+ close/1, fast_close/1, sockname/1, peername/1, peercert/1,
tune_buffer_size/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -59,7 +59,7 @@
-spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
--spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()).
+-spec(fast_close/1 :: (socket()) -> ok_or_any_error()).
-spec(sockname/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
@@ -77,6 +77,8 @@
%%---------------------------------------------------------------------------
+-define(SSL_CLOSE_TIMEOUT, 5000).
+
-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
is_ssl(Sock) -> ?IS_SSL(Sock).
@@ -148,8 +150,31 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl);
close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
-maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok;
-maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok.
+fast_close(Sock) when ?IS_SSL(Sock) ->
+ %% We cannot simply port_close the underlying tcp socket since the
+ %% TLS protocol is quite insistent that a proper closing handshake
+ %% should take place (see RFC 5245 s7.2.1). So we call ssl:close
+ %% instead, but that can block for a very long time, e.g. when
+ %% there is lots of pending output and there is tcp backpressure,
+ %% or the ssl_connection process has entered the the
+ %% workaround_transport_delivery_problems function during
+ %% termination, which, inexplicably, does a gen_tcp:recv(Socket,
+ %% 0), which may never return if the client doesn't send a FIN or
+ %% that gets swallowed by the network. Since there is no timeout
+ %% variant of ssl:close, we construct our own.
+ {Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock#ssl_socket.ssl) end),
+ erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}),
+ receive
+ {Pid, ssl_close_timeout} ->
+ erlang:demonitor(MRef, [flush]),
+ exit(Pid, kill);
+ {'DOWN', MRef, process, Pid, _Reason} ->
+ ok
+ end,
+ catch port_close(Sock#ssl_socket.tcp),
+ ok;
+fast_close(Sock) when is_port(Sock) ->
+ catch port_close(Sock), ok.
sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 94a5a2b7..2d0ded12 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -160,7 +160,19 @@ ssl_transform_fun(SslOpts) ->
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
{ok, SslSock} ->
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
+ {error, timeout} ->
+ {error, {ssl_upgrade_error, timeout}};
{error, Reason} ->
+ %% We have no idea what state the ssl_connection
+ %% process is in - it could still be happily
+ %% going, it might be stuck, or it could be just
+ %% about to fail. There is little that our caller
+ %% can do but close the TCP socket, but this could
+ %% cause ssl alerts to get dropped (which is bad
+ %% form, according to the TLS spec). So we give
+ %% the ssl_connection a little bit of time to send
+ %% such alerts.
+ timer:sleep(?SSL_TIMEOUT * 1000),
{error, {ssl_upgrade_error, Reason}};
{'EXIT', Reason} ->
{error, {ssl_upgrade_failure, Reason}}
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 323cf0ce..64c801f2 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -18,11 +18,29 @@
-behaviour(gen_server).
--export([start_link/0]).
+-export([running_nodes_filename/0,
+ cluster_status_filename/0,
+ prepare_cluster_status_files/0,
+ write_cluster_status/1,
+ read_cluster_status/0,
+ update_cluster_status/0,
+ reset_cluster_status/0,
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
--export([notify_cluster/0, rabbit_running_on/1]).
+ joined_cluster/2,
+ notify_joined_cluster/0,
+ left_cluster/1,
+ notify_left_cluster/1,
+ node_up/2,
+ notify_node_up/0,
+
+ start_link/0,
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+ ]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -31,56 +49,198 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(rabbit_running_on/1 :: (node()) -> 'ok').
--spec(notify_cluster/0 :: () -> 'ok').
+-spec(running_nodes_filename/0 :: () -> string()).
+-spec(cluster_status_filename/0 :: () -> string()).
+-spec(prepare_cluster_status_files/0 :: () -> 'ok').
+-spec(write_cluster_status/1 :: (rabbit_mnesia:cluster_status()) -> 'ok').
+-spec(read_cluster_status/0 :: () -> rabbit_mnesia:cluster_status()).
+-spec(update_cluster_status/0 :: () -> 'ok').
+-spec(reset_cluster_status/0 :: () -> 'ok').
+
+-spec(joined_cluster/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_joined_cluster/0 :: () -> 'ok').
+-spec(left_cluster/1 :: (node()) -> 'ok').
+-spec(notify_left_cluster/1 :: (node()) -> 'ok').
+-spec(node_up/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_node_up/0 :: () -> 'ok').
-endif.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% Cluster file operations
+%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+%% The cluster file information is kept in two files. The "cluster status file"
+%% contains all the clustered nodes and the disc nodes. The "running nodes
+%% file" contains the currently running nodes or the running nodes at shutdown
+%% when the node is down.
+%%
+%% We strive to keep the files up to date and we rely on this assumption in
+%% various situations. Obviously when mnesia is offline the information we have
+%% will be outdated, but it can't be otherwise.
-rabbit_running_on(Node) ->
- gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
+running_nodes_filename() ->
+ filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
-notify_cluster() ->
- Node = node(),
- Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
- %% notify other rabbits of this rabbit
- case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
- [Node], ?RABBIT_UP_RPC_TIMEOUT) of
- {_, [] } -> ok;
- {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
- end,
+cluster_status_filename() ->
+ rabbit_mnesia:dir() ++ "/cluster_nodes.config".
+
+prepare_cluster_status_files() ->
+ rabbit_mnesia:ensure_mnesia_dir(),
+ CorruptFiles = fun () -> throw({error, corrupt_cluster_status_files}) end,
+ RunningNodes1 = case try_read_file(running_nodes_filename()) of
+ {ok, [Nodes]} when is_list(Nodes) -> Nodes;
+ {ok, _ } -> CorruptFiles();
+ {error, enoent} -> []
+ end,
+ {AllNodes1, WantDiscNode} =
+ case try_read_file(cluster_status_filename()) of
+ {ok, [{AllNodes, DiscNodes0}]} ->
+ {AllNodes, lists:member(node(), DiscNodes0)};
+ {ok, [AllNodes0]} when is_list(AllNodes0) ->
+ {legacy_cluster_nodes(AllNodes0),
+ legacy_should_be_disc_node(AllNodes0)};
+ {ok, _} ->
+ CorruptFiles();
+ {error, enoent} ->
+ {legacy_cluster_nodes([]), true}
+ end,
+
+ ThisNode = [node()],
+
+ RunningNodes2 = lists:usort(RunningNodes1 ++ ThisNode),
+ AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
+ DiscNodes = case WantDiscNode of
+ true -> ThisNode;
+ false -> []
+ end,
+
+ ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
+
+write_cluster_status({All, Disc, Running}) ->
+ ClusterStatusFN = cluster_status_filename(),
+ Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
+ ok ->
+ RunningNodesFN = running_nodes_filename(),
+ {RunningNodesFN,
+ rabbit_file:write_term_file(RunningNodesFN, [Running])};
+ E1 = {error, _} ->
+ {ClusterStatusFN, E1}
+ end,
+ case Res of
+ {_, ok} -> ok;
+ {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
+ end.
+
+try_read_file(FileName) ->
+ case rabbit_file:read_term_file(FileName) of
+ {ok, Term} -> {ok, Term};
+ {error, enoent} -> {error, enoent};
+ {error, E} -> throw({error, {cannot_read_file, FileName, E}})
+ end.
+
+read_cluster_status() ->
+ case {try_read_file(cluster_status_filename()),
+ try_read_file(running_nodes_filename())} of
+ {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
+ {All, Disc, Running};
+ {_, _} ->
+ throw({error, corrupt_or_missing_cluster_files})
+ end.
+
+update_cluster_status() ->
+ {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
+ write_cluster_status(Status).
+
+reset_cluster_status() ->
+ write_cluster_status({[node()], [node()], [node()]}).
+
+%%----------------------------------------------------------------------------
+%% Cluster notifications
+%%----------------------------------------------------------------------------
+
+joined_cluster(Node, IsDiscNode) ->
+ gen_server:cast(?SERVER, {rabbit_join, Node, IsDiscNode}).
+
+notify_joined_cluster() ->
+ cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]),
+ ok.
+
+left_cluster(Node) ->
+ gen_server:cast(?SERVER, {left_cluster, Node}).
+
+notify_left_cluster(Node) ->
+ left_cluster(Node),
+ cluster_multicall(left_cluster, [Node]),
+ ok.
+
+node_up(Node, IsDiscNode) ->
+ gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}).
+
+notify_node_up() ->
+ Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]),
%% register other active rabbits with this rabbit
- [ rabbit_running_on(N) || N <- Nodes ],
+ [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) ||
+ N <- Nodes ],
ok.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init([]) ->
- {ok, ordsets:new()}.
+ {ok, no_state}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node}, Nodes) ->
- case ordsets:is_element(Node, Nodes) of
- true -> {noreply, Nodes};
+%% Note: when updating the status file, we can't simply write the mnesia
+%% information since the message can (and will) overtake the mnesia propagation.
+handle_cast({node_up, Node, IsDiscNode}, State) ->
+ case is_already_monitored({rabbit, Node}) of
+ true -> {noreply, State};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(
+ Node, DiscNodes);
+ false -> DiscNodes
+ end,
+ ordsets:add_element(Node, RunningNodes)}),
erlang:monitor(process, {rabbit, Node}),
ok = handle_live_rabbit(Node),
- {noreply, ordsets:add_element(Node, Nodes)}
+ {noreply, State}
end;
+handle_cast({joined_cluster, Node, IsDiscNode}, State) ->
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(Node,
+ DiscNodes);
+ false -> DiscNodes
+ end,
+ RunningNodes}),
+ {noreply, State};
+handle_cast({left_cluster, Node}, State) ->
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:del_element(Node, AllNodes),
+ ordsets:del_element(Node, DiscNodes),
+ ordsets:del_element(Node, RunningNodes)}),
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) ->
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({AllNodes, DiscNodes,
+ ordsets:del_element(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, ordsets:del_element(Node, Nodes)};
+ {noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -90,7 +250,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% Functions that call the module specific hooks when nodes go up/down
+%%----------------------------------------------------------------------------
%% TODO: This may turn out to be a performance hog when there are lots
%% of nodes. We really only need to execute some of these statements
@@ -104,3 +266,32 @@ handle_dead_rabbit(Node) ->
handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),
ok = rabbit_mnesia:on_node_up(Node).
+
+%%--------------------------------------------------------------------
+%% Internal utils
+%%--------------------------------------------------------------------
+
+cluster_multicall(Fun, Args) ->
+ Node = node(),
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+ %% notify other rabbits of this cluster
+ case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args,
+ ?RABBIT_UP_RPC_TIMEOUT) of
+ {_, [] } -> ok;
+ {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
+ end,
+ Nodes.
+
+is_already_monitored(Item) ->
+ {monitors, Monitors} = process_info(self(), monitors),
+ lists:any(fun ({_, Item1}) when Item =:= Item1 -> true;
+ (_) -> false
+ end, Monitors).
+
+legacy_cluster_nodes(Nodes) ->
+ %% We get all the info that we can, including the nodes from mnesia, which
+ %% will be there if the node is a disc node (empty list otherwise)
+ lists:usort(Nodes ++ mnesia:system_info(db_nodes)).
+
+legacy_should_be_disc_node(DiscNodes) ->
+ DiscNodes == [] orelse lists:member(node(), DiscNodes).
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
index af940dde..24762a73 100644
--- a/src/rabbit_parameter_validation.erl
+++ b/src/rabbit_parameter_validation.erl
@@ -16,7 +16,7 @@
-module(rabbit_parameter_validation).
--export([number/2, binary/2, list/2, proplist/3]).
+-export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3]).
number(_Name, Term) when is_number(Term) ->
ok;
@@ -30,12 +30,26 @@ binary(_Name, Term) when is_binary(Term) ->
binary(Name, Term) ->
{error, "~s should be binary, actually was ~p", [Name, Term]}.
+boolean(_Name, Term) when is_boolean(Term) ->
+ ok;
+boolean(Name, Term) ->
+ {error, "~s should be boolean, actually was ~p", [Name, Term]}.
+
list(_Name, Term) when is_list(Term) ->
ok;
list(Name, Term) ->
{error, "~s should be list, actually was ~p", [Name, Term]}.
+regex(Name, Term) when is_binary(Term) ->
+ case re:compile(Term) of
+ {ok, _} -> ok;
+ {error, Reason} -> {error, "~s should be regular expression "
+ "but is invalid: ~p", [Name, Reason]}
+ end;
+regex(Name, Term) ->
+ {error, "~s should be a binary but was ~p", [Name, Term]}.
+
proplist(Name, Constraints, Term) when is_list(Term) ->
{Results, Remainder}
= lists:foldl(
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 1551795f..69480c9c 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -22,11 +22,11 @@
-include("rabbit.hrl").
--import(rabbit_misc, [pget/2]).
+-import(rabbit_misc, [pget/2, pget/3]).
-export([register/0]).
-export([name/1, get/2, set/1]).
--export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
-rabbit_boot_step({?MODULE,
[{description, "policy parameters"},
@@ -46,12 +46,13 @@ name0(Policy) -> pget(<<"name">>, Policy).
set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
-set0(Name) -> match(Name, list()).
+set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
-get(Name, EntityName = #resource{}) -> get0(Name, match(EntityName, list())).
+get(Name, EntityName = #resource{virtual_host = VHost}) ->
+ get0(Name, match(EntityName, list(VHost))).
get0(_Name, undefined) -> {error, not_found};
get0(Name, List) -> case pget(<<"policy">>, List) of
@@ -64,35 +65,33 @@ get0(Name, List) -> case pget(<<"policy">>, List) of
%%----------------------------------------------------------------------------
-validate(<<"policy">>, Name, Term) ->
+validate(_VHost, <<"policy">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
-validate_clear(<<"policy">>, _Name) ->
+validate_clear(_VHost, <<"policy">>, _Name) ->
ok.
-notify(<<"policy">>, _Name, _Term) ->
- update_policies().
+notify(VHost, <<"policy">>, _Name, _Term) ->
+ update_policies(VHost).
-notify_clear(<<"policy">>, _Name) ->
- update_policies().
+notify_clear(VHost, <<"policy">>, _Name) ->
+ update_policies(VHost).
%%----------------------------------------------------------------------------
-list() ->
+list(VHost) ->
[[{<<"name">>, pget(key, P)} | pget(value, P)]
- || P <- rabbit_runtime_parameters:list(<<"policy">>)].
+ || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
-update_policies() ->
- Policies = list(),
+update_policies(VHost) ->
+ Policies = list(VHost),
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
fun() ->
{[update_exchange(X, Policies) ||
- VHost <- rabbit_vhost:list(),
- X <- rabbit_exchange:list(VHost)],
+ X <- rabbit_exchange:list(VHost)],
[update_queue(Q, Policies) ||
- VHost <- rabbit_vhost:list(),
- Q <- rabbit_amqqueue:list(VHost)]}
+ Q <- rabbit_amqqueue:list(VHost)]}
end),
[notify(X) || X <- Xs],
[notify(Q) || Q <- Qs],
@@ -129,28 +128,15 @@ match(Name, Policies) ->
[Policy | _Rest] -> Policy
end.
-matches(#resource{name = Name, virtual_host = VHost}, Policy) ->
- Prefix = pget(<<"prefix">>, Policy),
- case pget(<<"vhost">>, Policy) of
- undefined -> prefix(Prefix, Name);
- VHost -> prefix(Prefix, Name);
- _ -> false
- end.
-
-prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)).
+matches(#resource{name = Name}, Policy) ->
+ match =:= re:run(Name, pget(<<"pattern">>, Policy), [{capture, none}]).
sort_pred(A, B) ->
- R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)),
- case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of
- {undefined, undefined} -> R;
- {undefined, _} -> true;
- {_, undefined} -> false;
- _ -> R
- end.
+ pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0).
%%----------------------------------------------------------------------------
policy_validation() ->
- [{<<"vhost">>, fun rabbit_parameter_validation:binary/2, optional},
- {<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory},
- {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
+ [{<<"priority">>, fun rabbit_parameter_validation:number/2, optional},
+ {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3ef769c7..6d6c648a 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -400,19 +400,19 @@ blank_state_dir(Dir) ->
on_sync = fun (_) -> ok end,
unsynced_msg_ids = gb_sets:new() }.
-clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
+clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
detect_clean_shutdown(Dir) ->
- case rabbit_file:delete(clean_file_name(Dir)) of
+ case rabbit_file:delete(clean_filename(Dir)) of
ok -> true;
{error, enoent} -> false
end.
read_shutdown_terms(Dir) ->
- rabbit_file:read_term_file(clean_file_name(Dir)).
+ rabbit_file:read_term_file(clean_filename(Dir)).
store_clean_shutdown(Terms, Dir) ->
- CleanFileName = clean_file_name(Dir),
+ CleanFileName = clean_filename(Dir),
ok = rabbit_file:ensure_dir(CleanFileName),
rabbit_file:write_term_file(CleanFileName, Terms).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 19dac70c..aef48b20 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -184,6 +184,8 @@ socket_op(Sock, Fun) ->
{ok, Res} -> Res;
{error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
[self(), Reason]),
+ %% NB: this is tcp socket, even in case of ssl
+ rabbit_net:fast_close(Sock),
exit(normal)
end.
@@ -236,15 +238,14 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
end, "closing AMQP connection ~p (~s):~n~p~n",
[self(), ConnStr, Ex])
after
- %% The reader is the controlling process and hence its
- %% termination will close the socket. Furthermore,
- %% gen_tcp:close/1 waits for pending output to be sent, which
- %% results in unnecessary delays. However, to keep the
- %% file_handle_cache accounting as accurate as possible it
- %% would be good to close the socket immediately if we
- %% can. But we can only do this for non-ssl sockets.
- %%
- rabbit_net:maybe_fast_close(ClientSock),
+ %% We don't call gen_tcp:close/1 here since it waits for
+ %% pending output to be sent, which results in unnecessary
+ %% delays. We could just terminate - the reader is the
+ %% controlling process and hence its termination will close
+ %% the socket. However, to keep the file_handle_cache
+ %% accounting as accurate as possible we ought to close the
+ %% socket w/o delay before termination.
+ rabbit_net:fast_close(ClientSock),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl
index c7d30116..18668049 100644
--- a/src/rabbit_runtime_parameter.erl
+++ b/src/rabbit_runtime_parameter.erl
@@ -21,10 +21,12 @@
-type(validate_results() ::
'ok' | {error, string(), [term()]} | [validate_results()]).
--callback validate(binary(), binary(), term()) -> validate_results().
--callback validate_clear(binary(), binary()) -> validate_results().
--callback notify(binary(), binary(), term()) -> 'ok'.
--callback notify_clear(binary(), binary()) -> 'ok'.
+-callback validate(rabbit_types:vhost(), binary(), binary(),
+ term()) -> validate_results().
+-callback validate_clear(rabbit_types:vhost(), binary(),
+ binary()) -> validate_results().
+-callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'.
+-callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'.
-else.
@@ -32,10 +34,10 @@
behaviour_info(callbacks) ->
[
- {validate, 3},
- {validate_clear, 2},
- {notify, 3},
- {notify_clear, 2}
+ {validate, 4},
+ {validate_clear, 3},
+ {notify, 4},
+ {notify_clear, 3}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 3a54e8f6..b58b459a 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -18,8 +18,9 @@
-include("rabbit.hrl").
--export([parse_set/3, set/3, clear/2, list/0, list/1, list_strict/1,
- list_formatted/0, lookup/2, value/2, value/3, info_keys/0]).
+-export([parse_set/4, set/4, clear/3,
+ list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1,
+ lookup/3, value/3, value/4, info_keys/0]).
%%----------------------------------------------------------------------------
@@ -27,16 +28,23 @@
-type(ok_or_error_string() :: 'ok' | {'error_string', string()}).
--spec(parse_set/3 :: (binary(), binary(), string()) -> ok_or_error_string()).
--spec(set/3 :: (binary(), binary(), term()) -> ok_or_error_string()).
--spec(clear/2 :: (binary(), binary()) -> ok_or_error_string()).
+-spec(parse_set/4 :: (rabbit_types:vhost(), binary(), binary(), string())
+ -> ok_or_error_string()).
+-spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term())
+ -> ok_or_error_string()).
+-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
+ -> ok_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
--spec(list/1 :: (binary()) -> [rabbit_types:infos()]).
+-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
--spec(list_formatted/0 :: () -> [rabbit_types:infos()]).
--spec(lookup/2 :: (binary(), binary()) -> rabbit_types:infos()).
--spec(value/2 :: (binary(), binary()) -> term()).
--spec(value/3 :: (binary(), binary(), term()) -> term()).
+-spec(list/2 :: (rabbit_types:vhost(), binary()) -> [rabbit_types:infos()]).
+-spec(list_strict/2 :: (rabbit_types:vhost(), binary())
+ -> [rabbit_types:infos()] | 'not_found').
+-spec(list_formatted/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
+-spec(lookup/3 :: (rabbit_types:vhost(), binary(), binary())
+ -> rabbit_types:infos()).
+-spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()).
+-spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-endif.
@@ -49,14 +57,14 @@
%%---------------------------------------------------------------------------
-parse_set(Component, Key, String) ->
- case parse(String) of
- {ok, Term} -> set(Component, Key, Term);
- {errors, L} -> format_error(L)
+parse_set(VHost, Component, Key, String) ->
+ case rabbit_misc:json_decode(String) of
+ {ok, JSON} -> set(VHost, Component, Key, rabbit_misc:json_to_term(JSON));
+ error -> {error_string, "JSON decoding error"}
end.
-set(Component, Key, Term) ->
- case set0(Component, Key, Term) of
+set(VHost, Component, Key, Term) ->
+ case set0(VHost, Component, Key, Term) of
ok -> ok;
{errors, L} -> format_error(L)
end.
@@ -64,21 +72,16 @@ set(Component, Key, Term) ->
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
-set0(Component, Key, Term) ->
+set0(VHost, Component, Key, Term) ->
case lookup_component(Component) of
{ok, Mod} ->
- case flatten_errors(validate(Term)) of
+ case flatten_errors(Mod:validate(VHost, Component, Key, Term)) of
ok ->
- case flatten_errors(Mod:validate(Component, Key, Term)) of
- ok ->
- case mnesia_update(Component, Key, Term) of
- {old, Term} -> ok;
- _ -> Mod:notify(Component, Key, Term)
- end,
- ok;
- E ->
- E
- end;
+ case mnesia_update(VHost, Component, Key, Term) of
+ {old, Term} -> ok;
+ _ -> Mod:notify(VHost, Component, Key, Term)
+ end,
+ ok;
E ->
E
end;
@@ -86,95 +89,103 @@ set0(Component, Key, Term) ->
E
end.
-mnesia_update(Component, Key, Term) ->
+mnesia_update(VHost, Component, Key, Term) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- Res = case mnesia:read(?TABLE, {Component, Key}, read) of
+ Res = case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
[] -> new;
[Params] -> {old, Params#runtime_parameters.value}
end,
- ok = mnesia:write(?TABLE, c(Component, Key, Term), write),
+ ok = mnesia:write(?TABLE, c(VHost, Component, Key, Term), write),
Res
end).
-clear(Component, Key) ->
- case clear0(Component, Key) of
+clear(VHost, Component, Key) ->
+ case clear0(VHost, Component, Key) of
ok -> ok;
{errors, L} -> format_error(L)
end.
-clear0(Component, Key) ->
+clear0(VHost, Component, Key) ->
case lookup_component(Component) of
- {ok, Mod} -> case flatten_errors(Mod:validate_clear(Component, Key)) of
- ok -> mnesia_clear(Component, Key),
- Mod:notify_clear(Component, Key),
+ {ok, Mod} -> case flatten_errors(
+ Mod:validate_clear(VHost, Component, Key)) of
+ ok -> mnesia_clear(VHost, Component, Key),
+ Mod:notify_clear(VHost, Component, Key),
ok;
E -> E
end;
E -> E
end.
-mnesia_clear(Component, Key) ->
+mnesia_clear(VHost, Component, Key) ->
ok = rabbit_misc:execute_mnesia_transaction(
fun () ->
- ok = mnesia:delete(?TABLE, {Component, Key}, write)
+ ok = mnesia:delete(?TABLE, {VHost, Component, Key}, write)
end).
list() ->
[p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)].
-list(Component) -> list(Component, []).
-list_strict(Component) -> list(Component, not_found).
-
-list(Component, Default) ->
- case lookup_component(Component) of
- {ok, _} -> Match = #runtime_parameters{key = {Component, '_'}, _ = '_'},
- [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
- _ -> Default
+list(VHost) -> list(VHost, '_', []).
+list_strict(Component) -> list('_', Component, not_found).
+list(VHost, Component) -> list(VHost, Component, []).
+list_strict(VHost, Component) -> list(VHost, Component, not_found).
+
+list(VHost, Component, Default) ->
+ case component_good(Component) of
+ true -> Match = #runtime_parameters{key = {VHost, Component, '_'},
+ _ = '_'},
+ [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
+ _ -> Default
end.
-list_formatted() ->
- [pset(value, format(pget(value, P)), P) || P <- list()].
+list_formatted(VHost) ->
+ [pset(value, format(pget(value, P)), P) || P <- list(VHost)].
-lookup(Component, Key) ->
- case lookup0(Component, Key, rabbit_misc:const(not_found)) of
+lookup(VHost, Component, Key) ->
+ case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> p(Params)
end.
-value(Component, Key) ->
- case lookup0(Component, Key, rabbit_misc:const(not_found)) of
+value(VHost, Component, Key) ->
+ case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> Params#runtime_parameters.value
end.
-value(Component, Key, Default) ->
- Params = lookup0(Component, Key,
- fun () -> lookup_missing(Component, Key, Default) end),
+value(VHost, Component, Key, Default) ->
+ Params = lookup0(VHost, Component, Key,
+ fun () ->
+ lookup_missing(VHost, Component, Key, Default)
+ end),
Params#runtime_parameters.value.
-lookup0(Component, Key, DefaultFun) ->
- case mnesia:dirty_read(?TABLE, {Component, Key}) of
+lookup0(VHost, Component, Key, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, {VHost, Component, Key}) of
[] -> DefaultFun();
[R] -> R
end.
-lookup_missing(Component, Key, Default) ->
+lookup_missing(VHost, Component, Key, Default) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read(?TABLE, {Component, Key}, read) of
- [] -> Record = c(Component, Key, Default),
+ case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
+ [] -> Record = c(VHost, Component, Key, Default),
mnesia:write(?TABLE, Record, write),
Record;
[R] -> R
end
end).
-c(Component, Key, Default) -> #runtime_parameters{key = {Component, Key},
- value = Default}.
+c(VHost, Component, Key, Default) ->
+ #runtime_parameters{key = {VHost, Component, Key},
+ value = Default}.
-p(#runtime_parameters{key = {Component, Key}, value = Value}) ->
- [{component, Component},
+p(#runtime_parameters{key = {VHost, Component, Key}, value = Value}) ->
+ [{vhost, VHost},
+ {component, Component},
{key, Key},
{value, Value}].
@@ -182,6 +193,12 @@ info_keys() -> [component, key, value].
%%---------------------------------------------------------------------------
+component_good('_') -> true;
+component_good(Component) -> case lookup_component(Component) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
lookup_component(Component) ->
case rabbit_registry:lookup_module(
runtime_parameter, list_to_atom(binary_to_list(Component))) of
@@ -190,51 +207,9 @@ lookup_component(Component) ->
{ok, Module} -> {ok, Module}
end.
-parse(Src0) ->
- Src1 = string:strip(Src0),
- Src = case lists:reverse(Src1) of
- [$. |_] -> Src1;
- _ -> Src1 ++ "."
- end,
- case erl_scan:string(Src) of
- {ok, Scanned, _} ->
- case erl_parse:parse_term(Scanned) of
- {ok, Parsed} ->
- {ok, Parsed};
- {error, E} ->
- {errors,
- [{"Could not parse value: ~s", [format_parse_error(E)]}]}
- end;
- {error, E, _} ->
- {errors, [{"Could not scan value: ~s", [format_parse_error(E)]}]}
- end.
-
-format_parse_error({_Line, Mod, Err}) ->
- lists:flatten(Mod:format_error(Err)).
-
format(Term) ->
- list_to_binary(rabbit_misc:format("~p", [Term])).
-
-%%---------------------------------------------------------------------------
-
-%% We will want to be able to biject these to JSON. So we have some
-%% generic restrictions on what we consider acceptable.
-validate(Proplist = [T | _]) when is_tuple(T) -> validate_proplist(Proplist);
-validate(L) when is_list(L) -> validate_list(L);
-validate(T) when is_tuple(T) -> {error, "tuple: ~p", [T]};
-validate(B) when is_boolean(B) -> ok;
-validate(null) -> ok;
-validate(A) when is_atom(A) -> {error, "atom: ~p", [A]};
-validate(N) when is_number(N) -> ok;
-validate(B) when is_binary(B) -> ok;
-validate(B) when is_bitstring(B) -> {error, "bitstring: ~p", [B]}.
-
-validate_list(L) -> [validate(I) || I <- L].
-validate_proplist(L) -> [vp(I) || I <- L].
-
-vp({K, V}) when is_binary(K) -> validate(V);
-vp({K, _V}) -> {error, "bad key: ~p", [K]};
-vp(H) -> {error, "not two tuple: ~p", [H]}.
+ {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
+ list_to_binary(JSON).
flatten_errors(L) ->
case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index f23b3227..5224ccaa 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -17,7 +17,7 @@
-module(rabbit_runtime_parameters_test).
-behaviour(rabbit_runtime_parameter).
--export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
register() ->
@@ -26,13 +26,13 @@ register() ->
unregister() ->
rabbit_registry:unregister(runtime_parameter, <<"test">>).
-validate(<<"test">>, <<"good">>, _Term) -> ok;
-validate(<<"test">>, <<"maybe">>, <<"good">>) -> ok;
-validate(<<"test">>, _, _) -> {error, "meh", []}.
+validate(_, <<"test">>, <<"good">>, _Term) -> ok;
+validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok;
+validate(_, <<"test">>, _, _) -> {error, "meh", []}.
-validate_clear(<<"test">>, <<"good">>) -> ok;
-validate_clear(<<"test">>, <<"maybe">>) -> ok;
-validate_clear(<<"test">>, _) -> {error, "meh", []}.
+validate_clear(_, <<"test">>, <<"good">>) -> ok;
+validate_clear(_, <<"test">>, <<"maybe">>) -> ok;
+validate_clear(_, <<"test">>, _) -> {error, "meh", []}.
-notify(_, _, _) -> ok.
-notify_clear(_, _) -> ok.
+notify(_, _, _, _) -> ok.
+notify_clear(_, _, _) -> ok.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bb60bd12..3cc0e5db 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -32,6 +32,8 @@
-define(TIMEOUT, 5000).
all_tests() ->
+ ok = setup_cluster(),
+ ok = supervisor2_tests:test_all(),
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
@@ -52,36 +54,61 @@ all_tests() ->
passed = test_log_management_during_startup(),
passed = test_statistics(),
passed = test_arguments_parser(),
- passed = test_cluster_management(),
passed = test_user_management(),
passed = test_runtime_parameters(),
passed = test_server_status(),
passed = test_confirms(),
- passed = maybe_run_cluster_dependent_tests(),
+ passed =
+ do_if_secondary_node(
+ fun run_cluster_dependent_tests/1,
+ fun (SecondaryNode) ->
+ io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode]),
+ passed
+ end),
passed = test_configurable_server_properties(),
passed.
-maybe_run_cluster_dependent_tests() ->
+do_if_secondary_node(Up, Down) ->
SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
- pong -> passed = run_cluster_dependent_tests(SecondaryNode);
- pang -> io:format("Skipping cluster dependent tests with node ~p~n",
- [SecondaryNode])
- end,
- passed.
+ pong -> Up(SecondaryNode);
+ pang -> Down(SecondaryNode)
+ end.
-run_cluster_dependent_tests(SecondaryNode) ->
- SecondaryNodeS = atom_to_list(SecondaryNode),
+setup_cluster() ->
+ do_if_secondary_node(
+ fun (SecondaryNode) ->
+ cover:stop(SecondaryNode),
+ ok = control_action(stop_app, []),
+ %% 'cover' does not cope at all well with nodes disconnecting,
+ %% which happens as part of reset. So we turn it off
+ %% temporarily. That is ok even if we're not in general using
+ %% cover, it just turns the engine on / off and doesn't log
+ %% anything. Note that this way cover won't be on when joining
+ %% the cluster, but this is OK since we're testing the clustering
+ %% interface elsewere anyway.
+ cover:stop(nodes()),
+ ok = control_action(join_cluster,
+ [atom_to_list(SecondaryNode)]),
+ cover:start(nodes()),
+ ok = control_action(start_app, []),
+ ok = control_action(start_app, SecondaryNode, [], [])
+ end,
+ fun (_) -> ok end).
- cover:stop(SecondaryNode),
- ok = control_action(stop_app, []),
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- cover:start(SecondaryNode),
- ok = control_action(start_app, SecondaryNode, [], []),
+maybe_run_cluster_dependent_tests() ->
+ do_if_secondary_node(
+ fun (SecondaryNode) ->
+ passed = run_cluster_dependent_tests(SecondaryNode)
+ end,
+ fun (SecondaryNode) ->
+ io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode])
+ end).
+run_cluster_dependent_tests(SecondaryNode) ->
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
@@ -856,200 +883,6 @@ test_arguments_parser() ->
passed.
-test_cluster_management() ->
- %% 'cluster' and 'reset' should only work if the app is stopped
- {error, _} = control_action(cluster, []),
- {error, _} = control_action(reset, []),
- {error, _} = control_action(force_reset, []),
-
- ok = control_action(stop_app, []),
-
- %% various ways of creating a standalone node
- NodeS = atom_to_list(node()),
- ClusteringSequence = [[],
- [NodeS],
- ["invalid@invalid", NodeS],
- [NodeS, "invalid@invalid"]],
-
- ok = control_action(reset, []),
- lists:foreach(fun (Arg) ->
- ok = control_action(force_cluster, Arg),
- ok
- end,
- ClusteringSequence),
- lists:foreach(fun (Arg) ->
- ok = control_action(reset, []),
- ok = control_action(force_cluster, Arg),
- ok
- end,
- ClusteringSequence),
- ok = control_action(reset, []),
- lists:foreach(fun (Arg) ->
- ok = control_action(force_cluster, Arg),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok
- end,
- ClusteringSequence),
- lists:foreach(fun (Arg) ->
- ok = control_action(reset, []),
- ok = control_action(force_cluster, Arg),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok
- end,
- ClusteringSequence),
-
- %% convert a disk node into a ram node
- ok = control_action(reset, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- %% join a non-existing cluster as a ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- ok = control_action(reset, []),
-
- SecondaryNode = rabbit_nodes:make("hare"),
- case net_adm:ping(SecondaryNode) of
- pong -> passed = test_cluster_management2(SecondaryNode);
- pang -> io:format("Skipping clustering tests with node ~p~n",
- [SecondaryNode])
- end,
-
- ok = control_action(start_app, []),
- passed.
-
-test_cluster_management2(SecondaryNode) ->
- NodeS = atom_to_list(node()),
- SecondaryNodeS = atom_to_list(SecondaryNode),
-
- %% make a disk node
- ok = control_action(cluster, [NodeS]),
- ok = assert_disc_node(),
- %% make a ram node
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = assert_ram_node(),
-
- %% join cluster as a ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% ram node will not start by itself
- ok = control_action(stop_app, []),
- ok = control_action(stop_app, SecondaryNode, [], []),
- {error, _} = control_action(start_app, []),
- ok = control_action(start_app, SecondaryNode, [], []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
-
- %% change cluster config while remaining in same cluster
- ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
-
- %% join non-existing cluster as a ram node
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- {error, _} = control_action(start_app, []),
- ok = assert_ram_node(),
-
- %% join empty cluster as a ram node (converts to disc)
- ok = control_action(cluster, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% make a new ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% turn ram node into disk node
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% convert a disk node into a ram node
- ok = assert_disc_node(),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- %% make a new disk node
- ok = control_action(force_reset, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% turn a disk node into a ram node
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% NB: this will log an inconsistent_database error, which is harmless
- %% Turning cover on / off is OK even if we're not in general using cover,
- %% it just turns the engine on / off, doesn't actually log anything.
- cover:stop([SecondaryNode]),
- true = disconnect_node(SecondaryNode),
- pong = net_adm:ping(SecondaryNode),
- cover:start([SecondaryNode]),
-
- %% leaving a cluster as a ram node
- ok = control_action(reset, []),
- %% ...and as a disk node
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- cover:stop(SecondaryNode),
- ok = control_action(reset, []),
- cover:start(SecondaryNode),
-
- %% attempt to leave cluster when no other node is alive
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, SecondaryNode, [], []),
- ok = control_action(stop_app, []),
- {error, {no_running_cluster_nodes, _, _}} =
- control_action(reset, []),
-
- %% attempt to change type when no other node is alive
- {error, {no_running_cluster_nodes, _, _}} =
- control_action(cluster, [SecondaryNodeS]),
-
- %% leave system clustered, with the secondary node as a ram node
- ok = control_action(force_reset, []),
- ok = control_action(start_app, []),
- %% Yes, this is rather ugly. But since we're a clustered Mnesia
- %% node and we're telling another clustered node to reset itself,
- %% we will get disconnected half way through causing a
- %% badrpc. This never happens in real life since rabbitmqctl is
- %% not a clustered Mnesia node.
- cover:stop(SecondaryNode),
- {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []),
- pong = net_adm:ping(SecondaryNode),
- cover:start(SecondaryNode),
- ok = control_action(cluster, SecondaryNode, [NodeS], []),
- ok = control_action(start_app, SecondaryNode, [], []),
-
- passed.
-
test_user_management() ->
%% lots if stuff that should fail
@@ -1135,22 +968,21 @@ test_runtime_parameters() ->
Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end,
%% Acceptable for bijection
- Good(["test", "good", "<<\"ignore\">>"]),
+ Good(["test", "good", "\"ignore\""]),
Good(["test", "good", "123"]),
Good(["test", "good", "true"]),
Good(["test", "good", "false"]),
Good(["test", "good", "null"]),
- Good(["test", "good", "[{<<\"key\">>, <<\"value\">>}]"]),
+ Good(["test", "good", "{\"key\": \"value\"}"]),
- %% Various forms of fail due to non-bijectability
+ %% Invalid json
Bad(["test", "good", "atom"]),
- Bad(["test", "good", "{tuple, foo}"]),
- Bad(["test", "good", "[{<<\"key\">>, <<\"value\">>, 1}]"]),
- Bad(["test", "good", "[{key, <<\"value\">>}]"]),
+ Bad(["test", "good", "{\"foo\": \"bar\""]),
+ Bad(["test", "good", "{foo: \"bar\"}"]),
%% Test actual validation hook
- Good(["test", "maybe", "<<\"good\">>"]),
- Bad(["test", "maybe", "<<\"bad\">>"]),
+ Good(["test", "maybe", "\"good\""]),
+ Bad(["test", "maybe", "\"bad\""]),
ok = control_action(list_parameters, []),
@@ -1216,7 +1048,15 @@ test_server_status() ->
ok = control_action(list_consumers, []),
%% set vm memory high watermark
+ HWM = vm_memory_monitor:get_vm_memory_high_watermark(),
+ ok = control_action(set_vm_memory_high_watermark, ["1"]),
ok = control_action(set_vm_memory_high_watermark, ["1.0"]),
+ ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]),
+
+ %% eval
+ {parse_error, _} = control_action(eval, ["\""]),
+ {parse_error, _} = control_action(eval, ["a("]),
+ ok = control_action(eval, ["a."]),
%% cleanup
[{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
@@ -2447,10 +2287,10 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- {undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
+ {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, false, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2467,11 +2307,11 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, false, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {undefined, VQ6} =
+ {_, undefined, VQ6} =
rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
VQ6.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 732c29b6..8966bcab 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -64,7 +64,7 @@
#basic_message{exchange_name :: rabbit_exchange:name(),
routing_keys :: [rabbit_router:routing_key()],
content :: content(),
- id :: msg_id(),
+ id :: msg_id(),
is_persistent :: boolean()}).
-type(message() :: basic_message()).
-type(delivery() ::
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index e1a7bcae..3fbfeed0 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -121,10 +121,7 @@ remove_backup() ->
info("upgrades: Mnesia backup removed~n", []).
maybe_upgrade_mnesia() ->
- %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point
- %% if we are a RAM node since Mnesia has not started yet.
- AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++
- rabbit_mnesia:read_cluster_nodes_config()),
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
case rabbit_version:upgrades_required(mnesia) of
{error, starting_from_scratch} ->
ok;
@@ -150,12 +147,12 @@ maybe_upgrade_mnesia() ->
upgrade_mode(AllNodes) ->
case nodes_running(AllNodes) of
[] ->
- AfterUs = rabbit_mnesia:read_previously_running_nodes(),
+ AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()],
case {is_disc_node_legacy(), AfterUs} of
{true, []} ->
primary;
{true, _} ->
- Filename = rabbit_mnesia:running_nodes_filename(),
+ Filename = rabbit_node_monitor:running_nodes_filename(),
die("Cluster upgrade needed but other disc nodes shut "
"down after this one.~nPlease first start the last "
"disc node to shut down.~n~nNote: if several disc "
@@ -222,15 +219,8 @@ secondary_upgrade(AllNodes) ->
IsDiscNode = is_disc_node_legacy(),
rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
cannot_delete_schema),
- %% Note that we cluster with all nodes, rather than all disc nodes
- %% (as we can't know all disc nodes at this point). This is safe as
- %% we're not writing the cluster config, just setting up Mnesia.
- ClusterNodes = case IsDiscNode of
- true -> AllNodes;
- false -> AllNodes -- [node()]
- end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
- ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end),
+ ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true),
ok = rabbit_version:record_desired_for_scope(mnesia),
ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 49213c95..98c45717 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,8 +19,8 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
- timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
+ depth/1, set_ram_duration_target/2, ram_duration/1,
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
- (S) -> {undefined, S}
+ End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
+ (Next, S) -> {Next, undefined, S}
end,
case queue_out(State) of
{empty, State1} ->
- End(a(State1));
+ End(undefined, a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
@@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
dropwhile(Pred, AckRequired, State2, undefined);
{false, _} ->
- End(a(in_r(MsgStatus, State1)))
+ End(MsgProps, a(in_r(MsgStatus, State1)))
end
end.
@@ -681,6 +681,9 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
+depth(State = #vqstate { pending_ack = Ack }) ->
+ len(State) + gb_trees:size(Ack).
+
set_ram_duration_target(
DurationTarget, State = #vqstate {
rates = #rates { avg_egress = AvgEgressRate,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 5548ef6d..03dfbe24 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -90,12 +90,13 @@ delete(VHostPath) ->
R.
internal_delete(VHostPath) ->
- lists:foreach(
- fun (Info) ->
- ok = rabbit_auth_backend_internal:clear_permissions(
- proplists:get_value(user, Info), VHostPath)
- end,
- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)),
+ [ok = rabbit_auth_backend_internal:clear_permissions(
+ proplists:get_value(user, Info), VHostPath)
+ || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
+ [ok = rabbit_runtime_parameters:clear(VHostPath,
+ proplists:get_value(component, Info),
+ proplists:get_value(key, Info))
+ || Info <- rabbit_runtime_parameters:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 3d3623d7..5af38573 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -255,10 +255,10 @@ behaviour_info(_Other) ->
%%% ---------------------------------------------------
start_link(Mod, Args) ->
gen_server:start_link(?MODULE, {self, Mod, Args}, []).
-
+
start_link(SupName, Mod, Args) ->
gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
-
+
%%% ---------------------------------------------------
%%% Interface functions.
%%% ---------------------------------------------------
@@ -298,9 +298,9 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
check_childspecs(X) -> {error, {badarg, X}}.
%%% ---------------------------------------------------
-%%%
+%%%
%%% Initialize the supervisor.
-%%%
+%%%
%%% ---------------------------------------------------
init({SupName, Mod, Args}) ->
process_flag(trap_exit, true),
@@ -319,7 +319,7 @@ init({SupName, Mod, Args}) ->
Error ->
{stop, {bad_return, {Mod, init, Error}}}
end.
-
+
init_children(State, StartSpec) ->
SupName = State#state.name,
case check_startspec(StartSpec) of
@@ -349,7 +349,7 @@ init_dynamic(_State, StartSpec) ->
%% Func: start_children/2
%% Args: Children = [#child] in start order
%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
-%% Purpose: Start all children. The new list contains #child's
+%% Purpose: Start all children. The new list contains #child's
%% with pids.
%% Returns: {ok, NChildren} | {error, NChildren}
%% NChildren = [#child] in termination order (reversed
@@ -381,7 +381,7 @@ do_start_child(SupName, Child) ->
NChild = Child#child{pid = Pid},
report_progress(NChild, SupName),
{ok, Pid, Extra};
- ignore ->
+ ignore ->
{ok, undefined};
{error, What} -> {error, What};
What -> {error, What}
@@ -400,12 +400,12 @@ do_start_child_i(M, F, A) ->
What ->
{error, What}
end.
-
+
%%% ---------------------------------------------------
-%%%
+%%%
%%% Callback functions.
-%%%
+%%%
%%% ---------------------------------------------------
handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
#child{mfa = {M, F, A}} = hd(State#state.children),
@@ -414,11 +414,11 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
{ok, undefined} ->
{reply, {ok, undefined}, State};
{ok, Pid} ->
- NState = State#state{dynamics =
+ NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
{reply, {ok, Pid}, NState};
{ok, Pid, Extra} ->
- NState = State#state{dynamics =
+ NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
{reply, {ok, Pid, Extra}, NState};
What ->
@@ -497,7 +497,7 @@ handle_call(which_children, _From, State) ->
%%% Hopefully cause a function-clause as there is no API function
%%% that utilizes cast.
handle_cast(null, State) ->
- error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
+ error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
[]),
{noreply, State}.
@@ -527,7 +527,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
end;
handle_info(Msg, State) ->
- error_logger:error_msg("Supervisor received unexpected message: ~p~n",
+ error_logger:error_msg("Supervisor received unexpected message: ~p~n",
[Msg]),
{noreply, State}.
%%
@@ -577,13 +577,13 @@ check_flags({Strategy, MaxIntensity, Period}) ->
check_flags(What) ->
{bad_flags, What}.
-update_childspec(State, StartSpec) when ?is_simple(State) ->
- case check_startspec(StartSpec) of
- {ok, [Child]} ->
- {ok, State#state{children = [Child]}};
- Error ->
- {error, Error}
- end;
+update_childspec(State, StartSpec) when ?is_simple(State) ->
+ case check_startspec(StartSpec) of
+ {ok, [Child]} ->
+ {ok, State#state{children = [Child]}};
+ Error ->
+ {error, Error}
+ end;
update_childspec(State, StartSpec) ->
case check_startspec(StartSpec) of
@@ -604,7 +604,7 @@ update_childspec1([Child|OldC], Children, KeepOld) ->
end;
update_childspec1([], Children, KeepOld) ->
% Return them in (keeped) reverse start order.
- lists:reverse(Children ++ KeepOld).
+ lists:reverse(Children ++ KeepOld).
update_chsp(OldCh, Children) ->
case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name ->
@@ -618,7 +618,7 @@ update_chsp(OldCh, Children) ->
NewC ->
{ok, NewC}
end.
-
+
%%% ---------------------------------------------------
%%% Start a new child.
%%% ---------------------------------------------------
@@ -630,12 +630,12 @@ handle_start_child(Child, State) ->
{ok, Pid} ->
Children = State#state.children,
{{ok, Pid},
- State#state{children =
+ State#state{children =
[Child#child{pid = Pid}|Children]}};
{ok, Pid, Extra} ->
Children = State#state.children,
{{ok, Pid, Extra},
- State#state{children =
+ State#state{children =
[Child#child{pid = Pid}|Children]}};
{error, What} ->
{{error, {What, Child}}, State}
@@ -816,29 +816,32 @@ terminate_simple_children(Child, Dynamics, SupName) ->
{Replies, Timedout} =
lists:foldl(
fun (_Pid, {Replies, Timedout}) ->
- {Reply, Timedout1} =
+ {Pid1, Reason1, Timedout1} =
receive
TimeoutMsg ->
Remaining = Pids -- [P || {P, _} <- Replies],
[exit(P, kill) || P <- Remaining],
- receive {'DOWN', _MRef, process, Pid, Reason} ->
- {{error, Reason}, true}
+ receive
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ {Pid, Reason, true}
end;
{'DOWN', _MRef, process, Pid, Reason} ->
- {child_res(Child, Reason, Timedout), Timedout};
- {'EXIT', Pid, Reason} ->
- receive {'DOWN', _MRef, process, Pid, _} ->
- {{error, Reason}, Timedout}
- end
+ {Pid, Reason, Timedout}
end,
- {[{Pid, Reply} | Replies], Timedout1}
+ {[{Pid1, child_res(Child, Reason1, Timedout1)} | Replies],
+ Timedout1}
end, {[], false}, Pids),
timeout_stop(Child, TRef, TimeoutMsg, Timedout),
ReportError = shutdown_error_reporter(SupName),
- [case Reply of
- {_Pid, ok} -> ok;
- {Pid, {error, R}} -> ReportError(R, Child#child{pid = Pid})
- end || Reply <- Replies],
+ Report = fun(_, ok) -> ok;
+ (Pid, {error, R}) -> ReportError(R, Child#child{pid = Pid})
+ end,
+ [receive
+ {'EXIT', Pid, Reason} ->
+ Report(Pid, child_res(Child, Reason, Timedout))
+ after
+ 0 -> Report(Pid, Reply)
+ end || {Pid, Reply} <- Replies],
ok.
child_exit_reason(#child{shutdown = brutal_kill}) -> kill;
@@ -863,7 +866,7 @@ timeout_stop(#child{shutdown = Time}, TRef, Msg, false) when is_integer(Time) ->
after
0 -> ok
end;
-timeout_stop(#child{}, ok, _Msg, _Timedout) ->
+timeout_stop(#child{}, _TRef, _Msg, _Timedout) ->
ok.
do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
@@ -885,17 +888,17 @@ do_terminate(Child, _SupName) ->
Child.
%%-----------------------------------------------------------------
-%% Shutdowns a child. We must check the EXIT value
+%% Shutdowns a child. We must check the EXIT value
%% of the child, because it might have died with another reason than
-%% the wanted. In that case we want to report the error. We put a
-%% monitor on the child an check for the 'DOWN' message instead of
-%% checking for the 'EXIT' message, because if we check the 'EXIT'
-%% message a "naughty" child, who does unlink(Sup), could hang the
-%% supervisor.
+%% the wanted. In that case we want to report the error. We put a
+%% monitor on the child an check for the 'DOWN' message instead of
+%% checking for the 'EXIT' message, because if we check the 'EXIT'
+%% message a "naughty" child, who does unlink(Sup), could hang the
+%% supervisor.
%% Returns: ok | {error, OtherReason} (this should be reported)
%%-----------------------------------------------------------------
shutdown(Pid, brutal_kill) ->
-
+
case monitor_child(Pid) of
ok ->
exit(Pid, kill),
@@ -905,16 +908,16 @@ shutdown(Pid, brutal_kill) ->
{'DOWN', _MRef, process, Pid, OtherReason} ->
{error, OtherReason}
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end;
shutdown(Pid, Time) ->
-
+
case monitor_child(Pid) of
ok ->
exit(Pid, shutdown), %% Try to shutdown gracefully
- receive
+ receive
{'DOWN', _MRef, process, Pid, shutdown} ->
ok;
{'DOWN', _MRef, process, Pid, OtherReason} ->
@@ -926,14 +929,14 @@ shutdown(Pid, Time) ->
{error, OtherReason}
end
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end.
%% Help function to shutdown/2 switches from link to monitor approach
monitor_child(Pid) ->
-
- %% Do the monitor operation first so that if the child dies
+
+ %% Do the monitor operation first so that if the child dies
%% before the monitoring is done causing a 'DOWN'-message with
%% reason noproc, we will get the real reason in the 'EXIT'-message
%% unless a naughty child has already done unlink...
@@ -943,22 +946,22 @@ monitor_child(Pid) ->
receive
%% If the child dies before the unlik we must empty
%% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
- {'EXIT', Pid, Reason} ->
- receive
+ {'EXIT', Pid, Reason} ->
+ receive
{'DOWN', _, process, Pid, _} ->
{error, Reason}
end
- after 0 ->
+ after 0 ->
%% If a naughty child did unlink and the child dies before
- %% monitor the result will be that shutdown/2 receives a
+ %% monitor the result will be that shutdown/2 receives a
%% 'DOWN'-message with reason noproc.
%% If the child should die after the unlink there
%% will be a 'DOWN'-message with a correct reason
- %% that will be handled in shutdown/2.
- ok
+ %% that will be handled in shutdown/2.
+ ok
end.
-
-
+
+
%%-----------------------------------------------------------------
%% Child/State manipulating functions.
%%-----------------------------------------------------------------
@@ -1012,7 +1015,7 @@ remove_child(Child, State) ->
%% Args: SupName = {local, atom()} | {global, atom()} | self
%% Type = {Strategy, MaxIntensity, Period}
%% Strategy = one_for_one | one_for_all | simple_one_for_one |
-%% rest_for_one
+%% rest_for_one
%% MaxIntensity = integer()
%% Period = integer()
%% Mod :== atom()
@@ -1107,10 +1110,10 @@ validChildType(supervisor) -> true;
validChildType(worker) -> true;
validChildType(What) -> throw({invalid_child_type, What}).
-validName(_Name) -> true.
+validName(_Name) -> true.
-validFunc({M, F, A}) when is_atom(M),
- is_atom(F),
+validFunc({M, F, A}) when is_atom(M),
+ is_atom(F),
is_list(A) -> true;
validFunc(Func) -> throw({invalid_mfa, Func}).
@@ -1128,7 +1131,7 @@ validDelay(Delay) when is_number(Delay),
Delay >= 0 -> true;
validDelay(What) -> throw({invalid_delay, What}).
-validShutdown(Shutdown, _)
+validShutdown(Shutdown, _)
when is_integer(Shutdown), Shutdown > 0 -> true;
validShutdown(infinity, supervisor) -> true;
validShutdown(brutal_kill, _) -> true;
@@ -1154,7 +1157,7 @@ validMods(Mods) -> throw({invalid_modules, Mods}).
%%% Returns: {ok, State'} | {terminate, State'}
%%% ------------------------------------------------------
-add_restart(State) ->
+add_restart(State) ->
I = State#state.intensity,
P = State#state.period,
R = State#state.restarts,
diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl
new file mode 100644
index 00000000..e42ded7b
--- /dev/null
+++ b/src/supervisor2_tests.erl
@@ -0,0 +1,70 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(supervisor2_tests).
+-behaviour(supervisor2).
+
+-export([test_all/0, start_link/0]).
+-export([init/1]).
+
+test_all() ->
+ ok = check_shutdown(stop, 200, 200, 2000),
+ ok = check_shutdown(ignored, 1, 2, 2000).
+
+check_shutdown(SigStop, Iterations, ChildCount, SupTimeout) ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, [SupTimeout]),
+ Res = lists:foldl(
+ fun (I, ok) ->
+ TestSupPid = erlang:whereis(?MODULE),
+ ChildPids =
+ [begin
+ {ok, ChildPid} =
+ supervisor2:start_child(TestSupPid, []),
+ ChildPid
+ end || _ <- lists:seq(1, ChildCount)],
+ MRef = erlang:monitor(process, TestSupPid),
+ [P ! SigStop || P <- ChildPids],
+ ok = supervisor2:terminate_child(Sup, test_sup),
+ {ok, _} = supervisor2:restart_child(Sup, test_sup),
+ receive
+ {'DOWN', MRef, process, TestSupPid, shutdown} ->
+ ok;
+ {'DOWN', MRef, process, TestSupPid, Reason} ->
+ {error, {I, Reason}}
+ end;
+ (_, R) ->
+ R
+ end, ok, lists:seq(1, Iterations)),
+ unlink(Sup),
+ exit(Sup, shutdown),
+ Res.
+
+start_link() ->
+ Pid = spawn_link(fun () ->
+ process_flag(trap_exit, true),
+ receive stop -> ok end
+ end),
+ {ok, Pid}.
+
+init([Timeout]) ->
+ {ok, {{one_for_one, 0, 1},
+ [{test_sup, {supervisor2, start_link,
+ [{local, ?MODULE}, ?MODULE, []]},
+ transient, Timeout, supervisor, [?MODULE]}]}};
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{test_worker, {?MODULE, start_link, []},
+ temporary, 1000, worker, [?MODULE]}]}}.