author    Tim Watson <tim@rabbitmq.com>  2012-10-23 14:23:16 +0100
committer Tim Watson <tim@rabbitmq.com>  2012-10-23 14:23:16 +0100
commit    87b8cc8eb06feb26f15170ce53cb626fb416d87a (patch)
tree      277f1f89ab142cf66942082b31cfd77c82afb7b6
parent    572ec0a712306d4e4055884a8b80b063c3bf324f (diff)
parent    75a2d3c389ca787446d498af09cd12f22cfc11f9 (diff)
download  rabbitmq-server-bug25227.tar.gz
merge default (bug25227)
-rw-r--r--  docs/rabbitmqctl.1.xml                      | 127
-rw-r--r--  include/rabbit.hrl                          |   3
-rw-r--r--  packaging/RPMS/Fedora/Makefile              |  11
-rw-r--r--  packaging/RPMS/Fedora/rabbitmq-server.init  |   4
-rw-r--r--  packaging/RPMS/Fedora/rabbitmq-server.spec  |  10
-rw-r--r--  packaging/debs/Debian/debian/control        |   4
-rw-r--r--  src/gm.erl                                  | 113
-rw-r--r--  src/gm_soak_test.erl                        |   4
-rw-r--r--  src/gm_speed_test.erl                       |   3
-rw-r--r--  src/gm_tests.erl                            |  10
-rw-r--r--  src/rabbit.erl                              |  49
-rw-r--r--  src/rabbit_amqqueue.erl                     |   3
-rw-r--r--  src/rabbit_amqqueue_process.erl             |  59
-rw-r--r--  src/rabbit_backing_queue.erl                |  26
-rw-r--r--  src/rabbit_backing_queue_qc.erl             |  18
-rw-r--r--  src/rabbit_control_main.erl                 |  40
-rw-r--r--  src/rabbit_exchange.erl                     |   5
-rw-r--r--  src/rabbit_guid.erl                         |   6
-rw-r--r--  src/rabbit_mirror_queue_coordinator.erl     |   6
-rw-r--r--  src/rabbit_mirror_queue_master.erl          |  71
-rw-r--r--  src/rabbit_mirror_queue_misc.erl            |  77
-rw-r--r--  src/rabbit_mirror_queue_slave.erl           | 214
-rw-r--r--  src/rabbit_misc.erl                         |   9
-rw-r--r--  src/rabbit_policy.erl                       | 134
-rw-r--r--  src/rabbit_policy_validator.erl             |  37
-rw-r--r--  src/rabbit_registry.erl                     |   3
-rw-r--r--  src/rabbit_runtime_parameters.erl           | 109
-rw-r--r--  src/rabbit_runtime_parameters_test.erl      |  30
-rw-r--r--  src/rabbit_tests.erl                        |  40
-rw-r--r--  src/rabbit_upgrade_functions.erl            |  15
-rw-r--r--  src/rabbit_variable_queue.erl               |  28
-rw-r--r--  src/rabbit_vhost.erl                        |   2
32 files changed, 848 insertions(+), 422 deletions(-)
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 73347cea..3082fe14 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -313,8 +313,8 @@
linkend="stop_app"><command>stop_app</command></link>.
</para>
<para>
- Cluster nodes can be of two types: disk or RAM. Disk nodes
- replicate data in RAM and on disk, thus providing redundancy in
+ Cluster nodes can be of two types: disc or RAM. Disc nodes
+ replicate data in RAM and on disc, thus providing redundancy in
the event of node failure and recovery from global events such
as power failure across all nodes. RAM nodes replicate data in
RAM only (with the exception of queue contents, which can reside
@@ -322,10 +322,10 @@
and are mainly used for scalability. RAM nodes are more
performant only when managing resources (e.g. adding/removing
queues, exchanges, or bindings). A cluster must always have at
- least one disk node, and usually should have more than one.
+ least one disc node, and usually should have more than one.
</para>
<para>
- The node will be a disk node by default. If you wish to
+ The node will be a disc node by default. If you wish to
create a RAM node, provide the <command>--ram</command> flag.
</para>
<para>
@@ -367,18 +367,18 @@
</listitem>
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>change_cluster_node_type</command> <arg choice="req">disk | ram</arg></cmdsynopsis>
+ <term><cmdsynopsis><command>change_cluster_node_type</command> <arg choice="req">disc | ram</arg></cmdsynopsis>
</term>
<listitem>
<para>
Changes the type of the cluster node. The node must be stopped for
this operation to succeed, and when turning a node into a RAM node
- the node must not be the only disk node in the cluster.
+ the node must not be the only disc node in the cluster.
</para>
<para role="example-prefix">For example:</para>
- <screen role="example">rabbitmqctl change_cluster_node_type disk</screen>
+ <screen role="example">rabbitmqctl change_cluster_node_type disc</screen>
<para role="example">
- This command will turn a RAM node into a disk node.
+ This command will turn a RAM node into a disc node.
</para>
</listitem>
</varlistentry>
@@ -637,7 +637,7 @@
</para>
<para>
Deleting a virtual host deletes all its exchanges,
- queues, bindings, user permissions and parameters.
+ queues, bindings, user permissions, parameters and policies.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl delete_vhost test</screen>
@@ -806,8 +806,8 @@
Certain features of RabbitMQ (such as the federation plugin)
are controlled by dynamic,
cluster-wide <emphasis>parameters</emphasis>. Each parameter
- consists of a component name, a key and a value, and is
- associated with a virtual host. The component name and key are
+ consists of a component name, a name and a value, and is
+ associated with a virtual host. The component name and name are
strings, and the value is an Erlang term. Parameters can be
set, cleared and listed. In general you should refer to the
documentation for the feature in question to see how to set
@@ -815,7 +815,7 @@
</para>
<variablelist>
<varlistentry>
- <term><cmdsynopsis><command>set_parameter</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>set_parameter</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
Sets a parameter.
@@ -829,24 +829,24 @@
</para></listitem>
</varlistentry>
<varlistentry>
- <term>key</term>
+ <term>name</term>
<listitem><para>
- The key for which the parameter is being set.
+ The name of the parameter being set.
</para></listitem>
</varlistentry>
<varlistentry>
<term>value</term>
<listitem><para>
- The value for the parameter, as an
- Erlang term. In most shells you are very likely to
+ The value for the parameter, as a
+ JSON term. In most shells you are very likely to
need to quote this.
</para></listitem>
</varlistentry>
</variablelist>
<para role="example-prefix">For example:</para>
- <screen role="example">rabbitmqctl set_parameter federation local_username '&lt;&lt;"guest">>'</screen>
+ <screen role="example">rabbitmqctl set_parameter federation local_username '"guest"'</screen>
<para role="example">
- This command sets the parameter <command>local_username</command> for the <command>federation</command> component in the default virtual host to the Erlang term <command>&lt;&lt;"guest">></command>.
+ This command sets the parameter <command>local_username</command> for the <command>federation</command> component in the default virtual host to the JSON term <command>"guest"</command>.
</para>
</listitem>
</varlistentry>
@@ -865,9 +865,9 @@
</para></listitem>
</varlistentry>
<varlistentry>
- <term>key</term>
+ <term>name</term>
<listitem><para>
- The key for which the parameter is being cleared.
+ The name of the parameter being cleared.
</para></listitem>
</varlistentry>
</variablelist>
@@ -895,6 +895,93 @@
</refsect2>
<refsect2>
+ <title>Policy Management</title>
+ <para>
+ Policies are used to control and modify the behaviour of queues
+ and exchanges on a cluster-wide basis. Policies apply within a
+ given vhost, and consist of a name, pattern, definition and an
+ optional priority. Policies can be set, cleared and listed.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>pattern</replaceable></arg> <arg choice="req"><replaceable>definition</replaceable></arg> <arg choice="opt"><replaceable>priority</replaceable></arg> </cmdsynopsis></term>
+ <listitem>
+ <para>
+ Sets a policy.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>name</term>
+ <listitem><para>
+ The name of the policy.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>pattern</term>
+ <listitem><para>
+            The regular expression which, when it matches a given resource, causes the policy to apply.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>definition</term>
+ <listitem><para>
+ The definition of the policy, as a
+ JSON term. In most shells you are very likely to
+ need to quote this.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>priority</term>
+ <listitem><para>
+ The priority of the policy as an integer, defaulting to 0. Higher numbers indicate greater precedence.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_policy federate-me "^amq." '{"federation-upstream-set":"all"}'</screen>
+ <para role="example">
+ This command sets the policy <command>federate-me</command> in the default virtual host so that built-in exchanges are federated.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>clear_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Clears a policy.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>name</term>
+ <listitem><para>
+ The name of the policy being cleared.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_policy federate-me</screen>
+ <para role="example">
+ This command clears the <command>federate-me</command> policy in the default virtual host.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>list_policies</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Lists all policies for a virtual host.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl list_policies</screen>
+ <para role="example">
+ This command lists all policies in the default virtual host.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
+
+ <refsect2>
<title>Server Status</title>
<para>
The server status queries interrogate the server and return a list of
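
As an illustration of the parameter and policy commands documented above (hypothetical invocations, not taken from this commit; the ha-mode and ha-params keys shown are the ones validated by the policy validator added in src/rabbit_mirror_queue_misc.erl further down in this diff):

    rabbitmqctl set_parameter federation local_username '"guest"'
    rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'
    rabbitmqctl set_policy -p /test ha-two "^two\." '{"ha-mode":"nodes","ha-params":["rabbit@hare","rabbit@tortoise"]}' 1
    rabbitmqctl list_policies -p /test
    rabbitmqctl clear_policy -p /test ha-two

The definition is a JSON document, quoted for the shell as the manpage text above notes; the trailing 1 on the second set_policy call is the optional priority.
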
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 41cce0a3..3db2b68a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -47,7 +47,8 @@
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, sync_slave_pids, policy}).
+ arguments, pid, slave_pids, sync_slave_pids, policy,
+ gm_pids}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 03e513f8..4f5f1327 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -13,13 +13,17 @@ RPM_OS=fedora
endif
ifeq "$(RPM_OS)" "suse"
+FUNCTION_LIBRARY=
REQUIRES=/sbin/chkconfig /sbin/service
OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse'
-START_PROG=setsid
+SPEC_DEFINES=--define 'group_tag Productivity/Networking/Other'
+START_PROG=startproc
else
+FUNCTION_LIBRARY=\# Source function library.\n. /etc/init.d/functions
REQUIRES=chkconfig initscripts
OS_DEFINES=--define '_initrddir /etc/rc.d/init.d'
-START_PROG=runuser rabbitmq --session-command
+SPEC_DEFINES=--define 'group_tag Development/Libraries'
+START_PROG=daemon
endif
rpms: clean server
@@ -35,6 +39,7 @@ prepare:
cp rabbitmq-server.init SOURCES/rabbitmq-server.init
sed -i \
-e 's|^START_PROG=.*$$|START_PROG="$(START_PROG)"|' \
+ -e 's|^@FUNCTION_LIBRARY@|$(FUNCTION_LIBRARY)|' \
SOURCES/rabbitmq-server.init
ifeq "$(RPM_OS)" "fedora"
# Fedora says that only vital services should have Default-Start
@@ -47,7 +52,7 @@ endif
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare
- rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES)
+ rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) $(SPEC_DEFINES)
clean:
rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.init b/packaging/RPMS/Fedora/rabbitmq-server.init
index 2d2680e3..3e48147b 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.init
+++ b/packaging/RPMS/Fedora/rabbitmq-server.init
@@ -10,12 +10,14 @@
# Provides: rabbitmq-server
# Required-Start: $remote_fs $network
# Required-Stop: $remote_fs $network
-# Default-Start: 3 4 5
+# Default-Start: 3 5
# Default-Stop: 0 1 2 6
# Description: RabbitMQ broker
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
+@FUNCTION_LIBRARY@
+
PATH=/sbin:/usr/sbin:/bin:/usr/bin
NAME=rabbitmq-server
DAEMON=/usr/sbin/${NAME}
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index a6899005..d73c5634 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -3,8 +3,8 @@
Name: rabbitmq-server
Version: %%VERSION%%
Release: 1%{?dist}
-License: MPLv1.1
-Group: Development/Libraries
+License: MPLv1.1 and MIT and ASL 2.0 and BSD
+Group: %{group_tag}
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz
Source1: rabbitmq-server.init
Source2: rabbitmq-script-wrapper
@@ -31,8 +31,10 @@ scalable implementation of an AMQP broker.
%define _rabbit_server_ocf %{_builddir}/`basename %{S:4}`
%define _plugins_state_dir %{_localstatedir}/lib/rabbitmq/plugins
+
%define _maindir %{buildroot}%{_rabbit_erllibdir}
+
%prep
%setup -q
@@ -110,8 +112,8 @@ done
%files -f ../%{name}.files
%defattr(-,root,root,-)
-%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
-%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq
+%attr(0755, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
+%attr(0755, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq
%dir %{_sysconfdir}/rabbitmq
%{_initrddir}/rabbitmq-server
%config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 943ed48f..d4526d87 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -5,12 +5,12 @@ Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
Uploaders: Emile Joubert <emile@rabbitmq.com>
DM-Upload-Allowed: yes
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip
-Standards-Version: 3.8.0
+Standards-Version: 3.9.2
Package: rabbitmq-server
Architecture: all
Depends: erlang-nox (>= 1:12.b.3), adduser, logrotate, ${misc:Depends}
-Description: An AMQP server written in Erlang
+Description: AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
diff --git a/src/gm.erl b/src/gm.erl
index 90433e84..4a95de0d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -376,7 +376,7 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/3, leave/1, broadcast/2,
+-export([create_tables/0, start_link/4, leave/1, broadcast/2,
confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -408,7 +408,8 @@
callback_args,
confirms,
broadcast_buffer,
- broadcast_timer
+ broadcast_timer,
+ txn_executor
}).
-record(gm_group, { name, version, members }).
@@ -428,9 +429,10 @@
-export_type([group_name/0]).
-type(group_name() :: any()).
+-type(txn_fun() :: fun((fun(() -> any())) -> any())).
-spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
--spec(start_link/3 :: (group_name(), atom(), any()) ->
+-spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
@@ -507,8 +509,8 @@ table_definitions() ->
{Name, Attributes} = ?TABLE,
[{Name, [?TABLE_MATCH | Attributes]}].
-start_link(GroupName, Module, Args) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args], []).
+start_link(GroupName, Module, Args, TxnFun) ->
+ gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -529,7 +531,7 @@ forget_group(GroupName) ->
end),
ok.
-init([GroupName, Module, Args]) ->
+init([GroupName, Module, Args, TxnFun]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
@@ -545,7 +547,8 @@ init([GroupName, Module, Args]) ->
callback_args = Args,
confirms = queue:new(),
broadcast_buffer = [],
- broadcast_timer = undefined }, hibernate,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -585,7 +588,8 @@ handle_call({add_on_right, NewMember}, _From,
view = View,
members_state = MembersState,
module = Module,
- callback_args = Args }) ->
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
{MembersState1, Group} =
record_new_member_in_group(
GroupName, Self, NewMember,
@@ -596,7 +600,7 @@ handle_call({add_on_right, NewMember}, _From,
{catchup, Self,
prepare_members_state(MembersState1)}),
MembersState1
- end),
+ end, TxnFun),
View2 = group_to_view(Group),
State1 = check_neighbours(State #state { view = View2,
members_state = MembersState1 }),
@@ -642,8 +646,9 @@ handle_cast(join, State = #state { self = Self,
group_name = GroupName,
members_state = undefined,
module = Module,
- callback_args = Args }) ->
- View = join_group(Self, GroupName),
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
+ View = join_group(Self, GroupName, TxnFun),
MembersState =
case alive_view_members(View) of
[Self] -> blank_member_state();
@@ -670,7 +675,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
view = View,
module = Module,
callback_args = Args,
- confirms = Confirms }) ->
+ confirms = Confirms,
+ txn_executor = TxnFun }) ->
Member = case {Left, Right} of
{{Member1, MRef}, _} -> Member1;
{_, {Member1, MRef}} -> Member1;
@@ -683,7 +689,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
noreply(State);
_ ->
View1 =
- group_to_view(record_dead_member_in_group(Member, GroupName)),
+ group_to_view(record_dead_member_in_group(Member,
+ GroupName, TxnFun)),
{Result, State2} =
case alive_view_members(View1) of
[Self] ->
@@ -985,14 +992,15 @@ ensure_alive_suffix1(MembersQ) ->
%% View modification
%% ---------------------------------------------------------------------------
-join_group(Self, GroupName) ->
- join_group(Self, GroupName, read_group(GroupName)).
+join_group(Self, GroupName, TxnFun) ->
+ join_group(Self, GroupName, read_group(GroupName), TxnFun).
-join_group(Self, GroupName, {error, not_found}) ->
- join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
-join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) ->
+join_group(Self, GroupName, {error, not_found}, TxnFun) ->
+ join_group(Self, GroupName,
+ prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
+join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
group_to_view(Group);
-join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
+join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
case lists:member(Self, Members) of
true ->
group_to_view(Group);
@@ -1000,20 +1008,22 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
case lists:filter(fun is_member_alive/1, Members) of
[] ->
join_group(Self, GroupName,
- prune_or_create_group(Self, GroupName));
+ prune_or_create_group(Self, GroupName, TxnFun));
Alive ->
Left = lists:nth(random:uniform(length(Alive)), Alive),
Handler =
fun () ->
join_group(
Self, GroupName,
- record_dead_member_in_group(Left, GroupName))
+ record_dead_member_in_group(
+ Left, GroupName, TxnFun),
+ TxnFun)
end,
try
case gen_server2:call(
get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
- not_ready -> join_group(Self, GroupName)
+ not_ready -> join_group(Self, GroupName, TxnFun)
end
catch
exit:{R, _}
@@ -1032,29 +1042,29 @@ read_group(GroupName) ->
[Group] -> Group
end.
-prune_or_create_group(Self, GroupName) ->
- {atomic, Group} =
- mnesia:sync_transaction(
- fun () -> GroupNew = #gm_group { name = GroupName,
- members = [Self],
- version = ?VERSION_START },
- case mnesia:read({?GROUP_TABLE, GroupName}) of
- [] ->
- mnesia:write(GroupNew),
- GroupNew;
- [Group1 = #gm_group { members = Members }] ->
- case lists:any(fun is_member_alive/1, Members) of
- true -> Group1;
- false -> mnesia:write(GroupNew),
- GroupNew
- end
- end
- end),
+prune_or_create_group(Self, GroupName, TxnFun) ->
+ Group = TxnFun(
+ fun () ->
+ GroupNew = #gm_group { name = GroupName,
+ members = [Self],
+ version = ?VERSION_START },
+ case mnesia:read({?GROUP_TABLE, GroupName}) of
+ [] ->
+ mnesia:write(GroupNew),
+ GroupNew;
+ [Group1 = #gm_group { members = Members }] ->
+ case lists:any(fun is_member_alive/1, Members) of
+ true -> Group1;
+ false -> mnesia:write(GroupNew),
+ GroupNew
+ end
+ end
+ end),
Group.
-record_dead_member_in_group(Member, GroupName) ->
- {atomic, Group} =
- mnesia:sync_transaction(
+record_dead_member_in_group(Member, GroupName, TxnFun) ->
+ Group =
+ TxnFun(
fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
mnesia:read({?GROUP_TABLE, GroupName}),
case lists:splitwith(
@@ -1071,9 +1081,9 @@ record_dead_member_in_group(Member, GroupName) ->
end),
Group.
-record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
- {atomic, {Result, Group}} =
- mnesia:sync_transaction(
+record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) ->
+ {Result, Group} =
+ TxnFun(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
mnesia:read({?GROUP_TABLE, GroupName}),
@@ -1088,10 +1098,10 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
end),
{Result, Group}.
-erase_members_in_group(Members, GroupName) ->
+erase_members_in_group(Members, GroupName, TxnFun) ->
DeadMembers = [{dead, Id} || Id <- Members],
- {atomic, Group} =
- mnesia:sync_transaction(
+ Group =
+ TxnFun(
fun () ->
[Group1 = #gm_group { members = [_|_] = Members1,
version = Ver }] =
@@ -1112,7 +1122,8 @@ maybe_erase_aliases(State = #state { self = Self,
view = View0,
members_state = MembersState,
module = Module,
- callback_args = Args }, View) ->
+ callback_args = Args,
+ txn_executor = TxnFun }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
@@ -1129,7 +1140,7 @@ maybe_erase_aliases(State = #state { self = Self,
case Erasable of
[] -> {ok, State1 #state { view = View }};
_ -> View1 = group_to_view(
- erase_members_in_group(Erasable, GroupName)),
+ erase_members_in_group(Erasable, GroupName, TxnFun)),
{callback_view_changed(Args, Module, View0, View1),
check_neighbours(State1 #state { view = View1 })}
end.
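
The new fourth argument to gm:start_link/4 lets the caller choose how the group's mnesia updates are executed. A minimal sketch (hypothetical helper module, not part of this commit) of supplying it:

    %% Hypothetical wrapper around the new gm:start_link/4. The executor
    %% shown is the one the call sites updated in this commit pass
    %% (rabbit_misc:execute_mnesia_transaction/1): it runs the fun in a
    %% sync transaction and returns the fun's result directly, which is
    %% why gm.erl above can write 'Group = TxnFun(fun () -> ... end)'
    %% where it previously matched '{atomic, Group} = mnesia:sync_transaction(...)'.
    -module(gm_usage_example).
    -export([start_gm_member/3]).

    start_gm_member(GroupName, CallbackModule, Args) ->
        TxnFun = fun rabbit_misc:execute_mnesia_transaction/1,
        gm:start_link(GroupName, CallbackModule, Args, TxnFun).
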
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 57217541..5fbfc223 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -105,7 +105,9 @@ spawn_member() ->
random:seed(MegaSecs, Secs, MicroSecs),
%% start up delay of no more than 10 seconds
timer:sleep(random:uniform(10000)),
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
+ {ok, Pid} = gm:start_link(
+ ?MODULE, ?MODULE, [],
+ fun rabbit_misc:execute_mnesia_transaction/1),
Start = random:uniform(10000),
send_loop(Pid, Start, Start + random:uniform(10000)),
gm:leave(Pid),
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index dad75bd4..84d4ab2f 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -44,7 +44,8 @@ terminate(Owner, _Reason) ->
%% other
wile_e_coyote(Time, WriteUnit) ->
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive joined -> ok end,
timer:sleep(1000), %% wait for all to join
timer:send_after(Time, stop),
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index 0a2d4204..a9c0ba90 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -76,7 +76,9 @@ test_confirmed_broadcast() ->
test_member_death() ->
with_two_members(
fun (Pid, Pid2) ->
- {ok, Pid3} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid3} = gm:start_link(
+ ?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid3, [Pid, Pid2, Pid3],
timeout_joining_gm_group_3),
passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1),
@@ -128,10 +130,12 @@ test_broadcast_fun(Fun) ->
with_two_members(Fun) ->
ok = gm:create_tables(),
- {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1),
- {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self()),
+ {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self(),
+ fun rabbit_misc:execute_mnesia_transaction/1),
passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2),
passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 93808f84..69f77824 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -300,9 +300,9 @@ start() ->
%% We do not want to HiPE compile or upgrade
%% mnesia after just restarting the app
ok = ensure_application_loaded(),
- ok = rabbit_node_monitor:prepare_cluster_status_files(),
- ok = rabbit_mnesia:check_cluster_consistency(),
ok = ensure_working_log_handlers(),
+ rabbit_node_monitor:prepare_cluster_status_files(),
+ rabbit_mnesia:check_cluster_consistency(),
ok = app_utils:start_applications(
app_startup_order(), fun handle_app_error/2),
ok = print_plugin_info(rabbit_plugins:active())
@@ -312,13 +312,13 @@ boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
maybe_hipe_compile(),
- ok = rabbit_node_monitor:prepare_cluster_status_files(),
ok = ensure_working_log_handlers(),
+ rabbit_node_monitor:prepare_cluster_status_files(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
%% It's important that the consistency check happens after
%% the upgrade, since if we are a secondary node the
%% primary node will have forgotten us
- ok = rabbit_mnesia:check_cluster_consistency(),
+ rabbit_mnesia:check_cluster_consistency(),
Plugins = rabbit_plugins:setup(),
ToBeLoaded = Plugins ++ ?APPS,
ok = app_utils:load_applications(ToBeLoaded),
@@ -330,21 +330,26 @@ boot() ->
end).
handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
- boot_error({could_not_start, App, Reason}, not_available);
+ throw({could_not_start, App, Reason});
handle_app_error(App, Reason) ->
- boot_error({could_not_start, App, Reason}, not_available).
+ throw({could_not_start, App, Reason}).
start_it(StartFun) ->
try
StartFun()
+ catch
+ throw:{could_not_start, _App, _Reason}=Err ->
+ boot_error(Err, not_available);
+ _:Reason ->
+ boot_error(Reason, erlang:get_stacktrace())
after
%% give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
- rabbit_log:info("Stopping Rabbit~n"),
+ rabbit_log:info("Stopping RabbitMQ~n"),
ok = app_utils:stop_applications(app_shutdown_order()).
stop_and_halt() ->
@@ -412,6 +417,9 @@ rotate_logs(BinarySuffix) ->
start(normal, []) ->
case erts_version_check() of
ok ->
+ {ok, Vsn} = application:get_key(rabbit, vsn),
+ error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n",
+ [Vsn, erlang:system_info(otp_release)]),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
print_banner(),
@@ -498,13 +506,16 @@ sort_boot_steps(UnsortedSteps) ->
not erlang:function_exported(M, F, length(A))] of
[] -> SortedSteps;
MissingFunctions -> basic_boot_error(
+ {missing_functions, MissingFunctions},
"Boot step functions not exported: ~p~n",
[MissingFunctions])
end;
{error, {vertex, duplicate, StepName}} ->
- basic_boot_error("Duplicate boot step name: ~w~n", [StepName]);
+ basic_boot_error({duplicate_boot_step, StepName},
+ "Duplicate boot step name: ~w~n", [StepName]);
{error, {edge, Reason, From, To}} ->
basic_boot_error(
+ {invalid_boot_step_dependency, From, To},
"Could not add boot step dependency of ~w on ~w:~n~s",
[To, From,
case Reason of
@@ -518,7 +529,7 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
-boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
{Err, Nodes} =
case AllNodes -- [node()] of
@@ -529,23 +540,27 @@ boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
"Timeout contacting cluster nodes: ~p.~n", [Ns]),
Ns}
end,
- basic_boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
-
+ basic_boot_error(Term,
+ Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
boot_error(Reason, Stacktrace) ->
- Fmt = "Error description:~n ~p~n~n"
+ Fmt = "Error description:~n ~p~n~n" ++
"Log files (may contain more information):~n ~s~n ~s~n~n",
Args = [Reason, log_location(kernel), log_location(sasl)],
+ boot_error(Reason, Fmt, Args, Stacktrace).
+
+boot_error(Reason, Fmt, Args, Stacktrace) ->
case Stacktrace of
- not_available -> basic_boot_error(Fmt, Args);
- _ -> basic_boot_error(Fmt ++ "Stack trace:~n ~p~n~n",
+ not_available -> basic_boot_error(Reason, Fmt, Args);
+ _ -> basic_boot_error(Reason, Fmt ++
+ "Stack trace:~n ~p~n~n",
Args ++ [Stacktrace])
end.
-basic_boot_error(Format, Args) ->
+basic_boot_error(Reason, Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
- error_logger:error_msg(Format, Args),
+ rabbit_misc:local_info_msg(Format, Args),
timer:sleep(1000),
- exit({?MODULE, failure_during_boot}).
+ exit({?MODULE, failure_during_boot, Reason}).
%%---------------------------------------------------------------------------
%% boot step functions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a8b0ea24..6ad85b24 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -219,7 +219,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none,
slave_pids = [],
- sync_slave_pids = []}),
+ sync_slave_pids = [],
+ gm_pids = []}),
{Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
Q1 = start_queue_process(Node, Q0),
case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f1821b5a..68f95778 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -86,6 +86,7 @@
-define(STATISTICS_KEYS,
[pid,
+ policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
messages_ready,
@@ -512,6 +513,14 @@ send_or_record_confirm(#delivery{sender = SenderPid,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
+discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
+ State) ->
+ %% fake an 'eventual' confirm from BQ; noop if not needed
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ confirm_messages([MsgId], State),
+ BQS1 = BQ:discard(MsgId, SenderPid, BQS),
+ State1#q{backing_queue_state = BQS1}.
+
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State),
@@ -520,17 +529,20 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
+attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
+ Props = #message_properties{delivered = Delivered},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
- fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
+ fun (true, State1 = #q{backing_queue_state = BQS2}) ->
{AckTag, BQS3} = BQ:publish_delivered(
- AckRequired, Message, Props,
- SenderPid, BQS2),
- {{Message, Props#message_properties.delivered, AckTag},
- true, State1#q{backing_queue_state = BQS3}}
+ Message, Props, SenderPid, BQS2),
+ {{Message, Delivered, AckTag},
+ true, State1#q{backing_queue_state = BQS3}};
+ (false, State1) ->
+ {{Message, Delivered, undefined},
+ true, discard(Delivery, State1)}
end, false, State#q{backing_queue_state = BQS1});
{published, BQS1} ->
{true, State#q{backing_queue_state = BQS1}};
@@ -545,13 +557,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
case attempt_delivery(Delivery, Props, State1) of
{true, State2} ->
State2;
- %% the next one is an optimisations
- %% TODO: optimise the Confirm =/= never case too
- {false, State2 = #q{ttl = 0, dlx = undefined,
- backing_queue = BQ, backing_queue_state = BQS}}
- when Confirm == never ->
- BQS1 = BQ:discard(Message, SenderPid, BQS),
- State2#q{backing_queue_state = BQS1};
+ %% The next one is an optimisation
+ {false, State2 = #q{ttl = 0, dlx = undefined}} ->
+ discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
@@ -687,16 +695,15 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} =
- case DLXFun of
- undefined ->
- {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ ->
- {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
- DLXFun(Msgs),
- {Next, BQS2}
- end,
+ {Props, BQS1} = case DLXFun of
+ undefined -> {Next, undefined, BQS2} =
+ BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ -> {Next, Msgs, BQS2} =
+ BQ:dropwhile(ExpirePred, true, BQS),
+ DLXFun(Msgs),
+ {Next, BQS2}
+ end,
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
@@ -867,6 +874,12 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
ExclusiveOwner;
+i(policy, #q{q = #amqqueue{name = Name}}) ->
+ {ok, Q} = rabbit_amqqueue:lookup(Name),
+ case rabbit_policy:name(Q) of
+ none -> '';
+ Policy -> Policy
+ end;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index c6d17785..af660c60 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -84,12 +84,16 @@
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
--callback publish_delivered(true, rabbit_types:basic_message(),
+-callback publish_delivered(rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state())
- -> {ack(), state()};
- (false, rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
- -> {undefined, state()}.
+ -> {ack(), state()}.
+
+%% Called to inform the BQ about messages which have reached the
+%% queue, but are not going to be further passed to BQ for some
+%% reason. Note that this may be invoked for messages for which
+%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
+%% BQS}.
+-callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
@@ -200,13 +204,6 @@
-callback is_duplicate(rabbit_types:basic_message(), state())
-> {'false'|'published'|'discarded', state()}.
-%% Called to inform the BQ about messages which have reached the
-%% queue, but are not going to be further passed to BQ for some
-%% reason. Note that this is may be invoked for messages for which
-%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
-%% BQS}.
--callback discard(rabbit_types:basic_message(), pid(), state()) -> state().
-
-else.
-export([behaviour_info/1]).
@@ -214,12 +211,11 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
- {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2},
- {discard, 3}];
+ {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
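
For a backing queue implementation, the reshaped behaviour above means publish_delivered loses its AckRequired flag (the queue process now only calls it when an ack is required, and discards otherwise) and discard receives a message id rather than the whole message. A minimal sketch of the two new callback shapes (hypothetical stub module, not part of this commit):

    -module(bq_callback_stub).   %% hypothetical
    -export([publish_delivered/4, discard/3]).

    -record(state, {pending_acks = [], next_tag = 1}).

    %% publish_delivered/4: the message went straight out to a consumer
    %% and always needs acking now, so an ack tag is always returned.
    publish_delivered(Msg, MsgProps, _ChPid,
                      State = #state{pending_acks = PA, next_tag = Tag}) ->
        {Tag, State#state{pending_acks = [{Tag, Msg, MsgProps} | PA],
                          next_tag     = Tag + 1}}.

    %% discard/3: told, by message id, about a message that reached the
    %% queue but will not be passed on to the backing queue; nothing to
    %% store in this stub.
    discard(_MsgId, _ChPid, State) ->
        State.
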
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index e40d9b29..b37fbb29 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -119,7 +119,7 @@ qc_publish_multiple(#state{}) ->
qc_publish_delivered(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish_delivered,
- [boolean(), qc_message(), #message_properties{}, self(), BQ]}.
+ [qc_message(), #message_properties{}, self(), BQ]}.
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
@@ -199,7 +199,7 @@ next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) ->
next_state(S, Res,
{call, ?BQMOD, publish_delivered,
- [AckReq, Msg, MsgProps, _Pid, _BQ]}) ->
+ [Msg, MsgProps, _Pid, _BQ]}) ->
#state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S,
AckTag = {call, erlang, element, [1, Res]},
BQ1 = {call, erlang, element, [2, Res]},
@@ -213,10 +213,7 @@ next_state(S, Res,
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
end,
- acks = case AckReq of
- true -> [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks];
- false -> Acks
- end
+ acks = [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks]
};
next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
@@ -391,4 +388,13 @@ drop_messages(Messages) ->
end
end.
+-else.
+
+-export([prop_disabled/0]).
+
+prop_disabled() ->
+ exit({compiled_without_proper,
+ "PropEr was not present during compilation of the test module. "
+ "Hence all tests are disabled."}).
+
-endif.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index e75e1f6f..25f7d758 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -70,6 +70,10 @@
{clear_parameter, [?VHOST_DEF]},
{list_parameters, [?VHOST_DEF]},
+ {set_policy, [?VHOST_DEF]},
+ {clear_policy, [?VHOST_DEF]},
+ {list_policies, [?VHOST_DEF]},
+
{list_queues, [?VHOST_DEF]},
{list_exchanges, [?VHOST_DEF]},
{list_bindings, [?VHOST_DEF]},
@@ -98,7 +102,9 @@
{"Bindings", rabbit_binding, info_all, info_keys},
{"Consumers", rabbit_amqqueue, consumers_all, consumer_info_keys},
{"Permissions", rabbit_auth_backend_internal, list_vhost_permissions,
- vhost_perms_info_keys}]).
+ vhost_perms_info_keys},
+ {"Policies", rabbit_policy, list_formatted, info_keys},
+ {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]).
%%----------------------------------------------------------------------------
@@ -170,8 +176,8 @@ start() ->
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
- {parse_error, {_Line, Mod, Err}} ->
- print_error("~s", [lists:flatten(Mod:format_error(Err))]),
+ {error_string, Reason} ->
+ print_error("~s", [Reason]),
rabbit_misc:quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
@@ -458,6 +464,28 @@ action(list_parameters, Node, [], Opts, Inform) ->
rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
rabbit_runtime_parameters:info_keys());
+action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform)
+ when Prio == [] orelse length(Prio) == 1 ->
+ Msg = "Setting policy ~p for pattern ~p to ~p",
+ {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined};
+ [P] -> {Msg ++ " with priority ~s", P}
+ end,
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform(InformMsg, [Key, Pattern, Defn] ++ Prio),
+ rpc_call(Node, rabbit_policy, parse_set,
+ [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]);
+
+action(clear_policy, Node, [Key], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Clearing policy ~p", [Key]),
+ rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
+
+action(list_policies, Node, [], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Listing policies", []),
+ display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]),
+ rabbit_policy:info_keys());
+
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
@@ -477,12 +505,14 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
Node, erl_eval, exprs, [Parsed, []]),
io:format("~p~n", [Value]),
ok;
- {error, E} -> {parse_error, E}
+ {error, E} -> {error_string, format_parse_error(E)}
end;
{error, E, _} ->
- {parse_error, E}
+ {error_string, format_parse_error(E)}
end.
+format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
+
%%----------------------------------------------------------------------------
wait_for_application(Node, PidFile, Application, Inform) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4cc96ef5..a205b23d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -298,7 +298,10 @@ i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
-i(policy, X) -> rabbit_policy:name(X);
+i(policy, X) -> case rabbit_policy:name(X) of
+ none -> '';
+ Policy -> Policy
+ end;
i(Item, _) -> throw({bad_argument, Item}).
info(X = #exchange{}) -> infos(?INFO_KEYS, X).
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index ba0cb04f..cedbbdb3 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -144,11 +144,7 @@ gen_secure() ->
%% employs base64url encoding, which is safer in more contexts than
%% plain base64.
string(G, Prefix) ->
- Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
- ($\/, Acc) -> [$\_ | Acc];
- ($\=, Acc) -> Acc;
- (Chr, Acc) -> [Chr | Acc]
- end, [], base64:encode_to_string(G)).
+ Prefix ++ "-" ++ rabbit_misc:base64url(G).
binary(G, Prefix) ->
list_to_binary(string(G, Prefix)).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 6cd71fc3..e1a21cf7 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -326,7 +326,9 @@ ensure_monitoring(CPid, Pids) ->
init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
GM1 = case GM of
undefined ->
- {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
+ {ok, GM2} = gm:start_link(
+ QueueName, ?MODULE, [self()],
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive {joined, GM2, _Members} ->
ok
end,
@@ -349,7 +351,7 @@ handle_call(get_gm, _From, State = #state { gm = GM }) ->
handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, MPid, Deaths) of
{ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 6dac2808..cce19c90 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,11 +17,11 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
+ status/1, invoke/3, is_duplicate/2, fold/3]).
-export([start/1, stop/0]).
@@ -97,10 +97,18 @@ init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) ->
ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State.
-init_with_existing_bq(Q, BQ, BQS) ->
+init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), depth_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ Self = self(),
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue{gm_pids = GMPids}]
+ = mnesia:read({rabbit_queue, QName}),
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
+ end),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -157,7 +165,7 @@ stop_all_slaves(Reason, #state{gm = GM}) ->
fun () ->
[Q] = mnesia:read({rabbit_queue, QName}),
rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { slave_pids = [] })
+ Q #amqqueue { gm_pids = [], slave_pids = [] })
end),
ok = gm:forget_group(QName).
@@ -175,25 +183,42 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
-publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
+publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS,
ack_msg_id = AM }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(
- GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
- {AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- {AckTag,
- ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 })}.
+ State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 },
+ {AckTag, ensure_monitoring(ChPid, State1)}.
+
+discard(MsgId, ChPid, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
+ %% It's a massive error if we get told to discard something that's
+ %% already been published or published-and-confirmed. To do that
+ %% would require non FIFO access. Hence we should not find
+ %% 'published' or 'confirmed' in this dict:find.
+ case dict:find(MsgId, SS) of
+ error ->
+ ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
+ BQS1 = BQ:discard(MsgId, ChPid, BQS),
+ ensure_monitoring(
+ ChPid, State #state {
+ backing_queue_state = BQS1,
+ seen_status = dict:erase(MsgId, SS) });
+ {ok, discarded} ->
+ State
+ end.
dropwhile(Pred, AckRequired,
State = #state{gm = GM,
@@ -367,26 +392,6 @@ is_duplicate(Message = #basic_message { id = MsgId },
{discarded, State}
end.
-discard(Msg = #basic_message { id = MsgId }, ChPid,
- State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = SS }) ->
- %% It's a massive error if we get told to discard something that's
- %% already been published or published-and-confirmed. To do that
- %% would require non FIFO access. Hence we should not find
- %% 'published' or 'confirmed' in this dict:find.
- case dict:find(MsgId, SS) of
- error ->
- ok = gm:broadcast(GM, {discard, ChPid, Msg}),
- ensure_monitoring(
- ChPid, State #state {
- backing_queue_state = BQ:discard(Msg, ChPid, BQS),
- seen_status = dict:erase(MsgId, SS) });
- {ok, discarded} ->
- State
- end.
-
%% ---------------------------------------------------------------------------
%% Other exported functions
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b6c229aa..4a00846e 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -15,22 +15,32 @@
%%
-module(rabbit_mirror_queue_misc).
+-behaviour(rabbit_policy_validator).
--export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2,
+-export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2]).
+ is_mirrored/1, update_mirrors/2, validate_policy/1]).
%% for testing only
-export([suggested_queue_nodes/4]).
-include("rabbit.hrl").
+-rabbit_boot_step({?MODULE,
+ [{description, "HA policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-params">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(remove_from_queue/2 ::
- (rabbit_amqqueue:name(), [pid()])
+-spec(remove_from_queue/3 ::
+ (rabbit_amqqueue:name(), pid(), [pid()])
-> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
@@ -57,9 +67,7 @@
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-
-remove_from_queue(QueueName, DeadGMPids) ->
- DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
+remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -67,20 +75,27 @@ remove_from_queue(QueueName, DeadGMPids) ->
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
- slave_pids = SPids }] ->
- Alive = [Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid), DeadNodes)],
+ slave_pids = SPids,
+ gm_pids = GMPids }] ->
+ {Dead, GMPids1} = lists:partition(
+ fun ({GM, _}) ->
+ lists:member(GM, DeadGMPids)
+ end, GMPids),
+ DeadPids = [Pid || {_GM, Pid} <- Dead],
+ Alive = [QPid | SPids] -- DeadPids,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
+ GMPids = GMPids1, %% ASSERTION
{ok, QPid1, []};
- _ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
+ _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
store_updated_slaves(
Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 }),
+ slave_pids = SPids1,
+ gm_pids = GMPids1 }),
{ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
@@ -156,10 +171,8 @@ add_mirror(QName, MirrorNode) ->
start_child(Name, MirrorNode, Q);
[SPid] ->
case rabbit_misc:is_process_alive(SPid) of
- true ->
- {ok, already_mirrored};
- false ->
- start_child(Name, MirrorNode, Q)
+ true -> {ok, already_mirrored};
+ false -> start_child(Name, MirrorNode, Q)
end
end
end).
@@ -325,3 +338,35 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
add_mirrors(QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
ok.
+
+%%----------------------------------------------------------------------------
+
+validate_policy(KeyList) ->
+ validate_policy(
+ proplists:get_value(<<"ha-mode">>, KeyList),
+ proplists:get_value(<<"ha-params">>, KeyList, none)).
+
+validate_policy(<<"all">>, none) ->
+ ok;
+validate_policy(<<"all">>, _Params) ->
+ {error, "ha-mode=\"all\" does not take parameters", []};
+
+validate_policy(<<"nodes">>, []) ->
+ {error, "ha-mode=\"nodes\" list must be non-empty", []};
+validate_policy(<<"nodes">>, Nodes) when is_list(Nodes) ->
+ case [I || I <- Nodes, not is_binary(I)] of
+ [] -> ok;
+ Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, "
+ "~p was not a string", [Invalid]}
+ end;
+validate_policy(<<"nodes">>, Params) ->
+ {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]};
+
+validate_policy(<<"exactly">>, N) when is_integer(N) andalso N > 0 ->
+ ok;
+validate_policy(<<"exactly">>, Params) ->
+ {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]};
+
+validate_policy(Mode, _Params) ->
+ {error, "~p is not a valid ha-mode value", [Mode]}.
+
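
The validator registered above receives the policy definition as a key/value list with binary keys. A few illustrative calls (hypothetical, written to match the clauses added in this commit):

    %% Hypothetical checks of the <<"ha-mode">>/<<"ha-params">> validator,
    %% e.g. from an Erlang shell or a test module.
    validate_policy_examples() ->
        ok = rabbit_mirror_queue_misc:validate_policy(
               [{<<"ha-mode">>, <<"all">>}]),
        ok = rabbit_mirror_queue_misc:validate_policy(
               [{<<"ha-mode">>,   <<"nodes">>},
                {<<"ha-params">>, [<<"rabbit@hare">>, <<"rabbit@tortoise">>]}]),
        ok = rabbit_mirror_queue_misc:validate_policy(
               [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 2}]),
        %% non-positive count fails the is_integer/N > 0 guard and yields
        %% an {error, Format, Args} triple
        {error, _Format, _Args} = rabbit_mirror_queue_misc:validate_policy(
               [{<<"ha-mode">>, <<"exactly">>}, {<<"ha-params">>, 0}]),
        passed.
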
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9e290126..1ba1420f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -64,7 +64,6 @@
-record(state, { q,
gm,
- master_pid,
backing_queue,
backing_queue_state,
sync_timer_ref,
@@ -87,7 +86,7 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
-init(#amqqueue { name = QueueName } = Q) ->
+init(Q = #amqqueue { name = QName }) ->
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -100,23 +99,24 @@ init(#amqqueue { name = QueueName } = Q) ->
%% above.
%%
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ {ok, GM} = gm:start_link(QName, ?MODULE, [self()],
+ fun rabbit_misc:execute_mnesia_transaction/1),
receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
- fun() -> init_it(Self, Node, QueueName) end) of
- {new, MPid} ->
- erlang:monitor(process, MPid),
+ fun() -> init_it(Self, GM, Node, QName) end) of
+ {new, QPid} ->
+ erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
ok = rabbit_memory_monitor:register(
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
- BQS = bq_init(BQ, Q, false),
- State = #state { q = Q,
+ Q1 = Q #amqqueue { pid = QPid },
+ BQS = bq_init(BQ, Q1, false),
+ State = #state { q = Q1,
gm = GM,
- master_pid = MPid,
backing_queue = BQ,
backing_queue_state = BQS,
rate_timer_ref = undefined,
@@ -141,14 +141,15 @@ init(#amqqueue { name = QueueName } = Q) ->
duplicate_live_master ->
{stop, {duplicate_live_master, Node}};
existing ->
+ gm:leave(GM),
ignore
end.
-init_it(Self, Node, QueueName) ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> add_slave(Q1, Self, MPids),
+init_it(Self, GM, Node, QName) ->
+ [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] =
+ mnesia:read({rabbit_queue, QName}),
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
+ [] -> add_slave(Q, Self, GM),
{new, QPid};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
@@ -156,15 +157,20 @@ init_it(Self, Node, QueueName) ->
end;
[SPid] -> case rabbit_misc:is_process_alive(SPid) of
true -> existing;
- false -> add_slave(Q1, Self, MPids -- [SPid]),
+ false -> Q1 = Q#amqqueue {
+ slave_pids = SPids -- [SPid],
+ gm_pids = [T || T = {_, S} <- GMPids,
+ S =/= SPid] },
+ add_slave(Q1, Self, GM),
{new, QPid}
end
end.
%% Add to the end, so they are in descending order of age, see
%% rabbit_mirror_queue_misc:promote_slave/1
-add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves(
- Q#amqqueue{slave_pids = MPids ++ [New]}).
+add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
handle_call({deliver, Delivery, true}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
@@ -172,30 +178,29 @@ handle_call({deliver, Delivery, true}, From, State) ->
noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
- State = #state { q = #amqqueue { name = QueueName },
- master_pid = MPid }) ->
- %% The GM has told us about deaths, which means we're not going to
- %% receive any more messages from GM
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
+ Self = self(),
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, Deaths) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
{ok, Pid, DeadPids} ->
- rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
+ rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),
- if node(Pid) =:= node(MPid) ->
+ case Pid of
+ MPid ->
%% master hasn't changed
gen_server2:reply(From, ok),
noreply(State);
- node(Pid) =:= node() ->
+ Self ->
%% we've become master
QueueState = promote_me(From, State),
{become, rabbit_amqqueue_process, QueueState, hibernate};
- true ->
- %% master has changed to not us.
+ _ ->
+ %% master has changed to not us
gen_server2:reply(From, ok),
erlang:monitor(process, Pid),
- noreply(State #state { master_pid = Pid })
+ noreply(State #state { q = Q #amqqueue { pid = Pid } })
end
end;
@@ -245,7 +250,7 @@ handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
- State = #state { gm = GM, master_pid = MPid }) ->
+ State = #state { gm = GM, q = #amqqueue { pid = MPid } }) ->
ok = gm:broadcast(GM, process_death),
noreply(State);
@@ -368,7 +373,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
-i(master_pid, #state { master_pid = MPid }) -> MPid;
+i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid;
i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
@@ -387,14 +392,20 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
- never;
-needs_confirming(#delivery { message = #basic_message {
- is_persistent = true } },
- #state { q = #amqqueue { durable = true } }) ->
- eventually;
-needs_confirming(_Delivery, _State) ->
- immediately.
+send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) ->
+ MS;
+send_or_record_confirm(published, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message {
+ id = MsgId,
+ is_persistent = true } },
+ MS, #state { q = #amqqueue { durable = true } }) ->
+ dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
+send_or_record_confirm(_Status, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo },
+ MS, _State) ->
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
+ MS.
confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{CMs, MS1} =
@@ -621,9 +632,8 @@ confirm_sender_death(Pid) ->
ok.
maybe_enqueue_message(
- Delivery = #delivery { message = #basic_message { id = MsgId },
- msg_seq_no = MsgSeqNo,
- sender = ChPid },
+ Delivery = #delivery { message = #basic_message { id = MsgId },
+ sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
@@ -633,28 +643,11 @@ maybe_enqueue_message(
MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
- {ok, confirmed} ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- {ok, published} ->
- MS1 = case needs_confirming(Delivery, State1) of
- never -> dict:erase(MsgId, MS);
- eventually -> MMS = {published, ChPid, MsgSeqNo},
- dict:store(MsgId, MMS, MS);
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- dict:erase(MsgId, MS)
- end,
+ {ok, Status} ->
+ MS1 = send_or_record_confirm(
+ Status, Delivery, dict:erase(MsgId, MS), State1),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = MS1,
- sender_queues = SQ1 };
- {ok, discarded} ->
- %% We've already heard from GM that the msg is to be
- %% discarded. We won't see this again.
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 }
end.
@@ -672,39 +665,26 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
end.
-process_instruction(
- {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
-
- %% We really are going to do the publish right now, even though we
- %% may not have seen it directly from the channel. But we cannot
- %% issues confirms until the latter has happened. So we need to
- %% keep track of the MsgId and its confirmation status in the
- %% meantime.
+publish_or_discard(Status, ChPid, MsgId,
+ State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ %% We really are going to do the publish/discard right now, even
+ %% though we may not have seen it directly from the channel. But
+ %% we cannot issue confirms until the latter has happened. So we
+ %% need to keep track of the MsgId and its confirmation status in
+ %% the meantime.
State1 = ensure_monitoring(ChPid, State),
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, published, MS)};
+ dict:store(MsgId, Status, MS)};
{{value, Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } }}, MQ2} ->
+ message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
%% We received the msg from the channel first. Thus
%% we need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never -> MS;
- eventually -> MMS = {published, ChPid, MsgSeqNo},
- dict:store(MsgId, MMS , MS);
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- MS
- end};
+ send_or_record_confirm(Status, Delivery, MS, State1)};
{{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
@@ -713,52 +693,28 @@ process_instruction(
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
-
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
- State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
-
- {ok,
- case Deliver of
- false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State2 #state { backing_queue_state = BQS1 };
- {true, AckRequired} ->
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
- ChPid, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State2 #state { backing_queue_state = BQS1 })
- end};
-process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
- %% Many of the comments around the publish head above apply here
- %% too.
- State1 = ensure_monitoring(ChPid, State),
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- {MQ1, PendingCh1, MS1} =
- case queue:out(MQ) of
- {empty, _MQ} ->
- {MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, discarded, MS)};
- {{value, #delivery { message = #basic_message { id = MsgId } }},
- MQ2} ->
- %% We've already seen it from the channel, we're not
- %% going to see this again, so don't add it to MS
- {MQ2, PendingCh, MS};
- {{value, #delivery {}}, _MQ2} ->
- %% The instruction was sent to us before we were
- %% within the slave_pids within the #amqqueue{}
- %% record. We'll never receive the message directly
- %% from the channel.
- {MQ, PendingCh, MS}
- end,
SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
- BQS1 = BQ:discard(Msg, ChPid, BQS),
- {ok, State1 #state { sender_queues = SQ1,
- msg_id_status = MS1,
- backing_queue_state = BQS1 }};
+ State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
+
+
+process_instruction({publish, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ {ok, State1 #state { backing_queue_state = BQS1 }};
+process_instruction({publish_delivered, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {ok, maybe_store_ack(true, MsgId, AckTag,
+ State1 #state { backing_queue_state = BQS1 })};
+process_instruction({discard, ChPid, MsgId}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(discarded, ChPid, MsgId, State),
+ BQS1 = BQ:discard(MsgId, ChPid, BQS),
+ {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
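
The slave changes above collapse the old three-way needs_confirming/2 result (never / eventually / immediately) into send_or_record_confirm/4, which either replies to the channel at once or parks the confirm in msg_id_status until the message has reached disk. A minimal sketch of that decision, assuming simplified arguments rather than the real #delivery and #state records:

%% Sketch only: the real function takes the publish status, the #delivery and
%% the slave #state; here the inputs are reduced to the three facts that matter.
-module(confirm_decision_sketch).
-export([decide/3]).

%% decide(MsgSeqNo, MsgIsPersistent, QueueIsDurable)
decide(undefined, _Persistent, _Durable) ->
    no_confirm_needed;    %% the channel never asked for a confirm
decide(_SeqNo, true, true) ->
    record_for_later;     %% persistent message on a durable queue: confirm
                          %% only once the backing queue reports it on disk
decide(_SeqNo, _Persistent, _Durable) ->
    confirm_now.          %% everything else is confirmed immediately
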
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index a0536a50..ab9a9ceb 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -63,6 +63,7 @@
-export([version/0]).
-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
+-export([base64url/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -227,6 +228,7 @@
-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
-spec(json_to_term/1 :: (any()) -> any()).
-spec(term_to_json/1 :: (any()) -> any()).
+-spec(base64url/1 :: (binary()) -> string()).
-endif.
@@ -987,3 +989,10 @@ term_to_json(L) when is_list(L) ->
term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
+
+base64url(In) ->
+ lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
+ ($\/, Acc) -> [$\_ | Acc];
+ ($\=, Acc) -> Acc;
+ (Chr, Acc) -> [Chr | Acc]
+ end, [], base64:encode_to_string(In))).
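
The new rabbit_misc:base64url/1 simply rewrites a standard base64 encoding into the URL-safe alphabet: '+' becomes '-', '/' becomes '_', and '=' padding is dropped. A small illustrative check (module name invented; the expected strings follow from the standard base64 alphabet):

%% Illustrative only; assumes rabbit_misc is on the code path.
-module(base64url_example).
-export([demo/0]).

demo() ->
    "+w=="           = base64:encode_to_string(<<251>>),        %% plain base64: '+' and padding
    "-w"             = rabbit_misc:base64url(<<251>>),          %% '+' -> '-', '=' dropped
    "YW55IGJpbmFyeQ" = rabbit_misc:base64url(<<"any binary">>), %% padding stripped
    ok.
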
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index f4c1f42b..9af8fa18 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -22,11 +22,13 @@
-include("rabbit.hrl").
--import(rabbit_misc, [pget/2, pget/3]).
+-import(rabbit_misc, [pget/2]).
-export([register/0]).
-export([name/1, get/2, set/1]).
-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1,
+ list_formatted/1, info_keys/0]).
-rabbit_boot_step({?MODULE,
[{description, "policy parameters"},
@@ -55,7 +57,7 @@ get(Name, EntityName = #resource{virtual_host = VHost}) ->
get0(Name, match(EntityName, list(VHost))).
get0(_Name, undefined) -> {error, not_found};
-get0(Name, List) -> case pget(<<"policy">>, List) of
+get0(Name, List) -> case pget(definition, List) of
undefined -> {error, not_found};
Policy -> case pget(Name, Policy) of
undefined -> {error, not_found};
@@ -65,6 +67,81 @@ get0(Name, List) -> case pget(<<"policy">>, List) of
%%----------------------------------------------------------------------------
+parse_set(VHost, Name, Pattern, Definition, undefined) ->
+ parse_set0(VHost, Name, Pattern, Definition, 0);
+parse_set(VHost, Name, Pattern, Definition, Priority) ->
+ try list_to_integer(Priority) of
+ Num -> parse_set0(VHost, Name, Pattern, Definition, Num)
+ catch
+ error:badarg -> {error, "~p priority must be a number", [Priority]}
+ end.
+
+parse_set0(VHost, Name, Pattern, Defn, Priority) ->
+ case rabbit_misc:json_decode(Defn) of
+ {ok, JSON} ->
+ set0(VHost, Name,
+ [{<<"pattern">>, list_to_binary(Pattern)},
+ {<<"definition">>, rabbit_misc:json_to_term(JSON)},
+ {<<"priority">>, Priority}]);
+ error ->
+ {error_string, "JSON decoding error"}
+ end.
+
+set(VHost, Name, Pattern, Definition, Priority) ->
+ PolicyProps = [{<<"pattern">>, Pattern},
+ {<<"definition">>, Definition},
+ {<<"priority">>, case Priority of
+ undefined -> 0;
+ _ -> Priority
+ end}],
+ set0(VHost, Name, PolicyProps).
+
+set0(VHost, Name, Term) ->
+ rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term).
+
+delete(VHost, Name) ->
+ rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name).
+
+lookup(VHost, Name) ->
+ case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of
+ not_found -> not_found;
+ P -> p(P, fun ident/1)
+ end.
+
+list() ->
+ list('_').
+
+list(VHost) ->
+ list0(VHost, fun ident/1).
+
+list_formatted(VHost) ->
+ order_policies(list0(VHost, fun format/1)).
+
+list0(VHost, DefnFun) ->
+ [p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
+
+order_policies(PropList) ->
+ lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end,
+ PropList).
+
+p(Parameter, DefnFun) ->
+ Value = pget(value, Parameter),
+ [{vhost, pget(vhost, Parameter)},
+ {name, pget(name, Parameter)},
+ {pattern, pget(<<"pattern">>, Value)},
+ {definition, DefnFun(pget(<<"definition">>, Value))},
+ {priority, pget(<<"priority">>, Value)}].
+
+format(Term) ->
+ {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
+ list_to_binary(JSON).
+
+ident(X) -> X.
+
+info_keys() -> [vhost, name, pattern, definition, priority].
+
+%%----------------------------------------------------------------------------
+
validate(_VHost, <<"policy">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
@@ -80,10 +157,6 @@ notify_clear(VHost, <<"policy">>, _Name) ->
%%----------------------------------------------------------------------------
-list(VHost) ->
- [[{<<"name">>, pget(key, P)} | pget(value, P)]
- || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
-
update_policies(VHost) ->
Policies = list(VHost),
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
@@ -127,13 +200,52 @@ match(Name, Policies) ->
end.
matches(#resource{name = Name}, Policy) ->
- match =:= re:run(Name, pget(<<"pattern">>, Policy), [{capture, none}]).
+ match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]).
-sort_pred(A, B) -> pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0).
+sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
%%----------------------------------------------------------------------------
policy_validation() ->
- [{<<"priority">>, fun rabbit_parameter_validation:number/2, optional},
- {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
- {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
+ [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
+ {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"definition">>, fun validation/2, mandatory}].
+
+validation(_Name, []) ->
+ {error, "no policy provided", []};
+validation(_Name, Terms) when is_list(Terms) ->
+ {Keys, Modules} = lists:unzip(
+ rabbit_registry:lookup_all(policy_validator)),
+ [] = dups(Keys), %% ASSERTION
+ Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys),
+ {TermKeys, _} = lists:unzip(Terms),
+ case dups(TermKeys) of
+ [] -> validation0(Validators, Terms);
+ Dup -> {error, "~p duplicate keys not allowed", [Dup]}
+ end;
+validation(_Name, Term) ->
+ {error, "parse error while reading policy: ~p", [Term]}.
+
+validation0(Validators, Terms) ->
+ case lists:foldl(
+ fun (Mod, {ok, TermsLeft}) ->
+ ModKeys = proplists:get_all_values(Mod, Validators),
+ case [T || {Key, _} = T <- TermsLeft,
+ lists:member(Key, ModKeys)] of
+ [] -> {ok, TermsLeft};
+ Scope -> {Mod:validate_policy(Scope), TermsLeft -- Scope}
+ end;
+ (_, Acc) ->
+ Acc
+ end, {ok, Terms}, proplists:get_keys(Validators)) of
+ {ok, []} ->
+ ok;
+ {ok, Unvalidated} ->
+ {error, "~p are not recognised policy settings", [Unvalidated]};
+ {Error, _} ->
+ Error
+ end.
+
+a2b(A) -> list_to_binary(atom_to_list(A)).
+
+dups(L) -> L -- lists:usort(L).
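
Two small helpers carry the validation plumbing above: dups/1 finds duplicate keys (list subtraction removes one occurrence of every element returned by lists:usort/1, leaving only the extras), and validation0/2 then hands each registered validator just the keys it has claimed. A quick illustration of dups/1, using an invented module name:

%% Same one-liner as in rabbit_policy above, shown in isolation.
-module(dups_example).
-export([dups/1, demo/0]).

dups(L) -> L -- lists:usort(L).

demo() ->
    [a, b, b] = dups([a, b, a, c, b, b]), %% the surplus occurrences remain
    []        = dups([x, y, z]),          %% no duplicates -> empty list
    ok.
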
diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl
new file mode 100644
index 00000000..624c3d54
--- /dev/null
+++ b/src/rabbit_policy_validator.erl
@@ -0,0 +1,37 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_policy_validator).
+
+-ifdef(use_specs).
+
+-type(validate_results() ::
+ 'ok' | {error, string(), [term()]} | [validate_results()]).
+
+-callback validate_policy([{binary(), term()}]) -> validate_results().
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ {validate_policy, 1}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
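
A policy_validator implementation only has to export validate_policy/1, which receives the [{Key, Value}] terms it registered for and returns ok or {error, Format, Args}. A hypothetical validator for an invented <<"max-length">> key, sketched purely for illustration (the key, module name and semantics are assumptions, not part of the patch):

%% Hypothetical example; registration follows the same pattern as the other
%% rabbit_registry plugin classes.
-module(my_policy_validator).
-behaviour(rabbit_policy_validator).

-export([register/0, validate_policy/1]).

register() ->
    rabbit_registry:register(policy_validator, <<"max-length">>, ?MODULE).

validate_policy([{<<"max-length">>, N}]) when is_integer(N), N >= 0 ->
    ok;
validate_policy([{<<"max-length">>, Other}]) ->
    {error, "~p is not a non-negative integer", [Other]}.
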
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index e14bbba0..32709d24 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -107,7 +107,8 @@ sanity_check_module(ClassModule, Module) ->
class_module(exchange) -> rabbit_exchange_type;
class_module(auth_mechanism) -> rabbit_auth_mechanism;
class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator.
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(policy_validator) -> rabbit_policy_validator.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index b58b459a..4a83e61f 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -18,9 +18,9 @@
-include("rabbit.hrl").
--export([parse_set/4, set/4, clear/3,
- list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1,
- lookup/3, value/3, value/4, info_keys/0]).
+-export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1,
+ list_strict/1, list/2, list_strict/2, list_formatted/1, lookup/3,
+ value/3, value/4, info_keys/0]).
%%----------------------------------------------------------------------------
@@ -32,8 +32,12 @@
-> ok_or_error_string()).
-spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term())
-> ok_or_error_string()).
+-spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term())
+ -> ok_or_error_string()).
-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
-> ok_or_error_string()).
+-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary())
+ -> ok_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
@@ -57,29 +61,37 @@
%%---------------------------------------------------------------------------
-parse_set(VHost, Component, Key, String) ->
+parse_set(_, <<"policy">>, _, _) ->
+ {error_string, "policies may not be set using this method"};
+parse_set(VHost, Component, Name, String) ->
case rabbit_misc:json_decode(String) of
- {ok, JSON} -> set(VHost, Component, Key, rabbit_misc:json_to_term(JSON));
+ {ok, JSON} -> set(VHost, Component, Name,
+ rabbit_misc:json_to_term(JSON));
error -> {error_string, "JSON decoding error"}
end.
-set(VHost, Component, Key, Term) ->
- case set0(VHost, Component, Key, Term) of
- ok -> ok;
- {errors, L} -> format_error(L)
- end.
+set(_, <<"policy">>, _, _) ->
+ {error_string, "policies may not be set using this method"};
+set(VHost, Component, Name, Term) ->
+ set_any(VHost, Component, Name, Term).
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
-set0(VHost, Component, Key, Term) ->
+set_any(VHost, Component, Name, Term) ->
+ case set_any0(VHost, Component, Name, Term) of
+ ok -> ok;
+ {errors, L} -> format_error(L)
+ end.
+
+set_any0(VHost, Component, Name, Term) ->
case lookup_component(Component) of
{ok, Mod} ->
- case flatten_errors(Mod:validate(VHost, Component, Key, Term)) of
+ case flatten_errors(Mod:validate(VHost, Component, Name, Term)) of
ok ->
- case mnesia_update(VHost, Component, Key, Term) of
+ case mnesia_update(VHost, Component, Name, Term) of
{old, Term} -> ok;
- _ -> Mod:notify(VHost, Component, Key, Term)
+ _ -> Mod:notify(VHost, Component, Name, Term)
end,
ok;
E ->
@@ -89,43 +101,49 @@ set0(VHost, Component, Key, Term) ->
E
end.
-mnesia_update(VHost, Component, Key, Term) ->
+mnesia_update(VHost, Component, Name, Term) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- Res = case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
+ Res = case mnesia:read(?TABLE, {VHost, Component, Name}, read) of
[] -> new;
[Params] -> {old, Params#runtime_parameters.value}
end,
- ok = mnesia:write(?TABLE, c(VHost, Component, Key, Term), write),
+ ok = mnesia:write(?TABLE, c(VHost, Component, Name, Term), write),
Res
end).
-clear(VHost, Component, Key) ->
- case clear0(VHost, Component, Key) of
+clear(_, <<"policy">> , _) ->
+ {error_string, "policies may not be cleared using this method"};
+clear(VHost, Component, Name) ->
+ clear_any(VHost, Component, Name).
+
+clear_any(VHost, Component, Name) ->
+ case clear_any0(VHost, Component, Name) of
ok -> ok;
{errors, L} -> format_error(L)
end.
-clear0(VHost, Component, Key) ->
+clear_any0(VHost, Component, Name) ->
case lookup_component(Component) of
{ok, Mod} -> case flatten_errors(
- Mod:validate_clear(VHost, Component, Key)) of
- ok -> mnesia_clear(VHost, Component, Key),
- Mod:notify_clear(VHost, Component, Key),
+ Mod:validate_clear(VHost, Component, Name)) of
+ ok -> mnesia_clear(VHost, Component, Name),
+ Mod:notify_clear(VHost, Component, Name),
ok;
E -> E
end;
E -> E
end.
-mnesia_clear(VHost, Component, Key) ->
+mnesia_clear(VHost, Component, Name) ->
ok = rabbit_misc:execute_mnesia_transaction(
fun () ->
- ok = mnesia:delete(?TABLE, {VHost, Component, Key}, write)
+ ok = mnesia:delete(?TABLE, {VHost, Component, Name}, write)
end).
list() ->
- [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)].
+ [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
+ rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>].
list(VHost) -> list(VHost, '_', []).
list_strict(Component) -> list('_', Component, not_found).
@@ -136,60 +154,63 @@ list(VHost, Component, Default) ->
case component_good(Component) of
true -> Match = #runtime_parameters{key = {VHost, Component, '_'},
_ = '_'},
- [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
+ [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
+ mnesia:dirty_match_object(?TABLE, Match),
+ Comp =/= <<"policy">> orelse
+ Component =:= <<"policy">>];
_ -> Default
end.
list_formatted(VHost) ->
[pset(value, format(pget(value, P)), P) || P <- list(VHost)].
-lookup(VHost, Component, Key) ->
- case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
+lookup(VHost, Component, Name) ->
+ case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> p(Params)
end.
-value(VHost, Component, Key) ->
- case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
+value(VHost, Component, Name) ->
+ case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> Params#runtime_parameters.value
end.
-value(VHost, Component, Key, Default) ->
- Params = lookup0(VHost, Component, Key,
+value(VHost, Component, Name, Default) ->
+ Params = lookup0(VHost, Component, Name,
fun () ->
- lookup_missing(VHost, Component, Key, Default)
+ lookup_missing(VHost, Component, Name, Default)
end),
Params#runtime_parameters.value.
-lookup0(VHost, Component, Key, DefaultFun) ->
- case mnesia:dirty_read(?TABLE, {VHost, Component, Key}) of
+lookup0(VHost, Component, Name, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of
[] -> DefaultFun();
[R] -> R
end.
-lookup_missing(VHost, Component, Key, Default) ->
+lookup_missing(VHost, Component, Name, Default) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
- [] -> Record = c(VHost, Component, Key, Default),
+ case mnesia:read(?TABLE, {VHost, Component, Name}, read) of
+ [] -> Record = c(VHost, Component, Name, Default),
mnesia:write(?TABLE, Record, write),
Record;
[R] -> R
end
end).
-c(VHost, Component, Key, Default) ->
- #runtime_parameters{key = {VHost, Component, Key},
+c(VHost, Component, Name, Default) ->
+ #runtime_parameters{key = {VHost, Component, Name},
value = Default}.
-p(#runtime_parameters{key = {VHost, Component, Key}, value = Value}) ->
+p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) ->
[{vhost, VHost},
{component, Component},
- {key, Key},
+ {name, Name},
{value, Value}].
-info_keys() -> [component, key, value].
+info_keys() -> [component, name, value].
%%---------------------------------------------------------------------------
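
With the split above, set/4 and clear/3 now refuse the reserved <<"policy">> component, and everything policy-related goes through set_any/4 and clear_any/3 (which is what rabbit_policy:set0/3 and delete/2 call). An illustrative check of the guards, asserting the error strings introduced in the patch:

%% Illustrative only; the asserted tuples are the guard clauses added above.
-module(runtime_params_guard_demo).
-export([demo/0]).

demo() ->
    {error_string, "policies may not be set using this method"} =
        rabbit_runtime_parameters:set(<<"/">>, <<"policy">>, <<"p1">>, []),
    {error_string, "policies may not be cleared using this method"} =
        rabbit_runtime_parameters:clear(<<"/">>, <<"policy">>, <<"p1">>),
    ok.
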
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index 5224ccaa..d4d7271e 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -16,9 +16,14 @@
-module(rabbit_runtime_parameters_test).
-behaviour(rabbit_runtime_parameter).
+-behaviour(rabbit_policy_validator).
-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
+-export([validate_policy/1]).
+-export([register_policy_validator/0, unregister_policy_validator/0]).
+
+%----------------------------------------------------------------------------
register() ->
rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE).
@@ -36,3 +41,28 @@ validate_clear(_, <<"test">>, _) -> {error, "meh", []}.
notify(_, _, _, _) -> ok.
notify_clear(_, _, _) -> ok.
+
+%----------------------------------------------------------------------------
+
+register_policy_validator() ->
+ rabbit_registry:register(policy_validator, <<"testeven">>, ?MODULE),
+ rabbit_registry:register(policy_validator, <<"testpos">>, ?MODULE).
+
+unregister_policy_validator() ->
+ rabbit_registry:unregister(policy_validator, <<"testeven">>),
+ rabbit_registry:unregister(policy_validator, <<"testpos">>).
+
+validate_policy([{<<"testeven">>, Terms}]) when is_list(Terms) ->
+ case length(Terms) rem 2 =:= 0 of
+ true -> ok;
+ false -> {error, "meh", []}
+ end;
+
+validate_policy([{<<"testpos">>, Terms}]) when is_list(Terms) ->
+ case lists:all(fun (N) -> is_integer(N) andalso N > 0 end, Terms) of
+ true -> ok;
+ false -> {error, "meh", []}
+ end;
+
+validate_policy(_) ->
+ {error, "meh", []}.
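
Called directly, the two test validators behave as the rabbit_tests additions below expect: an even-length list passes <<"testeven">>, and only strictly positive integers pass <<"testpos">>. A tiny check (module name invented):

%% Illustrative only; exercises the validator clauses defined above.
-module(test_validator_demo).
-export([demo/0]).

demo() ->
    ok                 = rabbit_runtime_parameters_test:validate_policy(
                           [{<<"testeven">>, [1, 2]}]),
    {error, "meh", []} = rabbit_runtime_parameters_test:validate_policy(
                           [{<<"testpos">>, [-1, 0, 1]}]),
    ok.
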
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2e26837d..962bb648 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -57,6 +57,7 @@ all_tests() ->
passed = test_dynamic_mirroring(),
passed = test_user_management(),
passed = test_runtime_parameters(),
+ passed = test_policy_validation(),
passed = test_server_status(),
passed = test_confirms(),
passed =
@@ -774,7 +775,9 @@ test_log_management_during_startup() ->
ok = case catch control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
log_rotation_tty_no_handlers_test});
- {error, {cannot_log_to_tty, _, _}} -> ok
+ {badrpc, {'EXIT', {rabbit,failure_during_boot,
+ {error,{cannot_log_to_tty,
+ _, not_installed}}}}} -> ok
end,
%% fix sasl logging
@@ -798,7 +801,9 @@ test_log_management_during_startup() ->
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
log_rotation_no_write_permission_dir_test});
- {error, {cannot_log_to_file, _, _}} -> ok
+ {badrpc, {'EXIT',
+ {rabbit, failure_during_boot,
+ {error, {cannot_log_to_file, _, _}}}}} -> ok
end,
%% start application with logging to a subdirectory which
@@ -809,8 +814,11 @@ test_log_management_during_startup() ->
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
log_rotatation_parent_dirs_test});
- {error, {cannot_log_to_file, _,
- {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok
+ {badrpc,
+ {'EXIT', {rabbit,failure_during_boot,
+ {error, {cannot_log_to_file, _,
+ {error,
+ {cannot_create_parent_dirs, _, eacces}}}}}}} -> ok
end,
ok = set_permissions(TmpDir, 8#00700),
ok = set_permissions(TmpLog, 8#00600),
@@ -1039,6 +1047,26 @@ test_runtime_parameters() ->
rabbit_runtime_parameters_test:unregister(),
passed.
+test_policy_validation() ->
+ rabbit_runtime_parameters_test:register_policy_validator(),
+ SetPol =
+ fun (Key, Val) ->
+ control_action(
+ set_policy,
+ ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
+ end,
+
+ ok = SetPol("testeven", []),
+ ok = SetPol("testeven", [1, 2]),
+ ok = SetPol("testeven", [1, 2, 3, 4]),
+ ok = SetPol("testpos", [2, 5, 5678]),
+
+ {error_string, _} = SetPol("testpos", [-1, 0, 1]),
+ {error_string, _} = SetPol("testeven", [ 1, 2, 3]),
+
+ rabbit_runtime_parameters_test:unregister_policy_validator(),
+ passed.
+
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
@@ -1100,8 +1128,8 @@ test_server_status() ->
ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]),
%% eval
- {parse_error, _} = control_action(eval, ["\""]),
- {parse_error, _} = control_action(eval, ["a("]),
+ {error_string, _} = control_action(eval, ["\""]),
+ {error_string, _} = control_action(eval, ["a("]),
ok = control_action(eval, ["a."]),
%% cleanup
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index ddc9c565..21fdcd66 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -42,6 +42,7 @@
[exchange_scratches, ha_mirrors]}).
-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
+-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
%% -------------------------------------------------------------------
@@ -66,6 +67,7 @@
-spec(policy/0 :: () -> 'ok').
-spec(sync_slave_pids/0 :: () -> 'ok').
-spec(no_mirror_nodes/0 :: () -> 'ok').
+-spec(gm_pids/0 :: () -> 'ok').
-endif.
@@ -268,6 +270,19 @@ no_mirror_nodes() ->
|| T <- Tables],
ok.
+gm_pids() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ AddGMPidsFun =
+ fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}) ->
+ {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol, []}
+ end,
+ [ok = transform(T, AddGMPidsFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, slave_pids, sync_slave_pids, policy, gm_pids])
+ || T <- Tables],
+ ok.
+
+
%%--------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ddb136a7..8a3fd9d9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,11 +17,11 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/5, drain_confirmed/1,
+ publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -545,17 +545,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
ram_msg_count = RamMsgCount + 1,
unconfirmed = UC1 })).
-publish_delivered(false, #basic_message { id = MsgId },
- #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { async_callback = Callback,
- len = 0 }) ->
- case NeedsConfirming of
- true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
- false -> ok
- end,
- {undefined, a(State)};
-publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
- id = MsgId },
+publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
+ id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
_ChPid, State = #vqstate { len = 0,
@@ -579,6 +570,8 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
persistent_count = PCount1,
unconfirmed = UC1 }))}.
+discard(_MsgId, _ChPid, State) -> State.
+
drain_confirmed(State = #vqstate { confirmed = C }) ->
case gb_sets:is_empty(C) of
true -> {[], State}; %% common case
@@ -821,8 +814,6 @@ invoke(?MODULE, Fun, State) -> Fun(?MODULE, State).
is_duplicate(_Msg, State) -> {false, State}.
-discard(_Msg, _ChPid, State) -> State.
-
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
@@ -1325,12 +1316,9 @@ must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-blind_confirm(Callback, MsgIdSet) ->
- Callback(?MODULE,
- fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
-
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
- blind_confirm(Callback, MsgIdSet);
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
Callback(?MODULE,
fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 03dfbe24..297fa56f 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -97,6 +97,8 @@ internal_delete(VHostPath) ->
proplists:get_value(component, Info),
proplists:get_value(key, Info))
|| Info <- rabbit_runtime_parameters:list(VHostPath)],
+ [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
+ || Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.