summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-09-10 17:12:41 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-09-10 17:12:41 +0100
commit2c3a7a80dd38c07c190164546f29fbf2d5dc0400 (patch)
tree64ec18e366ba2ada8a56058377f7a5f41efd21ee
parent1a1acaa30a629312da86503de4a40c278b47bbc8 (diff)
parent432aefa0b24e199ec5b694da32187b777348cf61 (diff)
downloadrabbitmq-server-2c3a7a80dd38c07c190164546f29fbf2d5dc0400.tar.gz
Merging default into bug 23157
-rw-r--r--docs/html-to-website-xml.xsl2
-rw-r--r--docs/rabbitmqctl.1.xml115
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/generic-unix/Makefile1
-rw-r--r--packaging/windows/Makefile1
-rw-r--r--src/file_handle_cache.erl111
-rw-r--r--src/gen_server2.erl80
-rw-r--r--src/rabbit.erl14
-rw-r--r--src/rabbit_access_control.erl146
-rw-r--r--src/rabbit_amqqueue.erl29
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_binding.erl378
-rw-r--r--src/rabbit_channel.erl69
-rw-r--r--src/rabbit_channel_sup.erl96
-rw-r--r--src/rabbit_channel_sup_sup.erl (renamed from src/rabbit_tracer.erl)45
-rw-r--r--src/rabbit_connection_sup.erl99
-rw-r--r--src/rabbit_control.erl71
-rw-r--r--src/rabbit_exchange.erl367
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_framing_channel.erl19
-rw-r--r--src/rabbit_heartbeat.erl59
-rw-r--r--src/rabbit_hooks.erl73
-rw-r--r--src/rabbit_limiter.erl24
-rw-r--r--src/rabbit_load.erl78
-rw-r--r--src/rabbit_misc.erl20
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_msg_store.erl143
-rw-r--r--src/rabbit_multi.erl23
-rw-r--r--src/rabbit_net.erl44
-rw-r--r--src/rabbit_networking.erl25
-rw-r--r--src/rabbit_plugin_activator.erl74
-rw-r--r--src/rabbit_queue_collector.erl10
-rw-r--r--src/rabbit_reader.erl228
-rw-r--r--src/rabbit_router.erl12
-rw-r--r--src/rabbit_tests.erl90
-rw-r--r--src/rabbit_types.erl10
-rw-r--r--src/rabbit_variable_queue.erl8
-rw-r--r--src/rabbit_writer.erl137
-rw-r--r--src/supervisor2.erl13
-rw-r--r--src/tcp_client_sup.erl10
44 files changed, 1483 insertions, 1276 deletions
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index 662dbea0..c325bb5a 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -30,7 +30,7 @@
<code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
</p>
<p>
- <a href="manpages.html">See a list of all manual pages</a>.
+ <a href="../manpages.html">See a list of all manual pages</a>.
</p>
</xsl:when>
<xsl:otherwise>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 33552e17..5179eb25 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -417,7 +417,8 @@
<screen role="example">rabbitmqctl add_user tonyg changeit</screen>
<para role="example">
This command instructs the RabbitMQ broker to create a
- user named <command>tonyg</command> with (initial) password
+ (non-administrative) user named <command>tonyg</command> with
+ (initial) password
<command>changeit</command>.
</para>
</listitem>
@@ -465,13 +466,57 @@
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user whose administrative
+ status is to be set.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_admin tonyg</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to ensure the user
+ named <command>tonyg</command> is an administrator. This has no
+ effect when the user logs in via AMQP, but can be used to permit
+ the user to manage users, virtual hosts and permissions when the
+ user logs in via some other means (for example with the
+ management plugin).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>clear_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user whose administrative
+ status is to be cleared.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_admin tonyg</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to ensure the user
+ named <command>tonyg</command> is not an administrator.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><cmdsynopsis><command>list_users</command></cmdsynopsis></term>
<listitem>
<para>Lists users</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl list_users</screen>
<para role="example">
- This command instructs the RabbitMQ broker to list all users.
+ This command instructs the RabbitMQ broker to list all
+ users. Each result row will contain the user name and
+ the administrator status of the user, in that order.
</para>
</listitem>
</varlistentry>
@@ -704,7 +749,7 @@
<variablelist>
<varlistentry>
<term>name</term>
- <listitem><para>The name of the queue with non-ASCII characters URL-escaped.</para></listitem>
+ <listitem><para>The name of the queue with non-ASCII characters escaped as in C.</para></listitem>
</varlistentry>
<varlistentry>
<term>durable</term>
@@ -795,7 +840,7 @@
<variablelist>
<varlistentry>
<term>name</term>
- <listitem><para>The name of the exchange with non-ASCII characters URL-escaped.</para></listitem>
+ <listitem><para>The name of the exchange with non-ASCII characters escaped as in C.</para></listitem>
</varlistentry>
<varlistentry>
<term>type</term>
@@ -830,22 +875,58 @@
</para>
</listitem>
</varlistentry>
- </variablelist>
- <variablelist>
- <varlistentry>
- <term><cmdsynopsis><command>list_bindings</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
+ <varlistentry role="usage-has-option-list">
+ <term><cmdsynopsis><command>list_bindings</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt" role="usage-option-list"><replaceable>bindinginfoitem</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<para>
- By default the bindings for the <command>/</command> virtual
- host are returned. The "-p" flag can be used to override
- this default. Each result row will contain an exchange
- name, queue name, routing key and binding arguments, in
- that order. Non-ASCII characters will be URL-encoded.
+ Returns binding details. By default the bindings for
+ the <command>/</command> virtual host are returned. The
+ "-p" flag can be used to override this default.
</para>
- <para role="usage">
- The output format for "list_bindings" is a list of rows containing
- exchange name, queue name, routing key and arguments, in that order.
+ <para>
+ The <command>bindinginfoitem</command> parameter is used
+ to indicate which binding information items to include
+ in the results. The column order in the results will
+ match the order of the parameters.
+ <command>bindinginfoitem</command> can take any value
+ from the list that follows:
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>exchange_name</term>
+ <listitem><para>The name of the exchange to which the
+ binding is attached. with non-ASCII characters
+ escaped as in C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>queue_name</term>
+ <listitem><para>The name of the queue to which the
+ binding is attached. with non-ASCII characters
+ escaped as in C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>routing_key</term>
+ <listitem><para>The binding's routing key, with
+ non-ASCII characters escaped as in C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>arguments</term>
+ <listitem><para>The binding's arguments.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ If no <command>bindinginfoitem</command>s are specified then
+ all above items are displayed.
+ </para>
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">rabbitmqctl list_bindings -p /myvhost exchange_name queue_name</screen>
+ <para role="example">
+ This command displays the exchange name and queue name
+ of the bindings in the virtual host
+ named <command>/myvhost</command>.
</para>
</listitem>
</varlistentry>
@@ -904,7 +985,7 @@
</varlistentry>
<varlistentry>
<term>vhost</term>
- <listitem><para>Virtual host name with non-ASCII characters URL-escaped.</para></listitem>
+ <listitem><para>Virtual host name with non-ASCII characters escaped as in C.</para></listitem>
</varlistentry>
<varlistentry>
<term>timeout</term>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 48e19ff8..4be09c5a 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -26,6 +26,7 @@
{queue_index_max_journal_entries, 262144},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
+ {default_user_is_admin, true},
{default_vhost, <<"/">>},
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
{collect_statistics, none}]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index b9abd788..24aa8d98 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -29,7 +29,7 @@
%% Contributor(s): ______________________________________.
%%
--record(user, {username, password}).
+-record(user, {username, password, is_admin}).
-record(permission, {scope, configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index c0d9aeda..17518ddf 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -127,6 +127,9 @@ done
rm -rf %{buildroot}
%changelog
+* Mon Aug 23 2010 mikeb@rabbitmq.com 2.0.0-1
+- New Upstream Release
+
* Wed Jul 14 2010 Emile Joubert <emile@rabbitmq.com> 1.8.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 0dccf938..7ee60016 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.0.0-1) karmic; urgency=low
+
+ * New Upstream Release
+
+ -- Michael Bridgen <mikeb@rabbitmq.com> Mon, 23 Aug 2010 14:55:39 +0100
+
rabbitmq-server (1.8.1-1) lucid; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index a44f49a0..4afc66ac 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -1,7 +1,7 @@
Source: rabbitmq-server
Section: net
Priority: extra
-Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
+Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc
Standards-Version: 3.8.0
diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile
index 4eade6c7..c4e01f4a 100644
--- a/packaging/generic-unix/Makefile
+++ b/packaging/generic-unix/Makefile
@@ -4,7 +4,6 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION)
dist:
- $(MAKE) -C ../.. VERSION=$(VERSION) srcdist
tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
$(MAKE) -C $(SOURCE_DIR) \
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index f47b5340..abe174e0 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -4,7 +4,6 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_ZIP=rabbitmq-server-windows-$(VERSION)
dist:
- $(MAKE) -C ../.. VERSION=$(VERSION) srcdist
tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
$(MAKE) -C $(SOURCE_DIR)
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f83fa0bc..d2830a25 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -34,13 +34,15 @@
%% A File Handle Cache
%%
%% This extends a subset of the functionality of the Erlang file
-%% module.
+%% module. In the below, we use "file handle" to specifically refer to
+%% file handles, and "file descriptor" to refer to descriptors which
+%% are not file handles, e.g. sockets.
%%
%% Some constraints
%% 1) This supports one writer, multiple readers per file. Nothing
%% else.
%% 2) Do not open the same file from different processes. Bad things
-%% may happen.
+%% may happen, especially for writes.
%% 3) Writes are all appends. You cannot write to the middle of a
%% file, although you can truncate and then append if you want.
%% 4) Although there is a write buffer, there is no read buffer. Feel
@@ -49,10 +51,10 @@
%%
%% Some benefits
%% 1) You do not have to remember to call sync before close
-%% 2) Buffering is much more flexible than with plain file module, and
-%% you can control when the buffer gets flushed out. This means that
-%% you can rely on reads-after-writes working, without having to call
-%% the expensive sync.
+%% 2) Buffering is much more flexible than with the plain file module,
+%% and you can control when the buffer gets flushed out. This means
+%% that you can rely on reads-after-writes working, without having to
+%% call the expensive sync.
%% 3) Unnecessary calls to position and sync get optimised out.
%% 4) You can find out what your 'real' offset is, and what your
%% 'virtual' offset is (i.e. where the hdl really is, and where it
@@ -60,14 +62,19 @@
%% 5) You can find out what the offset was when you last sync'd.
%%
%% There is also a server component which serves to limit the number
-%% of open file handles in a "soft" way - the server will never
-%% prevent a client from opening a handle, but may immediately tell it
-%% to close the handle. Thus you can set the limit to zero and it will
-%% still all work correctly, it is just that effectively no caching
-%% will take place. The operation of limiting is as follows:
+%% of open file descriptors. This is a hard limit: the server
+%% component will ensure that clients do not have more file
+%% descriptors open than it's configured to allow.
%%
-%% On open and close, the client sends messages to the server
-%% informing it of opens and closes. This allows the server to keep
+%% On open, the client requests permission from the server to open the
+%% required number of file handles. The server may ask the client to
+%% close other file handles that it has open, or it may queue the
+%% request and ask other clients to close file handles they have open
+%% in order to satisfy the request. Requests are always satisfied in
+%% the order they arrive, even if a latter request (for a small number
+%% of file handles) can be satisfied before an earlier request (for a
+%% larger number of file handles). On close, the client sends a
+%% message to the server. These messages allow the server to keep
%% track of the number of open handles. The client also keeps a
%% gb_tree which is updated on every use of a file handle, mapping the
%% time at which the file handle was last used (timestamp) to the
@@ -81,21 +88,38 @@
%% Note that this data can go very out of date, by the client using
%% the least recently used handle.
%%
-%% When the limit is reached, the server calculates the average age of
-%% the last reported least recently used file handle of all the
-%% clients. It then tells all the clients to close any handles not
-%% used for longer than this average, by invoking the callback the
-%% client registered. The client should receive this message and pass
-%% it into set_maximum_since_use/1. However, it is highly possible
-%% this age will be greater than the ages of all the handles the
-%% client knows of because the client has used its file handles in the
-%% mean time. Thus at this point the client reports to the server the
+%% When the limit is exceeded (i.e. the number of open file handles is
+%% at the limit and there are pending 'open' requests), the server
+%% calculates the average age of the last reported least recently used
+%% file handle of all the clients. It then tells all the clients to
+%% close any handles not used for longer than this average, by
+%% invoking the callback the client registered. The client should
+%% receive this message and pass it into
+%% set_maximum_since_use/1. However, it is highly possible this age
+%% will be greater than the ages of all the handles the client knows
+%% of because the client has used its file handles in the mean
+%% time. Thus at this point the client reports to the server the
%% current timestamp at which its least recently used file handle was
%% last used. The server will check two seconds later that either it
%% is back under the limit, in which case all is well again, or if
%% not, it will calculate a new average age. Its data will be much
%% more recent now, and so it is very likely that when this is
%% communicated to the clients, the clients will close file handles.
+%% (In extreme cases, where it's very likely that all clients have
+%% used their open handles since they last sent in an update, which
+%% would mean that the average will never cause any file handles to
+%% be closed, the server can send out an average age of 0, resulting
+%% in all available clients closing all their file handles.)
+%%
+%% Care is taken to ensure that (a) processes which are blocked
+%% waiting for file descriptors to become available are not sent
+%% requests to close file handles; and (b) given it is known how many
+%% file handles a process has open, when the average age is forced to
+%% 0, close messages are only sent to enough processes to release the
+%% correct number of file handles and the list of processes is
+%% randomly shuffled. This ensures we don't cause processes to
+%% needlessly close file handles, and ensures that we don't always
+%% make such requests of the same processes.
%%
%% The advantage of this scheme is that there is only communication
%% from the client to the server on open, close, and when in the
@@ -103,11 +127,7 @@
%% communication from the client to the server on normal file handle
%% operations. This scheme forms a feed-back loop - the server does
%% not care which file handles are closed, just that some are, and it
-%% checks this repeatedly when over the limit. Given the guarantees of
-%% now(), even if there is just one file handle open, a limit of 1,
-%% and one client, it is certain that when the client calculates the
-%% age of the handle, it will be greater than when the server
-%% calculated it, hence it should be closed.
+%% checks this repeatedly when over the limit.
%%
%% Handles which are closed as a result of the server are put into a
%% "soft-closed" state in which the handle is closed (data flushed out
@@ -117,8 +137,19 @@
%% - reopening them when necessary is handled transparently.
%%
%% The server also supports obtain and transfer. obtain/0 blocks until
-%% a file descriptor is available. transfer/1 is transfers ownership
-%% of a file descriptor between processes. It is non-blocking.
+%% a file descriptor is available, at which point the requesting
+%% process is considered to 'own' one more descriptor. transfer/1
+%% transfers ownership of a file descriptor between processes. It is
+%% non-blocking. Obtain is used to obtain permission to accept file
+%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1
+%% macro. File handles can use the entire limit, but will be evicted
+%% by obtain calls up to the point at which no more obtain calls can
+%% be satisfied by the obtains limit. Thus there will always be some
+%% capacity available for file handles. Processes that use obtain are
+%% never asked to return them, and they are not managed in any way by
+%% the server. It is simply a mechanism to ensure that processes that
+%% need file descriptors such as sockets can do so in such a way that
+%% the overall number of open file descriptors is managed.
%%
%% The callers of register_callback/3, obtain/0, and the argument of
%% transfer/1 are monitored, reducing the count of handles in use
@@ -131,6 +162,7 @@
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
-export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
+-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -242,6 +274,7 @@
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
+-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()).
-endif.
@@ -781,7 +814,11 @@ init([]) ->
Watermark > 0) ->
Watermark;
_ ->
- ulimit()
+ case ulimit() of
+ infinity -> infinity;
+ unknown -> ?FILE_HANDLES_LIMIT_OTHER;
+ Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS])
+ end
end,
ObtainLimit = obtain_limit(Limit),
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
@@ -1131,7 +1168,7 @@ track_client(Pid, Clients) ->
ulimit() ->
case os:type() of
{win32, _OsName} ->
- ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS;
+ ?FILE_HANDLES_LIMIT_WINDOWS;
{unix, _OsName} ->
%% Under Linux, Solaris and FreeBSD, ulimit is a shell
%% builtin, not a command. In OS X, it's a command.
@@ -1141,16 +1178,14 @@ ulimit() ->
"unlimited" ->
infinity;
String = [C|_] when $0 =< C andalso C =< $9 ->
- Num = list_to_integer(
- lists:takewhile(
- fun (D) -> $0 =< D andalso D =< $9 end, String)) -
- ?RESERVED_FOR_OTHERS,
- lists:max([1, Num]);
+ list_to_integer(
+ lists:takewhile(
+ fun (D) -> $0 =< D andalso D =< $9 end, String));
_ ->
%% probably a variant of
%% "/bin/sh: line 1: ulimit: command not found\n"
- ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ unknown
end;
_ ->
- ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ unknown
end.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 019438d5..20f701bd 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -957,54 +957,56 @@ terminate(Reason, Msg, #gs2_state { name = Name,
state = State,
debug = Debug }) ->
case catch Mod:terminate(Reason, State) of
- {'EXIT', R} ->
- error_info(R, Name, Msg, State, Debug),
- exit(R);
- _ ->
- case Reason of
- normal ->
- exit(normal);
- shutdown ->
- exit(shutdown);
- {shutdown,_}=Shutdown ->
- exit(Shutdown);
- _ ->
- error_info(Reason, Name, Msg, State, Debug),
- exit(Reason)
- end
+ {'EXIT', R} ->
+ error_info(R, Reason, Name, Msg, State, Debug),
+ exit(R);
+ _ ->
+ case Reason of
+ normal ->
+ exit(normal);
+ shutdown ->
+ exit(shutdown);
+ {shutdown,_}=Shutdown ->
+ exit(Shutdown);
+ _ ->
+ error_info(Reason, undefined, Name, Msg, State, Debug),
+ exit(Reason)
+ end
end.
-error_info(_Reason, application_controller, _Msg, _State, _Debug) ->
+error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
%% OTP-5811 Don't send an error report if it's the system process
%% application_controller which is terminating - let init take care
%% of it instead
ok;
-error_info(Reason, Name, Msg, State, Debug) ->
- Reason1 =
- case Reason of
- {undef,[{M,F,A}|MFAs]} ->
- case code:is_loaded(M) of
- false ->
- {'module could not be loaded',[{M,F,A}|MFAs]};
- _ ->
- case erlang:function_exported(M, F, length(A)) of
- true ->
- Reason;
- false ->
- {'function not exported',[{M,F,A}|MFAs]}
- end
- end;
- _ ->
- Reason
- end,
- format("** Generic server ~p terminating \n"
- "** Last message in was ~p~n"
- "** When Server state == ~p~n"
- "** Reason for termination == ~n** ~p~n",
- [Name, Msg, State, Reason1]),
+error_info(Reason, RootCause, Name, Msg, State, Debug) ->
+ Reason1 = error_reason(Reason),
+ Fmt =
+ "** Generic server ~p terminating~n"
+ "** Last message in was ~p~n"
+ "** When Server state == ~p~n"
+ "** Reason for termination == ~n** ~p~n",
+ case RootCause of
+ undefined -> format(Fmt, [Name, Msg, State, Reason1]);
+ _ -> format(Fmt ++ "** In 'terminate' callback "
+ "with reason ==~n** ~p~n",
+ [Name, Msg, State, Reason1,
+ error_reason(RootCause)])
+ end,
sys:print_log(Debug),
ok.
+error_reason({undef,[{M,F,A}|MFAs]} = Reason) ->
+ case code:is_loaded(M) of
+ false -> {'module could not be loaded',[{M,F,A}|MFAs]};
+ _ -> case erlang:function_exported(M, F, length(A)) of
+ true -> Reason;
+ false -> {'function not exported',[{M,F,A}|MFAs]}
+ end
+ end;
+error_reason(Reason) ->
+ Reason.
+
%%% ---------------------------------------------------
%%% Misc. functions.
%%% ---------------------------------------------------
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 41c628a0..8c36a9f0 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -83,12 +83,6 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
--rabbit_boot_step({rabbit_hooks,
- [{description, "internal event notification system"},
- {mfa, {rabbit_hooks, start, []}},
- {requires, external_infrastructure},
- {enables, kernel_ready}]}).
-
-rabbit_boot_step({rabbit_event,
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -211,8 +205,7 @@
%%----------------------------------------------------------------------------
prepare() ->
- ok = ensure_working_log_handlers(),
- ok = rabbit_mnesia:ensure_mnesia_dir().
+ ok = ensure_working_log_handlers().
start() ->
try
@@ -496,11 +489,16 @@ maybe_insert_default_data() ->
insert_default_data() ->
{ok, DefaultUser} = application:get_env(default_user),
{ok, DefaultPass} = application:get_env(default_pass),
+ {ok, DefaultAdmin} = application:get_env(default_user_is_admin),
{ok, DefaultVHost} = application:get_env(default_vhost),
{ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
application:get_env(default_permissions),
ok = rabbit_access_control:add_vhost(DefaultVHost),
ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
+ case DefaultAdmin of
+ true -> rabbit_access_control:set_admin(DefaultUser);
+ _ -> ok
+ end,
ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost,
DefaultConfigurePerm,
DefaultWritePerm,
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 8d00f591..73fd6f0e 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -35,11 +35,12 @@
-export([check_login/2, user_pass_login/2,
check_vhost_access/2, check_resource_access/3]).
--export([add_user/2, delete_user/1, change_password/2, list_users/0,
- lookup_user/1]).
--export([add_vhost/1, delete_vhost/1, list_vhosts/0]).
+-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
+ clear_admin/1, list_users/0, lookup_user/1]).
+-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
-export([set_permissions/5, set_permissions/6, clear_permissions/2,
- list_vhost_permissions/1, list_user_permissions/1]).
+ list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
+ list_user_vhost_permissions/2]).
%%----------------------------------------------------------------------------
@@ -52,6 +53,7 @@
-type(password() :: binary()).
-type(regexp() :: binary()).
-type(scope() :: binary()).
+-type(scope_atom() :: 'client' | 'all').
-spec(check_login/2 ::
(binary(), binary()) -> rabbit_types:user() |
@@ -68,26 +70,33 @@
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
+-spec(set_admin/1 :: (username()) -> 'ok').
+-spec(clear_admin/1 :: (username()) -> 'ok').
-spec(list_users/0 :: () -> [username()]).
-spec(lookup_user/1 ::
(username()) -> rabbit_types:ok(rabbit_types:user())
| rabbit_types:error('not_found')).
--spec(add_vhost/1 ::
- (rabbit_types:vhost()) -> 'ok').
--spec(delete_vhost/1 ::
- (rabbit_types:vhost()) -> 'ok').
+-spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()).
-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
regexp(), regexp()) -> 'ok').
-spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(),
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
+-spec(list_permissions/0 ::
+ () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(),
+ scope_atom()}]).
-spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost())
- -> [{username(), regexp(), regexp(), regexp()}]).
+ (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(),
+ scope_atom()}]).
-spec(list_user_permissions/1 ::
- (username())
- -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
+ (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(),
+ scope_atom()}]).
+-spec(list_user_vhost_permissions/2 ::
+ (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(),
+ scope_atom()}]).
-endif.
@@ -142,7 +151,7 @@ internal_lookup_vhost_access(Username, VHostPath) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:read({rabbit_user_permission,
- #user_vhost{username = Username,
+ #user_vhost{username = Username,
virtual_host = VHostPath}}) of
[] -> not_found;
[R] -> {ok, R}
@@ -160,7 +169,6 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
-permission_index(scope) -> #permission.scope;
permission_index(configure) -> #permission.configure;
permission_index(write) -> #permission.write;
permission_index(read) -> #permission.read.
@@ -171,27 +179,29 @@ check_resource_access(Username,
check_resource_access(Username,
R#resource{name = <<"amq.default">>},
Permission);
-check_resource_access(_Username,
- #resource{name = <<"amq.gen",_/binary>>},
- #permission{scope = client}) ->
- ok;
check_resource_access(Username,
R = #resource{virtual_host = VHostPath, name = Name},
Permission) ->
Res = case mnesia:dirty_read({rabbit_user_permission,
- #user_vhost{username = Username,
+ #user_vhost{username = Username,
virtual_host = VHostPath}}) of
[] ->
false;
[#user_permission{permission = P}] ->
- PermRegexp = case element(permission_index(Permission), P) of
- %% <<"^$">> breaks Emacs' erlang mode
- <<"">> -> <<$^, $$>>;
- RE -> RE
- end,
- case re:run(Name, PermRegexp, [{capture, none}]) of
- match -> true;
- nomatch -> false
+ case {Name, P} of
+ {<<"amq.gen",_/binary>>, #permission{scope = client}} ->
+ true;
+ _ ->
+ PermRegexp =
+ case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
+ case re:run(Name, PermRegexp, [{capture, none}]) of
+ match -> true;
+ nomatch -> false
+ end
end
end,
if Res -> ok;
@@ -207,7 +217,8 @@ add_user(Username, Password) ->
[] ->
ok = mnesia:write(rabbit_user,
#user{username = Username,
- password = Password},
+ password = Password,
+ is_admin = false},
write);
_ ->
mnesia:abort({user_already_exists, Username})
@@ -237,20 +248,39 @@ delete_user(Username) ->
R.
change_password(Username, Password) ->
- R = rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- ok = mnesia:write(rabbit_user,
- #user{username = Username,
- password = Password},
- write)
- end)),
+ R = update_user(Username, fun(User) ->
+ User#user{password = Password}
+ end),
rabbit_log:info("Changed password for user ~p~n", [Username]),
R.
+set_admin(Username) ->
+ set_admin(Username, true).
+
+clear_admin(Username) ->
+ set_admin(Username, false).
+
+set_admin(Username, IsAdmin) ->
+ R = update_user(Username, fun(User) ->
+ User#user{is_admin = IsAdmin}
+ end),
+ rabbit_log:info("Set user admin flag for user ~p to ~p~n",
+ [Username, IsAdmin]),
+ R.
+
+update_user(Username, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ {ok, User} = lookup_user(Username),
+ ok = mnesia:write(rabbit_user, Fun(User), write)
+ end)).
+
list_users() ->
- mnesia:dirty_all_keys(rabbit_user).
+ [{Username, IsAdmin} ||
+ #user{username = Username, is_admin = IsAdmin} <-
+ mnesia:dirty_match_object(rabbit_user, #user{_ = '_'})].
lookup_user(Username) ->
rabbit_misc:dirty_read({rabbit_user, Username}).
@@ -300,7 +330,7 @@ delete_vhost(VHostPath) ->
R.
internal_delete_vhost(VHostPath) ->
- lists:foreach(fun (#exchange{name=Name}) ->
+ lists:foreach(fun (#exchange{name = Name}) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list(VHostPath)),
@@ -311,6 +341,9 @@ internal_delete_vhost(VHostPath) ->
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
+vhost_exists(VHostPath) ->
+ mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].
+
list_vhosts() ->
mnesia:dirty_all_keys(rabbit_vhost).
@@ -338,13 +371,13 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer
fun () -> ok = mnesia:write(
rabbit_user_permission,
#user_permission{user_vhost = #user_vhost{
- username = Username,
+ username = Username,
virtual_host = VHostPath},
permission = #permission{
- scope = Scope,
+ scope = Scope,
configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}},
+ write = WritePerm,
+ read = ReadPerm}},
write)
end)).
@@ -355,10 +388,15 @@ clear_permissions(Username, VHostPath) ->
Username, VHostPath,
fun () ->
ok = mnesia:delete({rabbit_user_permission,
- #user_vhost{username = Username,
+ #user_vhost{username = Username,
virtual_host = VHostPath}})
end)).
+list_permissions() ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ list_permissions(match_user_vhost('_', '_'))].
+
list_vhost_permissions(VHostPath) ->
[{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
{Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
@@ -371,15 +409,21 @@ list_user_permissions(Username) ->
list_permissions(rabbit_misc:with_user(
Username, match_user_vhost(Username, '_')))].
+list_user_vhost_permissions(Username, VHostPath) ->
+ [{ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ list_permissions(rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ match_user_vhost(Username, VHostPath)))].
+
list_permissions(QueryThunk) ->
[{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
- #user_permission{user_vhost = #user_vhost{username = Username,
+ #user_permission{user_vhost = #user_vhost{username = Username,
virtual_host = VHostPath},
- permission = #permission{
- scope = Scope,
- configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}} <-
+ permission = #permission{ scope = Scope,
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}} <-
%% TODO: use dirty ops instead
rabbit_misc:execute_mnesia_transaction(QueryThunk)].
@@ -387,7 +431,7 @@ match_user_vhost(Username, VHostPath) ->
fun () -> mnesia:match_object(
rabbit_user_permission,
#user_permission{user_vhost = #user_vhost{
- username = Username,
+ username = Username,
virtual_host = VHostPath},
permission = '_'},
read)
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index abcb87d7..3e677c38 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -56,7 +56,7 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
--define(EXPIRES_TYPE, long).
+-define(EXPIRES_TYPES, [byte, short, signedint, long]).
%%----------------------------------------------------------------------------
@@ -249,11 +249,12 @@ start_queue_process(Q) ->
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
- Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
+ ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [],
- fun (_X, _Q) -> ok end),
- ok.
+ rabbit_binding:add(#binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = []}).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -313,13 +314,13 @@ check_declare_arguments(QueueName, Args) ->
check_expires_argument(undefined) ->
ok;
-check_expires_argument({?EXPIRES_TYPE, Expires})
- when is_integer(Expires) andalso Expires > 0 ->
- ok;
-check_expires_argument({?EXPIRES_TYPE, _Expires}) ->
- {error, expires_zero_or_less};
-check_expires_argument(_) ->
- {error, expires_not_of_type_long}.
+check_expires_argument({Type, Expires}) when Expires > 0 ->
+ case lists:member(Type, ?EXPIRES_TYPES) of
+ true -> ok;
+ false -> {error, {expires_not_of_acceptable_type, Type, Expires}}
+ end;
+check_expires_argument({_Type, _Expires}) ->
+ {error, expires_zero_or_less}.
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -433,7 +434,7 @@ internal_delete1(QueueName) ->
%% we want to execute some things, as
%% decided by rabbit_exchange, after the
%% transaction.
- rabbit_exchange:delete_queue_bindings(QueueName).
+ rabbit_binding:remove_for_queue(QueueName).
internal_delete(QueueName) ->
case
@@ -477,7 +478,7 @@ on_node_down(Node) ->
ok.
delete_queue(QueueName) ->
- Post = rabbit_exchange:delete_transient_queue_bindings(QueueName),
+ Post = rabbit_binding:remove_transient_for_queue(QueueName),
ok = mnesia:delete({rabbit_queue, QueueName}),
Post.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 97a264d7..7b6f9897 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -147,8 +147,8 @@ code_change(_OldVsn, State, _Extra) ->
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
- {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
- undefined -> State
+ {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
+ undefined -> State
end.
declare(Recover, From,
@@ -164,10 +164,8 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
- rabbit_event:notify(
- queue_created,
- [{Item, i(Item, State)} ||
- Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(queue_created,
+ infos(?CREATION_EVENT_KEYS, State)),
noreply(init_expires(State#q{backing_queue_state = BQS}));
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -588,8 +586,7 @@ i(Item, _) ->
throw({bad_argument, Item}).
emit_stats(State) ->
- rabbit_event:notify(queue_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+ rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)).
%---------------------------------------------------------------------------
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
new file mode 100644
index 00000000..bb29580f
--- /dev/null
+++ b/src/rabbit_binding.erl
@@ -0,0 +1,378 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_binding).
+-include("rabbit.hrl").
+
+-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
+-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]).
+-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+%% these must all be run inside a mnesia tx
+-export([has_for_exchange/1, remove_for_exchange/1,
+ remove_for_queue/1, remove_transient_for_queue/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([key/0]).
+
+-type(key() :: binary()).
+
+-type(bind_errors() :: rabbit_types:error('queue_not_found' |
+ 'exchange_not_found' |
+ 'exchange_and_queue_not_found')).
+-type(bind_res() :: 'ok' | bind_errors()).
+-type(inner_fun() ::
+ fun((rabbit_types:exchange(), queue()) ->
+ rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
+-type(bindings() :: [rabbit_types:binding()]).
+
+-spec(recover/0 :: () -> [rabbit_types:binding()]).
+-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
+-spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
+-spec(remove/1 :: (rabbit_types:binding()) ->
+ bind_res() | rabbit_types:error('binding_not_found')).
+-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()).
+-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) ->
+ bind_res() | rabbit_types:error('binding_not_found')).
+-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
+-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
+-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()).
+-spec(list_for_exchange_and_queue/2 ::
+ (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]).
+-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) ->
+ [rabbit_types:info()]).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
+-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
+ -> [[rabbit_types:info()]]).
+-spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()).
+-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
+-spec(remove_for_queue/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
+-spec(remove_transient_for_queue/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]).
+
+recover() ->
+ rabbit_misc:table_fold(
+ fun (Route = #route{binding = B}, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route, Route, write),
+ ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write),
+ [B | Acc]
+ end, [], rabbit_durable_route).
+
+exists(Binding) ->
+ binding_action(
+ Binding,
+ fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end).
+
+add(Binding) -> add(Binding, fun (_X, _Q) -> ok end).
+
+remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end).
+
+add(Binding, InnerFun) ->
+ case binding_action(
+ Binding,
+ fun (X, Q, B) ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(X, Q) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] -> Durable = (X#exchange.durable andalso
+ Q#amqqueue.durable),
+ ok = sync_binding(
+ B, Durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_] -> {existing, X, B}
+ end;
+ {error, _} = E ->
+ E
+ end
+ end) of
+ {new, Exchange = #exchange{ type = Type }, B} ->
+ ok = (type_to_module(Type)):add_binding(Exchange, B),
+ rabbit_event:notify(binding_created, info(B));
+ {existing, _, _} ->
+ ok;
+ {error, _} = Err ->
+ Err
+ end.
+
+remove(Binding, InnerFun) ->
+ case binding_action(
+ Binding,
+ fun (X, Q, B) ->
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] -> {error, binding_not_found};
+ [_] -> case InnerFun(X, Q) of
+ ok ->
+ Durable = (X#exchange.durable andalso
+ Q#amqqueue.durable),
+ ok = sync_binding(
+ B, Durable,
+ fun mnesia:delete_object/3),
+ Deleted =
+ rabbit_exchange:maybe_auto_delete(X),
+ {{Deleted, X}, B};
+ {error, _} = E ->
+ E
+ end
+ end
+ end) of
+ {error, _} = Err ->
+ Err;
+ {{IsDeleted, X = #exchange{ type = Type }}, B} ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> ok = Module:delete(X, [B]);
+ not_deleted -> ok = Module:remove_bindings(X, [B])
+ end,
+ rabbit_event:notify(binding_deleted, info(B)),
+ ok
+ end.
+
+list(VHostPath) ->
+ Route = #route{binding = #binding{
+ exchange_name = rabbit_misc:r(VHostPath, exchange),
+ queue_name = rabbit_misc:r(VHostPath, queue),
+ _ = '_'},
+ _ = '_'},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
+
+list_for_exchange(ExchangeName) ->
+ Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
+
+list_for_queue(QueueName) ->
+ Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
+ [reverse_binding(B) || #reverse_route{reverse_binding = B} <-
+ mnesia:dirty_match_object(rabbit_reverse_route,
+ reverse_route(Route))].
+
+list_for_exchange_and_queue(ExchangeName, QueueName) ->
+ Route = #route{binding = #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ _ = '_'}},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
+
+info_keys() -> ?INFO_KEYS.
+
+map(VHostPath, F) ->
+ %% TODO: there is scope for optimisation here, e.g. using a
+ %% cursor, parallelising the function invocation
+ lists:map(F, list(VHostPath)).
+
+infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
+
+i(exchange_name, #binding{exchange_name = XName}) -> XName;
+i(queue_name, #binding{queue_name = QName}) -> QName;
+i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
+i(arguments, #binding{args = Arguments}) -> Arguments;
+i(Item, _) -> throw({bad_argument, Item}).
+
+info(B = #binding{}) -> infos(?INFO_KEYS, B).
+
+info(B = #binding{}, Items) -> infos(Items, B).
+
+info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end).
+
+info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
+
+has_for_exchange(ExchangeName) ->
+ Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ %% we need to check for durable routes here too in case a bunch of
+ %% routes to durable queues have been removed temporarily as a
+ %% result of a node failure
+ contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
+
+remove_for_exchange(ExchangeName) ->
+ [begin
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ reverse_route(Route), write),
+ ok = delete_forward_routes(Route),
+ Route#route.binding
+ end || Route <- mnesia:match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
+ write)].
+
+remove_for_queue(QueueName) ->
+ remove_for_queue(QueueName, fun delete_forward_routes/1).
+
+remove_transient_for_queue(QueueName) ->
+ remove_for_queue(QueueName, fun delete_transient_forward_routes/1).
+
+%%----------------------------------------------------------------------------
+
+binding_action(Binding = #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ args = Arguments}, Fun) ->
+ call_with_exchange_and_queue(
+ ExchangeName, QueueName,
+ fun (X, Q) ->
+ SortedArgs = rabbit_misc:sort_field_table(Arguments),
+ Fun(X, Q, Binding#binding{args = SortedArgs})
+ end).
+
+sync_binding(Binding, Durable, Fun) ->
+ ok = case Durable of
+ true -> Fun(rabbit_durable_route,
+ #route{binding = Binding}, write);
+ false -> ok
+ end,
+ {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = Fun(rabbit_route, Route, write),
+ ok = Fun(rabbit_reverse_route, ReverseRoute, write),
+ ok.
+
+call_with_exchange_and_queue(Exchange, Queue, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
+ mnesia:read({rabbit_queue, Queue})} of
+ {[X], [Q]} -> Fun(X, Q);
+ {[ ], [_]} -> {error, exchange_not_found};
+ {[_], [ ]} -> {error, queue_not_found};
+ {[ ], [ ]} -> {error, exchange_and_queue_not_found}
+ end
+ end).
+
+%% Used with atoms from records; e.g., the type is expected to exist.
+type_to_module(T) ->
+ {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ Module.
+
+contains(Table, MatchHead) ->
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
+
+continue('$end_of_table') -> false;
+continue({[_|_], _}) -> true;
+continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+
+remove_for_queue(QueueName, FwdDeleteFun) ->
+ DeletedBindings =
+ [begin
+ Route = reverse_route(ReverseRoute),
+ ok = FwdDeleteFun(Route),
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ ReverseRoute, write),
+ Route#route.binding
+ end || ReverseRoute
+ <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(#route{binding = #binding{
+ queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ Grouped = group_bindings_and_auto_delete(
+ lists:keysort(#binding.exchange_name, DeletedBindings), []),
+ fun () ->
+ lists:foreach(
+ fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, Bs);
+ not_deleted -> Module:remove_bindings(X, Bs)
+ end
+ end, Grouped)
+ end.
+
+%% Requires that its input binding list is sorted in exchange-name
+%% order, so that the grouping of bindings (for passing to
+%% group_bindings_and_auto_delete1) works properly.
+group_bindings_and_auto_delete([], Acc) ->
+ Acc;
+group_bindings_and_auto_delete(
+ [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
+ group_bindings_and_auto_delete(ExchangeName, Bs, [B], Acc).
+
+group_bindings_and_auto_delete(
+ ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
+ Bindings, Acc) ->
+ group_bindings_and_auto_delete(ExchangeName, Bs, [B | Bindings], Acc);
+group_bindings_and_auto_delete(ExchangeName, Removed, Bindings, Acc) ->
+ %% either Removed is [], or its head has a non-matching ExchangeName
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc],
+ group_bindings_and_auto_delete(Removed, NewAcc).
+
+delete_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_durable_route, Route, write).
+
+delete_transient_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write).
+
+route_with_reverse(#route{binding = Binding}) ->
+ route_with_reverse(Binding);
+route_with_reverse(Binding = #binding{}) ->
+ Route = #route{binding = Binding},
+ {Route, reverse_route(Route)}.
+
+reverse_route(#route{binding = Binding}) ->
+ #reverse_route{reverse_binding = reverse_binding(Binding)};
+
+reverse_route(#reverse_route{reverse_binding = Binding}) ->
+ #route{binding = reverse_binding(Binding)}.
+
+reverse_binding(#reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7bff5a0c..f19f98d2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/6, do/2, do/3, shutdown/1]).
+-export([start_link/7, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1, flush/1]).
@@ -45,7 +45,7 @@
prioritise_cast/2]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
- transaction_id, tx_participants, next_tag,
+ start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer}).
@@ -77,9 +77,11 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/6 ::
+-spec(start_link/7 ::
(channel_number(), pid(), pid(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) -> rabbit_types:ok_pid_or_error()).
+ rabbit_types:vhost(), pid(),
+ fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
+ rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -101,9 +103,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost, CollectorPid], []).
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+ StartLimiterFun) ->
+ gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username,
+ VHost, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -151,15 +154,16 @@ flush(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
+ StartLimiterFun]) ->
process_flag(trap_exit, true),
- link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
limiter_pid = undefined,
+ start_limiter_fun = StartLimiterFun,
transaction_id = none,
tx_participants = sets:new(),
next_tag = 1,
@@ -172,9 +176,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
blocking = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = rabbit_event:init_stats_timer()},
- rabbit_event:notify(
- channel_created,
- [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -253,12 +255,6 @@ handle_cast(emit_stats, State) ->
internal_emit_stats(State),
{noreply, State}.
-handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
- State = #ch{writer_pid = WriterPid}) ->
- State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
- {stop, normal, State};
-handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State)}.
@@ -273,8 +269,10 @@ terminate(_Reason, State = #ch{state = terminating}) ->
terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
- normal -> ok = Res;
- _ -> ok
+ normal -> ok = Res;
+ shutdown -> ok = Res;
+ {shutdown, _Term} -> ok = Res;
+ _ -> ok
end,
terminate(State).
@@ -417,7 +415,7 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rollback_and_notify(State),
- ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
+ ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
stop;
handle_method(#'access.request'{},_, State) ->
@@ -821,17 +819,17 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
- QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
- NoWait, State);
+ binding_action(fun rabbit_binding:add/2,
+ ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ #'queue.bind_ok'{}, NoWait, State);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
- QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
- false, State);
+ binding_action(fun rabbit_binding:remove/2,
+ ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ #'queue.unbind_ok'{}, false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
@@ -909,7 +907,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
+ case Fun(#binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = ActualRoutingKey,
+ args = Arguments},
fun (_X, Q) ->
try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
catch exit:Reason -> {error, Reason}
@@ -1030,8 +1031,8 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
- {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) ->
+ {ok, LPid} = SLF(queue:len(UAMQ)),
ok = limit_queues(LPid, State),
LPid.
@@ -1103,11 +1104,9 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
-terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
+terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
- rabbit_event:notify(channel_closed, [{pid, self()}]),
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid).
+ rabbit_event:notify(channel_closed, [{pid, self()}]).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1164,7 +1163,7 @@ update_measures(Type, QX, Inc, Measure) ->
orddict:store(Measure, Cur + Inc, Measures)).
internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
- CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS],
+ CoarseStats = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(StatsTimer) of
coarse ->
rabbit_event:notify(channel_stats, CoarseStats);
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
new file mode 100644
index 00000000..02199a65
--- /dev/null
+++ b/src/rabbit_channel_sup.erl
@@ -0,0 +1,96 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_channel_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/1]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([start_link_args/0]).
+
+-type(start_link_args() ::
+ {rabbit_types:protocol(), rabbit_net:socket(),
+ rabbit_channel:channel_number(), non_neg_integer(), pid(),
+ rabbit_access_control:username(), rabbit_types:vhost(), pid()}).
+
+-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
+ Collector}) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, WriterPid} =
+ supervisor2:start_child(
+ SupPid,
+ {writer, {rabbit_writer, start_link,
+ [Sock, Channel, FrameMax, Protocol, ReaderPid]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ReaderPid, WriterPid, Username, VHost,
+ Collector, start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, FramingChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {framing_channel, {rabbit_framing_channel, start_link,
+ [ReaderPid, ChannelPid, Protocol]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
+ {ok, SupPid, FramingChannelPid}.
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
+
+start_limiter_fun(SupPid) ->
+ fun (UnackedCount) ->
+ Me = self(),
+ {ok, _Pid} =
+ supervisor2:start_child(
+ SupPid,
+ {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]},
+ transient, ?MAX_WAIT, worker, [rabbit_limiter]})
+ end.
diff --git a/src/rabbit_tracer.erl b/src/rabbit_channel_sup_sup.erl
index 484249b1..d1938805 100644
--- a/src/rabbit_tracer.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -29,22 +29,37 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_tracer).
--export([start/0]).
+-module(rabbit_channel_sup_sup).
--import(erlang).
+-behaviour(supervisor2).
-start() ->
- spawn(fun mainloop/0),
- ok.
+-export([start_link/0, start_channel/2]).
-mainloop() ->
- erlang:trace(new, true, [all]),
- mainloop1().
+-export([init/1]).
-mainloop1() ->
- receive
- Msg ->
- rabbit_log:info("TRACE: ~p~n", [Msg])
- end,
- mainloop1().
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
+ {'ok', pid(), pid()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ supervisor2:start_link(?MODULE, []).
+
+start_channel(Pid, Args) ->
+ {ok, ChSupPid, _} = Result = supervisor2:start_child(Pid, [Args]),
+ link(ChSupPid),
+ Result.
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{channel_sup, {rabbit_channel_sup, start_link, []},
+ temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
new file mode 100644
index 00000000..b3821d3b
--- /dev/null
+++ b/src/rabbit_connection_sup.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_connection_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0, reader/1]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid(), pid()}).
+-spec(reader/1 :: (pid()) -> pid()).
+
+-endif.
+
+%%--------------------------------------------------------------------------
+
+start_link() ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ {ok, Collector} =
+ supervisor2:start_child(
+ SupPid,
+ {collector, {rabbit_queue_collector, start_link, []},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+ {ok, ReaderPid} =
+ supervisor2:start_child(
+ SupPid,
+ {reader, {rabbit_reader, start_link,
+ [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
+ {ok, SupPid, ReaderPid}.
+
+reader(Pid) ->
+ hd(supervisor2:find_child(Pid, reader)).
+
+%%--------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
+
+start_heartbeat_fun(SupPid) ->
+ fun (_Sock, 0) ->
+ none;
+ (Sock, TimeoutSec) ->
+ Parent = self(),
+ {ok, Sender} =
+ supervisor2:start_child(
+ SupPid, {heartbeat_sender,
+ {rabbit_heartbeat, start_heartbeat_sender,
+ [Parent, Sock, TimeoutSec]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {ok, Receiver} =
+ supervisor2:start_child(
+ SupPid, {heartbeat_receiver,
+ {rabbit_heartbeat, start_heartbeat_receiver,
+ [Parent, Sock, TimeoutSec]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {Sender, Receiver}
+ end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index f0b623c2..a3b6f369 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -209,6 +209,14 @@ action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
call(Node, {rabbit_access_control, change_password, Args});
+action(set_admin, Node, [Username], _Opts, Inform) ->
+ Inform("Setting administrative status for user ~p", [Username]),
+ call(Node, {rabbit_access_control, set_admin, [Username]});
+
+action(clear_admin, Node, [Username], _Opts, Inform) ->
+ Inform("Clearing administrative status for user ~p", [Username]),
+ call(Node, {rabbit_access_control, clear_admin, [Username]});
+
action(list_users, Node, [], _Opts, Inform) ->
Inform("Listing users", []),
display_list(call(Node, {rabbit_access_control, list_users, []}));
@@ -246,14 +254,14 @@ action(list_exchanges, Node, Args, Opts, Inform) ->
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_bindings, Node, _Args, Opts, Inform) ->
+action(list_bindings, Node, Args, Opts, Inform) ->
Inform("Listing bindings", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- InfoKeys = [exchange_name, queue_name, routing_key, args],
- display_info_list(
- [lists:zip(InfoKeys, tuple_to_list(X)) ||
- X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
- InfoKeys);
+ ArgAtoms = default_if_empty(Args, [exchange_name, queue_name,
+ routing_key, arguments]),
+ display_info_list(rpc_call(Node, rabbit_binding, info_all,
+ [VHostArg, ArgAtoms]),
+ ArgAtoms);
action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
@@ -304,9 +312,11 @@ default_if_empty(List, Default) when is_list(List) ->
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
- lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) ||
- X <- InfoItemKeys])
- end, Results),
+ lists:foreach(
+ fun (Result) -> display_row(
+ [format_info_item(proplists:get_value(X, Result)) ||
+ X <- InfoItemKeys])
+ end, Results),
ok;
display_info_list(Other, _) ->
Other.
@@ -315,25 +325,30 @@ display_row(Row) ->
io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
io:nl().
-format_info_item(Key, Items) ->
- case proplists:get_value(Key, Items) of
- #resource{name = Name} ->
- escape(Name);
- Value when Key =:= address; Key =:= peer_address andalso
- is_tuple(Value) ->
- inet_parse:ntoa(Value);
- Value when is_pid(Value) ->
- rabbit_misc:pid_to_string(Value);
- Value when is_binary(Value) ->
- escape(Value);
- Value when is_atom(Value) ->
- escape(atom_to_list(Value));
- Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _]
- when is_binary(TableEntryKey) andalso is_atom(TableEntryType) ->
- io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
- Value ->
- io_lib:format("~w", [Value])
- end.
+-define(IS_U8(X), (X >= 0 andalso X =< 255)).
+-define(IS_U16(X), (X >= 0 andalso X =< 65535)).
+
+format_info_item(#resource{name = Name}) ->
+ escape(Name);
+format_info_item({N1, N2, N3, N4} = Value) when
+ ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) ->
+ inet_parse:ntoa(Value);
+format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when
+ ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4),
+ ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) ->
+ inet_parse:ntoa(Value);
+format_info_item(Value) when is_pid(Value) ->
+ rabbit_misc:pid_to_string(Value);
+format_info_item(Value) when is_binary(Value) ->
+ escape(Value);
+format_info_item(Value) when is_atom(Value) ->
+ escape(atom_to_list(Value));
+format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
+ Value) when is_binary(TableEntryKey) andalso
+ is_atom(TableEntryType) ->
+ io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item(Value) ->
+ io_lib:format("~w", [Value]).
display_list(L) when is_list(L) ->
lists:foreach(fun (I) when is_binary(I) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index af4eb1bd..40bee25f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -34,38 +34,19 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
- info/1, info/2, info_all/1, info_all/2, publish/2]).
--export([add_binding/5, delete_binding/5, list_bindings/1]).
--export([delete/2]).
--export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([assert_equivalence/5]).
--export([assert_args_equivalence/2]).
--export([check_type/1]).
-
-%% EXTENDED API
--export([list_exchange_bindings/1]).
--export([list_queue_bindings/1]).
-
--import(mnesia).
--import(sets).
--import(lists).
+ info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
+%% this must be run inside a mnesia tx
+-export([maybe_auto_delete/1]).
+-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([name/0, type/0, binding_key/0]).
+-export_type([name/0, type/0]).
-type(name() :: rabbit_types:r('exchange')).
-type(type() :: atom()).
--type(binding_key() :: binary()).
-
--type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
- 'exchange_not_found' |
- 'exchange_and_queue_not_found')).
--type(inner_fun() ::
- fun((rabbit_types:exchange(), queue()) ->
- rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 ::
@@ -97,32 +78,12 @@
-> [[rabbit_types:info()]]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> {rabbit_router:routing_result(), [pid()]}).
--spec(add_binding/5 ::
- (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun()) -> bind_res()).
--spec(delete_binding/5 ::
- (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun())
- -> bind_res() | rabbit_types:error('binding_not_found')).
--spec(list_bindings/1 ::
- (rabbit_types:vhost())
- -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(delete_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
--spec(delete_transient_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
--spec(list_queue_bindings/1 ::
- (rabbit_amqqueue:name())
- -> [{name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(list_exchange_bindings/1 ::
- (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
+-spec(maybe_auto_delete/1:: (rabbit_types:exchange()) ->
+ 'not_deleted' | 'auto_deleted').
-endif.
@@ -136,19 +97,7 @@ recover() ->
ok = mnesia:write(rabbit_exchange, Exchange, write),
[Exchange | Acc]
end, [], rabbit_durable_exchange),
- Bs = rabbit_misc:table_fold(
- fun (Route = #route{binding = B}, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route,
- Route, write),
- ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write),
- [B | Acc]
- end, [], rabbit_durable_route),
- recover_with_bindings(Bs, Exs),
- ok.
-
-recover_with_bindings(Bs, Exs) ->
+ Bs = rabbit_binding:recover(),
recover_with_bindings(
lists:keysort(#binding.exchange_name, Bs),
lists:keysort(#exchange.name, Exs), []).
@@ -164,11 +113,11 @@ recover_with_bindings([], [], []) ->
ok.
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
- Exchange = #exchange{name = ExchangeName,
- type = Type,
- durable = Durable,
+ Exchange = #exchange{name = ExchangeName,
+ type = Type,
+ durable = Durable,
auto_delete = AutoDelete,
- arguments = Args},
+ arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
%% value.
@@ -192,9 +141,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
end
end) of
{new, X} -> TypeModule:create(X),
- rabbit_event:notify(
- exchange_created,
- [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]),
+ rabbit_event:notify(exchange_created, info(X)),
X;
{existing, X} -> X;
Err -> Err
@@ -220,9 +167,9 @@ check_type(TypeBin) ->
end
end.
-assert_equivalence(X = #exchange{ durable = Durable,
+assert_equivalence(X = #exchange{ durable = Durable,
auto_delete = AutoDelete,
- type = Type},
+ type = Type},
Type, Durable, AutoDelete, RequiredArgs) ->
(type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
@@ -232,8 +179,7 @@ assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
"cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
-assert_args_equivalence(#exchange{ name = Name,
- arguments = Args },
+assert_args_equivalence(#exchange{ name = Name, arguments = Args },
RequiredArgs) ->
%% The spec says "Arguments are compared for semantic
%% equivalence". The only arg we care about is
@@ -311,92 +257,6 @@ publish(X = #exchange{type = Type}, Seen, Delivery) ->
R
end.
-%% TODO: Should all of the route and binding management not be
-%% refactored to its own module, especially seeing as unbind will have
-%% to be implemented for 0.91 ?
-
-delete_exchange_bindings(ExchangeName) ->
- [begin
- ok = mnesia:delete_object(rabbit_reverse_route,
- reverse_route(Route), write),
- ok = delete_forward_routes(Route),
- Route#route.binding
- end || Route <- mnesia:match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- write)].
-
-delete_queue_bindings(QueueName) ->
- delete_queue_bindings(QueueName, fun delete_forward_routes/1).
-
-delete_transient_queue_bindings(QueueName) ->
- delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
-
-delete_queue_bindings(QueueName, FwdDeleteFun) ->
- DeletedBindings =
- [begin
- Route = reverse_route(ReverseRoute),
- ok = FwdDeleteFun(Route),
- ok = mnesia:delete_object(rabbit_reverse_route,
- ReverseRoute, write),
- Route#route.binding
- end || ReverseRoute
- <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(#route{binding = #binding{
- queue_name = QueueName,
- _ = '_'}}),
- write)],
- Cleanup = cleanup_deleted_queue_bindings(
- lists:keysort(#binding.exchange_name, DeletedBindings), []),
- fun () ->
- lists:foreach(
- fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, Bs);
- not_deleted -> Module:remove_bindings(X, Bs)
- end
- end, Cleanup)
- end.
-
-%% Requires that its input binding list is sorted in exchange-name
-%% order, so that the grouping of bindings (for passing to
-%% cleanup_deleted_queue_bindings1) works properly.
-cleanup_deleted_queue_bindings([], Acc) ->
- Acc;
-cleanup_deleted_queue_bindings(
- [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
- cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc).
-
-cleanup_deleted_queue_bindings(
- ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
- Bindings, Acc) ->
- cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc);
-cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) ->
- %% either Deleted is [], or its head has a non-matching ExchangeName
- NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc],
- cleanup_deleted_queue_bindings(Deleted, NewAcc).
-
-cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- {maybe_auto_delete(X), Bindings}.
-
-delete_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write),
- ok = mnesia:delete_object(rabbit_durable_route, Route, write).
-
-delete_transient_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write).
-
-contains(Table, MatchHead) ->
- continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
-
-continue('$end_of_table') -> false;
-continue({[_|_], _}) -> true;
-continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, Exchange}) of
@@ -405,156 +265,6 @@ call_with_exchange(Exchange, Fun) ->
end
end).
-call_with_exchange_and_queue(Exchange, Queue, Fun) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
- mnesia:read({rabbit_queue, Queue})} of
- {[X], [Q]} -> Fun(X, Q);
- {[ ], [_]} -> {error, exchange_not_found};
- {[_], [ ]} -> {error, queue_not_found};
- {[ ], [ ]} -> {error, exchange_and_queue_not_found}
- end
- end).
-
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
- case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- case InnerFun(X, Q) of
- ok ->
- case mnesia:read({rabbit_route, B}) of
- [] ->
- ok = sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:write/3),
- rabbit_event:notify(
- binding_created,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName},
- {routing_key, RoutingKey},
- {arguments, Arguments}]),
- {new, X, B};
- [_R] ->
- {existing, X, B}
- end;
- {error, _} = E ->
- E
- end
- end) of
- {new, Exchange = #exchange{ type = Type }, Binding} ->
- (type_to_module(Type)):add_binding(Exchange, Binding);
- {existing, _, _} ->
- ok;
- {error, _} = Err ->
- Err
- end.
-
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
- case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] ->
- {error, binding_not_found};
- _ ->
- case InnerFun(X, Q) of
- ok ->
- ok =
- sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- rabbit_event:notify(
- binding_deleted,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName}]),
- {maybe_auto_delete(X), B};
- {error, _} = E ->
- E
- end
- end
- end) of
- {error, _} = Err ->
- Err;
- {{IsDeleted, X = #exchange{ type = Type }}, B} ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, [B]);
- not_deleted -> Module:remove_bindings(X, [B])
- end
- end.
-
-binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
- call_with_exchange_and_queue(
- ExchangeName, QueueName,
- fun (X, Q) ->
- Fun(X, Q, #binding{
- exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = rabbit_misc:sort_field_table(Arguments)})
- end).
-
-sync_binding(Binding, Durable, Fun) ->
- ok = case Durable of
- true -> Fun(rabbit_durable_route,
- #route{binding = Binding}, write);
- false -> ok
- end,
- {Route, ReverseRoute} = route_with_reverse(Binding),
- ok = Fun(rabbit_route, Route, write),
- ok = Fun(rabbit_reverse_route, ReverseRoute, write),
- ok.
-
-list_bindings(VHostPath) ->
- [{ExchangeName, QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- key = RoutingKey,
- queue_name = QueueName,
- args = Arguments}}
- <- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{
- exchange_name = rabbit_misc:r(VHostPath, exchange),
- _ = '_'},
- _ = '_'})].
-
-route_with_reverse(#route{binding = Binding}) ->
- route_with_reverse(Binding);
-route_with_reverse(Binding = #binding{}) ->
- Route = #route{binding = Binding},
- {Route, reverse_route(Route)}.
-
-reverse_route(#route{binding = Binding}) ->
- #reverse_route{reverse_binding = reverse_binding(Binding)};
-
-reverse_route(#reverse_route{reverse_binding = Binding}) ->
- #route{binding = reverse_binding(Binding)}.
-
-reverse_binding(#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
- #binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args};
-
-reverse_binding(#binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
- #reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}.
-
delete(ExchangeName, IfUnused) ->
Fun = case IfUnused of
true -> fun conditional_delete/1;
@@ -568,54 +278,23 @@ delete(ExchangeName, IfUnused) ->
Error
end.
-maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {not_deleted, Exchange};
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
+maybe_auto_delete(#exchange{auto_delete = false}) ->
+ not_deleted;
+maybe_auto_delete(#exchange{auto_delete = true} = Exchange) ->
case conditional_delete(Exchange) of
- {error, in_use} -> {not_deleted, Exchange};
- {deleted, Exchange, []} -> {auto_deleted, Exchange}
+ {error, in_use} -> not_deleted;
+ {deleted, Exchange, []} -> auto_deleted
end.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
- %% we need to check for durable routes here too in case a bunch of
- %% routes to durable queues have been removed temporarily as a
- %% result of a node failure
- case contains(rabbit_route, Match) orelse
- contains(rabbit_durable_route, Match) of
+ case rabbit_binding:has_for_exchange(ExchangeName) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Bindings = delete_exchange_bindings(ExchangeName),
+ Bindings = rabbit_binding:remove_for_exchange(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}),
rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]),
{deleted, Exchange, Bindings}.
-
-%%----------------------------------------------------------------------------
-%% EXTENDED API
-%% These are API calls that are not used by the server internally,
-%% they are exported for embedded clients to use
-
-%% This is currently used in mod_rabbit.erl (XMPP) and expects this to
-%% return {QueueName, RoutingKey, Arguments} tuples
-list_exchange_bindings(ExchangeName) ->
- Route = #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- [{QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{queue_name = QueueName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
-
-% Refactoring is left as an exercise for the reader
-list_queue_bindings(QueueName) ->
- Route = #route{binding = #binding{queue_name = QueueName,
- _ = '_'}},
- [{ExchangeName, RoutingKey, Arguments} ||
- #route{binding = #binding{exchange_name = ExchangeName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 44607398..0a59a175 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -79,8 +79,8 @@ parse_x_match(Other) ->
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort
-%% (rabbit_misc:sort_field_table) that route/3 and
-%% rabbit_exchange:{add,delete}_binding/4 do.
+%% (rabbit_misc:sort_field_table) that publish/1 and
+%% rabbit_binding:{add,remove}/2 do.
%%
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 553faaa8..cb53185f 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -39,16 +39,9 @@
%%--------------------------------------------------------------------
-start_link(StartFun, StartArgs, Protocol) ->
- Parent = self(),
- {ok, spawn_link(
- fun () ->
- %% we trap exits so that a normal termination of
- %% the channel or reader process terminates us too.
- process_flag(trap_exit, true),
- {ok, ChannelPid} = apply(StartFun, StartArgs),
- mainloop(Parent, ChannelPid, Protocol)
- end)}.
+start_link(Parent, ChannelPid, Protocol) ->
+ {ok, proc_lib:spawn_link(
+ fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -62,12 +55,6 @@ shutdown(Pid) ->
read_frame(ChannelPid) ->
receive
- %% converting the exit signal into one of our own ensures that
- %% the reader sees the right pid (i.e. ours) when a channel
- %% exits. Similarly in the other direction, though it is not
- %% really relevant there since the channel is not specifically
- %% watching out for reader exit signals.
- {'EXIT', _Pid, Reason} -> exit(Reason);
{frame, Frame} -> Frame;
terminate -> rabbit_channel:shutdown(ChannelPid),
read_frame(ChannelPid);
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index ab50c28c..a9945af1 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,16 +31,26 @@
-module(rabbit_heartbeat).
--export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]).
+-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
+ pause_monitor/1, resume_monitor/1]).
+
+-include("rabbit.hrl").
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([heartbeaters/0]).
+
-type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})).
--spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) ->
- heartbeaters()).
+-spec(start_heartbeat_sender/3 ::
+ (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/3 ::
+ (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -48,27 +58,26 @@
%%----------------------------------------------------------------------------
-start_heartbeat(_Sock, 0) ->
- none;
-start_heartbeat(Sock, TimeoutSec) ->
- Parent = self(),
+start_heartbeat_sender(_Parent, Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- Sender = heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () ->
- catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
- continue
- end}, Parent),
+ heartbeater(
+ {Sock, TimeoutSec * 1000 div 2, send_oct, 0,
+ fun () ->
+ catch rabbit_net:send(
+ Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ continue
+ end}).
+
+start_heartbeat_receiver(Parent, Sock, TimeoutSec) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- Receiver = heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
- fun () ->
- Parent ! timeout,
- stop
- end}, Parent),
- {Sender, Receiver}.
+ heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
+ Parent ! timeout,
+ stop
+ end}).
pause_monitor(none) ->
ok;
@@ -84,21 +93,15 @@ resume_monitor({_Sender, Receiver}) ->
%%----------------------------------------------------------------------------
-heartbeater(Params, Parent) ->
- spawn_link(fun () -> heartbeater(Params, erlang:monitor(process, Parent),
- {0, 0})
- end).
+heartbeater(Params) ->
+ {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
- MonitorRef, {StatVal, SameCount}) ->
- Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
+ {StatVal, SameCount}) ->
+ Recurse = fun (V) -> heartbeater(Params, V) end,
receive
- {'DOWN', MonitorRef, process, _Object, _Info} ->
- ok;
pause ->
receive
- {'DOWN', MonitorRef, process, _Object, _Info} ->
- ok;
resume ->
Recurse({0, 0});
Other ->
diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl
deleted file mode 100644
index 3fc84c1e..00000000
--- a/src/rabbit_hooks.erl
+++ /dev/null
@@ -1,73 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_hooks).
-
--export([start/0]).
--export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]).
-
--define(TableName, rabbit_hooks).
-
--ifdef(use_specs).
-
--spec(start/0 :: () -> 'ok').
--spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok').
--spec(unsubscribe/2 :: (atom(), atom()) -> 'ok').
--spec(trigger/2 :: (atom(), list()) -> 'ok').
--spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok').
-
--endif.
-
-start() ->
- ets:new(?TableName, [bag, public, named_table]),
- ok.
-
-subscribe(Hook, HandlerName, Handler) ->
- ets:insert(?TableName, {Hook, HandlerName, Handler}),
- ok.
-
-unsubscribe(Hook, HandlerName) ->
- ets:match_delete(?TableName, {Hook, HandlerName, '_'}),
- ok.
-
-trigger(Hook, Args) ->
- Hooks = ets:lookup(?TableName, Hook),
- [case catch apply(M, F, [Hook, Name, Args | A]) of
- {'EXIT', Reason} ->
- rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p",
- [Name, Hook, Reason]);
- _ -> ok
- end || {_, Name, {M, F, A}} <- Hooks],
- ok.
-
-notify_remote(Hook, HandlerName, Args, Pid, PidArgs) ->
- Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]},
- ok.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d3cc5f60..21bef7ed 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -35,7 +35,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
--export([start_link/2, shutdown/1]).
+-export([start_link/2]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1]).
@@ -47,7 +47,6 @@
-spec(start_link/2 :: (pid(), non_neg_integer()) ->
rabbit_types:ok_pid_or_error()).
--spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
@@ -77,17 +76,10 @@
start_link(ChPid, UnackedMsgCount) ->
gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []).
-shutdown(undefined) ->
- ok;
-shutdown(LimiterPid) ->
- true = unlink(LimiterPid),
- gen_server2:cast(LimiterPid, shutdown).
-
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- unlink_on_stopped(LimiterPid,
- gen_server2:call(LimiterPid, {limit, PrefetchCount})).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -125,8 +117,7 @@ block(LimiterPid) ->
unblock(undefined) ->
ok;
unblock(LimiterPid) ->
- unlink_on_stopped(LimiterPid,
- gen_server2:call(LimiterPid, unblock, infinity)).
+ gen_server2:call(LimiterPid, unblock, infinity).
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -168,9 +159,6 @@ handle_call(unblock, _From, State) ->
{stop, State1} -> {stop, normal, stopped, State1}
end.
-handle_cast(shutdown, State) ->
- {stop, normal, State};
-
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
@@ -250,9 +238,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
ok
end,
State#lim{queues = NewQueues}.
-
-unlink_on_stopped(LimiterPid, stopped) ->
- ok = rabbit_misc:unlink_and_capture_exit(LimiterPid),
- stopped;
-unlink_on_stopped(_LimiterPid, Result) ->
- Result.
diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl
deleted file mode 100644
index e0457b1e..00000000
--- a/src/rabbit_load.erl
+++ /dev/null
@@ -1,78 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_load).
-
--export([local_load/0, remote_loads/0, pick/0]).
-
--define(FUDGE_FACTOR, 0.98).
--define(TIMEOUT, 100).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(load() :: {{non_neg_integer(), integer() | 'unknown'}, node()}).
--spec(local_load/0 :: () -> load()).
--spec(remote_loads/0 :: () -> [load()]).
--spec(pick/0 :: () -> node()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-local_load() ->
- LoadAvg = case whereis(cpu_sup) of
- undefined -> unknown;
- _ -> case cpu_sup:avg1() of
- L when is_integer(L) -> L;
- {error, timeout} -> unknown
- end
- end,
- {{statistics(run_queue), LoadAvg}, node()}.
-
-remote_loads() ->
- {ResL, _BadNodes} =
- rpc:multicall(nodes(), ?MODULE, local_load, [], ?TIMEOUT),
- ResL.
-
-pick() ->
- RemoteLoads = remote_loads(),
- {{RunQ, LoadAvg}, Node} = local_load(),
- %% add bias towards current node; we rely on Erlang's term order
- %% of SomeFloat < local_unknown < unknown.
- AdjustedLoadAvg = case LoadAvg of
- unknown -> local_unknown;
- _ -> LoadAvg * ?FUDGE_FACTOR
- end,
- Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads],
- {_, SelectedNode} = lists:min(Loads),
- SelectedNode.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 5fa3f8ed..086d260e 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -39,7 +39,6 @@
-export([die/1, frame_error/2, amqp_error/4,
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
--export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
-export([table_lookup/2]).
-export([r/3, r/2, r_arg/4, rs/1]).
@@ -108,10 +107,6 @@
rabbit_framing:amqp_table(),
rabbit_types:r(any()), [binary()]) ->
'ok' | rabbit_types:connection_exit()).
--spec(get_config/1 ::
- (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')).
--spec(get_config/2 :: (atom(), A) -> A).
--spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 ::
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(table_lookup/2 ::
@@ -240,21 +235,6 @@ assert_args_equivalence1(Orig, New, Name, Key) ->
[Key, rabbit_misc:rs(Name), New1, Orig1])
end.
-get_config(Key) ->
- case dirty_read({rabbit_config, Key}) of
- {ok, {rabbit_config, Key, V}} -> {ok, V};
- Other -> Other
- end.
-
-get_config(Key, DefaultValue) ->
- case get_config(Key) of
- {ok, V} -> V;
- {error, not_found} -> DefaultValue
- end.
-
-set_config(Key, Value) ->
- ok = mnesia:dirty_write({rabbit_config, Key, Value}).
-
dirty_read(ReadSpec) ->
case mnesia:dirty_read(ReadSpec) of
[Result] -> {ok, Result};
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 4a5adfae..a3214888 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -169,10 +169,6 @@ table_definitions() ->
{attributes, record_info(fields, vhost)},
{disc_copies, [node()]},
{match, #vhost{_='_'}}]},
- {rabbit_config,
- [{attributes, [key, val]}, % same mnesia's default
- {disc_copies, [node()]},
- {match, {rabbit_config, '_', '_'}}]},
{rabbit_listener,
[{record_name, listener},
{attributes, record_info(fields, listener)},
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 27017ddc..bbecbfe2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/2, client_terminate/1,
+ sync/3, client_init/2, client_terminate/2,
client_delete_and_terminate/3, successfully_recovered_state/1]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -98,8 +98,7 @@
}).
-record(file_summary,
- {file, valid_total_size, contiguous_top, left, right, file_size,
- locked, readers}).
+ {file, valid_total_size, left, right, file_size, locked, readers}).
%%----------------------------------------------------------------------------
@@ -136,7 +135,7 @@
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(client_init/2 :: (server(), binary()) -> client_msstate()).
--spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
-spec(client_delete_and_terminate/3 ::
(client_msstate(), server(), binary()) -> 'ok').
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
@@ -159,8 +158,7 @@
%% {Guid, RefCount, File, Offset, TotalSize}
%% By default, it's in ets, but it's also pluggable.
%% FileSummary: this is an ets table which maps File to #file_summary{}:
-%% {File, ValidTotalSize, ContiguousTop, Left, Right,
-%% FileSize, Locked, Readers}
+%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
%%
%% The basic idea is that messages are appended to the current file up
%% until that file becomes too big (> file_size_limit). At that point,
@@ -176,9 +174,7 @@
%%
%% As messages are removed from files, holes appear in these
%% files. The field ValidTotalSize contains the total amount of useful
-%% data left in the file, whilst ContiguousTop contains the amount of
-%% valid data right at the start of each file. These are needed for
-%% garbage collection.
+%% data left in the file. This is needed for garbage collection.
%%
%% When we discover that a file is now empty, we delete it. When we
%% discover that it can be combined with the useful data in either its
@@ -224,9 +220,7 @@
%% above B (i.e. truncate to the limit of the good contiguous region
%% at the start of the file), then write C and D on top and then write
%% E, F and G from the right file on top. Thus contiguous blocks of
-%% good data at the bottom of files are not rewritten (yes, this is
-%% the data the size of which is tracked by the ContiguousTop
-%% variable. Judicious use of a mirror is required).
+%% good data at the bottom of files are not rewritten.
%%
%% +-------+ +-------+ +-------+
%% | X | | G | | G |
@@ -373,13 +367,13 @@ client_init(Server, Ref) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
-client_terminate(CState) ->
+client_terminate(CState, Server) ->
close_all_handles(CState),
- ok.
+ ok = gen_server2:call(Server, client_terminate, infinity).
client_delete_and_terminate(CState, Server, Ref) ->
- ok = client_terminate(CState),
- ok = gen_server2:call(Server, {delete_client, Ref}, infinity).
+ close_all_handles(CState),
+ ok = gen_server2:cast(Server, {client_delete, Ref}).
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
@@ -622,10 +616,8 @@ handle_call({new_client_state, CRef}, _From,
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({delete_client, CRef}, _From,
- State = #msstate { client_refs = ClientRefs }) ->
- reply(ok,
- State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
+handle_call(client_terminate, _From, State) ->
+ reply(ok, State).
handle_cast({write, Guid},
State = #msstate { current_file_handle = CurHdl,
@@ -646,20 +638,14 @@ handle_cast({write, Guid},
offset = CurOffset, total_size = TotalSize },
State),
[#file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
right = undefined,
locked = false,
file_size = FileSize }] =
ets:lookup(FileSummaryEts, CurFile),
ValidTotalSize1 = ValidTotalSize + TotalSize,
- ContiguousTop1 = case CurOffset =:= ContiguousTop of
- true -> ValidTotalSize1;
- false -> ContiguousTop
- end,
true = ets:update_element(
FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, ValidTotalSize1},
- {#file_summary.contiguous_top, ContiguousTop1},
{#file_summary.file_size, FileSize + TotalSize}]),
NextOffset = CurOffset + TotalSize,
noreply(
@@ -740,7 +726,12 @@ handle_cast({gc_done, Reclaimed, Src, Dst},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State).
+ noreply(State);
+
+handle_cast({client_delete, CRef},
+ State = #msstate { client_refs = ClientRefs }) ->
+ noreply(
+ State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
handle_info(timeout, State) ->
noreply(internal_sync(State));
@@ -915,8 +906,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts }) ->
#msg_location { ref_count = RefCount, file = File,
- offset = Offset, total_size = TotalSize } =
- index_lookup(Guid, State),
+ total_size = TotalSize } = index_lookup(Guid, State),
case RefCount of
1 ->
%% don't remove from CUR_FILE_CACHE_ETS_NAME here because
@@ -924,7 +914,6 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
%% msg.
ok = remove_cache_entry(DedupCacheEts, Guid),
[#file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
locked = Locked }] =
ets:lookup(FileSummaryEts, File),
case Locked of
@@ -932,12 +921,11 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
add_to_pending_gc_completion({remove, Guid}, State);
false ->
ok = index_delete(Guid, State),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
ValidTotalSize1 = ValidTotalSize - TotalSize,
- true = ets:update_element(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, ValidTotalSize1},
- {#file_summary.contiguous_top, ContiguousTop1}]),
+ true =
+ ets:update_element(
+ FileSummaryEts, File,
+ [{#file_summary.valid_total_size, ValidTotalSize1}]),
State1 = delete_file_if_empty(File, State),
State1 #msstate { sum_valid_data = SumValid - TotalSize }
end;
@@ -1284,16 +1272,17 @@ scan_file_for_valid_messages(Dir, FileName) ->
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
-find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []).
+drop_contiguous_block_prefix(L) -> drop_contiguous_block_prefix(L, 0).
-find_contiguous_block_prefix([], ExpectedOffset, Guids) ->
- {ExpectedOffset, Guids};
-find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
- ExpectedOffset, Guids) ->
+drop_contiguous_block_prefix([], ExpectedOffset) ->
+ {ExpectedOffset, []};
+drop_contiguous_block_prefix([#msg_location { offset = ExpectedOffset,
+ total_size = TotalSize } | Tail],
+ ExpectedOffset) ->
ExpectedOffset1 = ExpectedOffset + TotalSize,
- find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]);
-find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
- {ExpectedOffset, Guids}.
+ drop_contiguous_block_prefix(Tail, ExpectedOffset1);
+drop_contiguous_block_prefix(MsgsAfterGap, ExpectedOffset) ->
+ {ExpectedOffset, MsgsAfterGap}.
build_index(true, _StartupFunState,
State = #msstate { file_summary_ets = FileSummaryEts }) ->
@@ -1369,9 +1358,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
{VMAcc, VTSAcc}
end
end, {[], 0}, Messages),
- %% foldl reverses lists, find_contiguous_block_prefix needs
- %% msgs eldest first, so, ValidMessages is the right way round
- {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
{Right, FileSize1} =
case Files of
%% if it's the last file, we'll truncate to remove any
@@ -1388,7 +1374,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
ok = gatherer:in(Gatherer, #file_summary {
file = File,
valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
left = Left,
right = Right,
file_size = FileSize1,
@@ -1416,7 +1401,6 @@ maybe_roll_to_new_file(
true = ets:insert_new(FileSummaryEts, #file_summary {
file = NextFile,
valid_total_size = 0,
- contiguous_top = 0,
left = CurFile,
right = undefined,
file_size = 0,
@@ -1543,7 +1527,6 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
true = ets:update_element(
FileSummaryEts, DstFile,
[{#file_summary.valid_total_size, TotalValidData},
- {#file_summary.contiguous_top, TotalValidData},
{#file_summary.file_size, TotalValidData}]),
SrcFileSize + DstFileSize - TotalValidData;
false -> concurrent_readers
@@ -1554,7 +1537,6 @@ combine_files(#file_summary { file = Source,
left = Destination },
#file_summary { file = Destination,
valid_total_size = DestinationValid,
- contiguous_top = DestinationContiguousTop,
right = Source },
State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
SourceName = filenum_to_name(Source),
@@ -1570,41 +1552,32 @@ combine_files(#file_summary { file = Source,
%% the DestinationContiguousTop to a tmp file then truncate,
%% copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
- case DestinationContiguousTop =:= DestinationValid of
- true ->
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize);
- false ->
- {DestinationWorkList, DestinationValid} =
- find_unremoved_messages_in_file(Destination, State),
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset =/= DestinationContiguousTop ->
- %% it cannot be that Offset =:=
- %% DestinationContiguousTop because if it
- %% was then DestinationContiguousTop would
- %% have been extended by TotalSize
- Offset < DestinationContiguousTop
- end, DestinationWorkList),
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ok = copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and index_state has been updated to
- %% reflect the compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file_handle_cache:position(TmpHdl, 0),
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} =
- file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file_handle_cache:sync(DestinationHdl),
- ok = file_handle_cache:delete(TmpHdl)
+ {DestinationWorkList, DestinationValid} =
+ find_unremoved_messages_in_file(Destination, State),
+ {DestinationContiguousTop, DestinationWorkListTail} =
+ drop_contiguous_block_prefix(DestinationWorkList),
+ case DestinationWorkListTail of
+ [] -> ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize);
+ _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE),
+ ok = copy_messages(
+ DestinationWorkListTail, DestinationContiguousTop,
+ DestinationValid, DestinationHdl, TmpHdl, Destination,
+ State),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage
+ %% from Destination, and index_state has been updated to
+ %% reflect the compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:delete(TmpHdl)
end,
{SourceWorkList, SourceValid} =
find_unremoved_messages_in_file(Source, State),
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 3facef17..5cfd6a5c 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -93,7 +93,14 @@ usage() ->
action(start_all, [NodeCount], RpcTimeout) ->
io:format("Starting all nodes...~n", []),
application:load(rabbit),
- NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")),
+ {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts(
+ getenv("RABBITMQ_NODENAME")),
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ throw({cannot_connect_to_epmd, NodeHost, EpmdReason});
+ {ok, _} ->
+ ok
+ end,
{NodePids, Running} =
case list_to_integer(NodeCount) of
1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName),
@@ -303,8 +310,8 @@ kill_wait(Pid, TimeLeft, Forceful) ->
is_dead(Pid) ->
PidS = integer_to_list(Pid),
with_os([{unix, fun () ->
- Res = os:cmd("ps --no-headers --pid " ++ PidS),
- Res == ""
+ system("kill -0 " ++ PidS
+ ++ " >/dev/null 2>&1") /= 0
end},
{win32, fun () ->
Res = os:cmd("tasklist /nh /fi \"pid eq " ++
@@ -315,6 +322,16 @@ is_dead(Pid) ->
end
end}]).
+% Like system(3)
+system(Cmd) ->
+ ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'",
+ Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]),
+ receive {Port, {exit_status, Status}} -> Status end.
+
+% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'"
+escape_quotes(Cmd) ->
+ lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
+
call_all_nodes(Func) ->
case read_pids_file() of
[] -> throw(no_nodes_running);
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 6baa4b88..2286896b 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -46,7 +46,7 @@
'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
-type(error() :: rabbit_types:error(any())).
--type(socket() :: rabbit_networking:ip_port() | rabbit_types:ssl_socket()).
+-type(socket() :: port() | #ssl_socket{}).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
@@ -72,72 +72,58 @@
%%---------------------------------------------------------------------------
+-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
-async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
+async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
spawn(fun () -> Pid ! {inet_async, Sock, Ref,
- ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
- end),
+ ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
+ end),
{ok, Ref};
-
async_recv(Sock, Length, infinity) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, -1);
-
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, Timeout).
-close(Sock) when is_record(Sock, ssl_socket) ->
+close(Sock) when ?IS_SSL(Sock) ->
ssl:close(Sock#ssl_socket.ssl);
-
close(Sock) when is_port(Sock) ->
gen_tcp:close(Sock).
-
-controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) ->
+controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
-
controlling_process(Sock, Pid) when is_port(Sock) ->
gen_tcp:controlling_process(Sock, Pid).
-
-getstat(Sock, Stats) when is_record(Sock, ssl_socket) ->
+getstat(Sock, Stats) when ?IS_SSL(Sock) ->
inet:getstat(Sock#ssl_socket.tcp, Stats);
-
getstat(Sock, Stats) when is_port(Sock) ->
inet:getstat(Sock, Stats).
-
-peername(Sock) when is_record(Sock, ssl_socket) ->
+peername(Sock) when ?IS_SSL(Sock) ->
ssl:peername(Sock#ssl_socket.ssl);
-
peername(Sock) when is_port(Sock) ->
inet:peername(Sock).
-
-port_command(Sock, Data) when is_record(Sock, ssl_socket) ->
+port_command(Sock, Data) when ?IS_SSL(Sock) ->
case ssl:send(Sock#ssl_socket.ssl, Data) of
- ok ->
- self() ! {inet_reply, Sock, ok},
- true;
- {error, Reason} ->
- erlang:error(Reason)
+ ok -> self() ! {inet_reply, Sock, ok},
+ true;
+ {error, Reason} -> erlang:error(Reason)
end;
-
port_command(Sock, Data) when is_port(Sock) ->
erlang:port_command(Sock, Data).
-send(Sock, Data) when is_record(Sock, ssl_socket) ->
+send(Sock, Data) when ?IS_SSL(Sock) ->
ssl:send(Sock#ssl_socket.ssl, Data);
-
send(Sock, Data) when is_port(Sock) ->
gen_tcp:send(Sock, Data).
-sockname(Sock) when is_record(Sock, ssl_socket) ->
+sockname(Sock) when ?IS_SSL(Sock) ->
ssl:sockname(Sock#ssl_socket.ssl);
-
sockname(Sock) when is_port(Sock) ->
inet:sockname(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index f968b0d8..08272afe 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -107,7 +107,15 @@ boot_ssl() ->
ok;
{ok, SslListeners} ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
- {ok, SslOpts} = application:get_env(ssl_options),
+ {ok, SslOptsConfig} = application:get_env(ssl_options),
+ SslOpts =
+ case proplists:get_value(verify, SslOptsConfig, verify_none) of
+ verify_none -> SslOptsConfig;
+ verify_peer -> [{verify_fun, fun([]) -> true;
+ ([_|_]) -> false
+ end}
+ | SslOptsConfig]
+ end,
[start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
ok
end.
@@ -118,7 +126,7 @@ start() ->
{rabbit_tcp_client_sup,
{tcp_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
- {rabbit_reader,start_link,[]}]},
+ {rabbit_connection_sup,start_link,[]}]},
transient, infinity, supervisor, [tcp_client_sup]}),
ok.
@@ -204,10 +212,10 @@ on_node_down(Node) ->
ok = mnesia:dirty_delete(rabbit_listener, Node).
start_client(Sock, SockTransform) ->
- {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
- ok = rabbit_net:controlling_process(Sock, Child),
- Child ! {go, Sock, SockTransform},
- Child.
+ {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
+ ok = rabbit_net:controlling_process(Sock, Reader),
+ Reader ! {go, Sock, SockTransform},
+ Reader.
start_client(Sock) ->
start_client(Sock, fun (S) -> {ok, S} end).
@@ -230,8 +238,9 @@ start_ssl_client(SslOpts, Sock) ->
end).
connections() ->
- [Pid || {_, Pid, _, _} <- supervisor:which_children(
- rabbit_tcp_client_sup)].
+ [rabbit_connection_sup:reader(ConnSup) ||
+ {_, ConnSup, supervisor, _}
+ <- supervisor:which_children(rabbit_tcp_client_sup)].
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index c9f75be0..b23776cd 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -51,7 +51,7 @@
%%----------------------------------------------------------------------------
start() ->
- io:format("Activating RabbitMQ plugins ..."),
+ io:format("Activating RabbitMQ plugins ...~n"),
%% Ensure Rabbit is loaded so we can access it's environment
application:load(rabbit),
@@ -77,7 +77,7 @@ start() ->
AppList
end,
AppVersions = [determine_version(App) || App <- AllApps],
- {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions),
+ RabbitVersion = proplists:get_value(rabbit, AppVersions),
%% Build the overall release descriptor
RDesc = {release,
@@ -130,8 +130,9 @@ start() ->
ok -> ok;
error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
- io:format("~n~w plugins activated:~n", [length(PluginApps)]),
- [io:format("* ~w~n", [App]) || App <- PluginApps],
+ io:format("~w plugins activated:~n", [length(PluginApps)]),
+ [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
+ || App <- PluginApps],
io:nl(),
halt(),
ok.
@@ -150,29 +151,33 @@ determine_version(App) ->
{ok, Vsn} = application:get_key(App, vsn),
{App, Vsn}.
-assert_dir(Dir) ->
- case filelib:is_dir(Dir) of
- true -> ok;
- false -> ok = filelib:ensure_dir(Dir),
- ok = file:make_dir(Dir)
- end.
-
-delete_dir(Dir) ->
- case filelib:is_dir(Dir) of
+delete_recursively(Fn) ->
+ case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
true ->
- case file:list_dir(Dir) of
+ case file:list_dir(Fn) of
{ok, Files} ->
- [case Dir ++ "/" ++ F of
- Fn ->
- case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
- true -> delete_dir(Fn);
- false -> file:delete(Fn)
- end
- end || F <- Files]
- end,
- ok = file:del_dir(Dir);
+ case lists:foldl(fun ( Fn1, ok) -> delete_recursively(
+ Fn ++ "/" ++ Fn1);
+ (_Fn1, Err) -> Err
+ end, ok, Files) of
+ ok -> case file:del_dir(Fn) of
+ ok -> ok;
+ {error, E} -> {error,
+ {cannot_delete, Fn, E}}
+ end;
+ Err -> Err
+ end;
+ {error, E} ->
+ {error, {cannot_list_files, Fn, E}}
+ end;
false ->
- ok
+ case filelib:is_file(Fn) of
+ true -> case file:delete(Fn) of
+ ok -> ok;
+ {error, E} -> {error, {cannot_delete, Fn, E}}
+ end;
+ false -> ok
+ end
end.
is_symlink(Name) ->
@@ -181,13 +186,18 @@ is_symlink(Name) ->
_ -> false
end.
-unpack_ez_plugins(PluginSrcDir, PluginDestDir) ->
+unpack_ez_plugins(SrcDir, DestDir) ->
%% Eliminate the contents of the destination directory
- delete_dir(PluginDestDir),
-
- assert_dir(PluginDestDir),
- [unpack_ez_plugin(PluginName, PluginDestDir) ||
- PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")].
+ case delete_recursively(DestDir) of
+ ok -> ok;
+ {error, E} -> error("Could not delete dir ~s (~p)", [DestDir, E])
+ end,
+ case filelib:ensure_dir(DestDir ++ "/") of
+ ok -> ok;
+ {error, E2} -> error("Could not create dir ~s (~p)", [DestDir, E2])
+ end,
+ [unpack_ez_plugin(PluginName, DestDir) ||
+ PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")].
unpack_ez_plugin(PluginFn, PluginDestDir) ->
zip:unzip(PluginFn, [{cwd, PluginDestDir}]),
@@ -246,8 +256,8 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) ->
- [Entry, {apply,{rabbit,prepare,[]}}];
+process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
+ [{apply,{rabbit,prepare,[]}}, Entry];
process_entry(Entry) ->
[Entry].
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index b056d60b..0a49b94d 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server).
--export([start_link/0, register/2, delete_all/1, shutdown/1]).
+-export([start_link/0, register/2, delete_all/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -49,7 +49,6 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
--spec(shutdown/1 :: (pid()) -> 'ok').
-endif.
@@ -64,9 +63,6 @@ register(CollectorPid, Q) ->
delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).
-shutdown(CollectorPid) ->
- gen_server:cast(CollectorPid, shutdown).
-
%%----------------------------------------------------------------------------
init([]) ->
@@ -90,8 +86,8 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
|| {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State}.
-handle_cast(shutdown, State) ->
- {stop, normal, State}.
+handle_cast(Msg, State) ->
+ {stop, {unhandled_cast, Msg}, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
State = #state{queues = Queues}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a133bf45..a21961b5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -33,11 +33,11 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]).
+-export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/1, mainloop/2]).
+-export([init/4, mainloop/2]).
-export([conserve_memory/2, server_properties/0]).
@@ -46,7 +46,6 @@
-export([emit_stats/1]).
-import(gen_tcp).
--import(fprof).
-import(inet).
-import(prim_inet).
@@ -60,7 +59,8 @@
%---------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
- connection_state, queue_collector, heartbeater, stats_timer}).
+ connection_state, queue_collector, heartbeater, stats_timer,
+ channel_sup_sup_pid, start_heartbeat_fun}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -160,6 +160,12 @@
-ifdef(use_specs).
+-type(start_heartbeat_fun() ::
+ fun ((rabbit_networking:socket(), non_neg_integer()) ->
+ rabbit_heartbeat:heartbeaters())).
+
+-spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) ->
+ rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
@@ -168,21 +174,33 @@
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+%% These specs only exists to add no_return() to keep dialyzer happy
+-spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()).
+-spec(start_connection/7 ::
+ (pid(), pid(), pid(), start_heartbeat_fun(), any(),
+ rabbit_networking:socket(),
+ fun ((rabbit_networking:socket()) ->
+ rabbit_types:ok_or_error2(
+ rabbit_networking:socket(), any()))) -> no_return()).
+
-endif.
%%--------------------------------------------------------------------------
-start_link() ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self()])}.
+start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid,
+ Collector, StartHeartbeatFun])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent) ->
+init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
- start_connection(Parent, Deb, Sock, SockTransform)
+ start_connection(
+ Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -208,33 +226,6 @@ info(Pid, Items) ->
emit_stats(Pid) ->
gen_server:cast(Pid, emit_stats).
-setup_profiling() ->
- Value = rabbit_misc:get_config(profiling_enabled, false),
- case Value of
- once ->
- rabbit_log:info("Enabling profiling for this connection, "
- "and disabling for subsequent.~n"),
- rabbit_misc:set_config(profiling_enabled, false),
- fprof:trace(start);
- true ->
- rabbit_log:info("Enabling profiling for this connection.~n"),
- fprof:trace(start);
- false ->
- ok
- end,
- Value.
-
-teardown_profiling(Value) ->
- case Value of
- false ->
- ok;
- _ ->
- rabbit_log:info("Completing profiling for this connection.~n"),
- fprof:trace(stop),
- fprof:profile(),
- fprof:analyse([{dest, []}, {cols, 100}])
- end.
-
conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
@@ -261,7 +252,8 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, Deb, Sock, SockTransform) ->
+start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+ Sock, SockTransform) ->
process_flag(trap_exit, true),
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
PeerAddressS = inet_parse:ntoa(PeerAddress),
@@ -270,28 +262,29 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
- ProfilingValue = setup_profiling(),
- {ok, Collector} = rabbit_queue_collector:start_link(),
try
mainloop(Deb, switch_callback(
- #v1{parent = Parent,
- sock = ClientSock,
- connection = #connection{
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
- client_properties = none,
- protocol = none},
- callback = uninitialized_callback,
- recv_length = 0,
- recv_ref = none,
- connection_state = pre_init,
- queue_collector = Collector,
- heartbeater = none,
- stats_timer =
- rabbit_event:init_stats_timer()},
- handshake, 8))
+ #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ protocol = none,
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
+ client_properties = none},
+ callback = uninitialized_callback,
+ recv_length = 0,
+ recv_ref = none,
+ connection_state = pre_init,
+ queue_collector = Collector,
+ heartbeater = none,
+ stats_timer =
+ rabbit_event:init_stats_timer(),
+ channel_sup_sup_pid = ChannelSupSupPid,
+ start_heartbeat_fun = StartHeartbeatFun
+ },
+ handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
@@ -308,9 +301,6 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue),
- rabbit_misc:unlink_and_capture_exit(Collector),
- rabbit_queue_collector:shutdown(Collector),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
@@ -347,10 +337,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
exit(Reason);
{channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, Channel, Reason} ->
- mainloop(Deb, handle_channel_exit(Channel, Reason, State));
- {'EXIT', Pid, Reason} ->
- mainloop(Deb, handle_dependent_exit(Pid, Reason, State));
+ {channel_exit, ChannelOrFrPid, Reason} ->
+ mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
+ {'EXIT', ChSupPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -443,33 +433,45 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
-handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) ->
- {channel, Channel} = get({chpid, ChPid}),
+handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
+ {channel, Channel} = get({ch_fr_pid, ChFrPid}),
handle_exception(State, Channel, Reason);
handle_channel_exit(Channel, Reason, State) ->
handle_exception(State, Channel, Reason).
-handle_dependent_exit(Pid, normal, State) ->
- erase({chpid, Pid}),
- maybe_close(State);
-handle_dependent_exit(Pid, Reason, State) ->
- case channel_cleanup(Pid) of
- undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> maybe_close(handle_exception(State, Channel, Reason))
+handle_dependent_exit(ChSupPid, Reason, State) ->
+ case termination_kind(Reason) of
+ controlled ->
+ case erase({ch_sup_pid, ChSupPid}) of
+ undefined -> ok;
+ {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
+ end,
+ maybe_close(State);
+ uncontrolled ->
+ case channel_cleanup(ChSupPid) of
+ undefined ->
+ exit({abnormal_dependent_exit, ChSupPid, Reason});
+ Channel ->
+ maybe_close(handle_exception(State, Channel, Reason))
+ end
end.
-channel_cleanup(Pid) ->
- case get({chpid, Pid}) of
- undefined -> undefined;
- {channel, Channel} -> erase({channel, Channel}),
- erase({chpid, Pid}),
- Channel
+channel_cleanup(ChSupPid) ->
+ case get({ch_sup_pid, ChSupPid}) of
+ undefined -> undefined;
+ {{channel, Channel}, ChFr} -> erase({channel, Channel}),
+ erase(ChFr),
+ erase({ch_sup_pid, ChSupPid}),
+ Channel
end.
-all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
+all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
+ {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
terminate_channels() ->
- NChannels = length([exit(Pid, normal) || Pid <- all_channels()]),
+ NChannels =
+ length([rabbit_framing_channel:shutdown(ChFrPid)
+ || ChFrPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -487,14 +489,15 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'EXIT', Pid, Reason} ->
- case channel_cleanup(Pid) of
+ {'EXIT', ChSupPid, Reason} ->
+ case channel_cleanup(ChSupPid) of
undefined ->
- exit({abnormal_dependent_exit, Pid, Reason});
+ exit({abnormal_dependent_exit, ChSupPid, Reason});
Channel ->
- case Reason of
- normal -> ok;
- _ ->
+ case termination_kind(Reason) of
+ controlled ->
+ ok;
+ uncontrolled ->
rabbit_log:error(
"connection ~p, channel ~p - "
"error while terminating:~n~p~n",
@@ -519,6 +522,11 @@ maybe_close(State = #v1{connection_state = closing,
maybe_close(State) ->
State.
+termination_kind(normal) -> controlled;
+termination_kind(shutdown) -> controlled;
+termination_kind({shutdown, _Term}) -> controlled;
+termination_kind(_) -> uncontrolled.
+
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
@@ -548,8 +556,8 @@ handle_frame(Type, Channel, Payload,
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
- {chpid, ChPid} ->
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
+ {ch_fr_pid, ChFrPid} ->
+ ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
@@ -732,7 +740,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
- sock = Sock}) ->
+ sock = Sock,
+ start_heartbeat_fun = SHF}) ->
if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
rabbit_misc:protocol_error(
not_allowed, "frame_max=~w < ~w min size",
@@ -742,8 +751,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- Heartbeater = rabbit_heartbeat:start_heartbeat(
- Sock, ClientHeartbeat),
+ Heartbeater = SHF(Sock, ClientHeartbeat),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
@@ -765,9 +773,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
State#v1{connection_state = running,
connection = NewConnection}),
- rabbit_event:notify(
- connection_created,
- [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(connection_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -849,21 +856,21 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame,
- State = #v1{queue_collector = Collector}) ->
- #v1{sock = Sock, connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost,
- protocol = Protocol}} = State,
- {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol),
- {ok, ChPid} = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/6,
- [Channel, self(), WriterPid, Username, VHost, Collector],
- Protocol),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
+send_to_new_channel(Channel, AnalyzedFrame, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ {ok, ChSupPid, ChFrPid} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {Protocol, Sock, Channel, FrameMax,
+ self(), Username, VHost, Collector}),
+ put({channel, Channel}, {ch_fr_pid, ChFrPid}),
+ put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
+ put({ch_fr_pid, ChFrPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
@@ -931,5 +938,4 @@ amqp_exception_explanation(Text, Expl) ->
end.
internal_emit_stats(State) ->
- rabbit_event:notify(connection_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+ rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ec049a1a..bfccb0da 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -33,9 +33,7 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([deliver/2,
- match_bindings/2,
- match_routing_key/2]).
+-export([deliver/2, match_bindings/2, match_routing_key/2]).
%%----------------------------------------------------------------------------
@@ -45,9 +43,15 @@
-type(routing_key() :: binary()).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+-type(qpids() :: [pid()]).
-spec(deliver/2 ::
- ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
+ (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}).
+-spec(match_bindings/2 :: (rabbit_exchange:name(),
+ fun ((rabbit_types:binding()) -> boolean())) ->
+ qpids()).
+-spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') ->
+ qpids()).
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 082e7877..a72656b7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,9 +35,6 @@
-export([all_tests/0, test_parsing/0]).
-%% Exported so the hook mechanism can call back
--export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]).
-
-import(lists).
-include("rabbit.hrl").
@@ -975,6 +972,8 @@ test_user_management() ->
{error, {user_already_exists, _}} =
control_action(add_user, ["foo", "bar"]),
ok = control_action(change_password, ["foo", "baz"]),
+ ok = control_action(set_admin, ["foo"]),
+ ok = control_action(clear_admin, ["foo"]),
ok = control_action(list_users, []),
%% vhost creation
@@ -1022,7 +1021,8 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- <<"user">>, <<"/">>, self()),
+ <<"user">>, <<"/">>, self(),
+ fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1039,7 +1039,15 @@ test_server_status() ->
ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true),
%% list bindings
- ok = control_action(list_bindings, []),
+ ok = info_action(list_bindings, rabbit_binding:info_keys(), true),
+ %% misc binding listing APIs
+ [_|_] = rabbit_binding:list_for_exchange(
+ rabbit_misc:r(<<"/">>, exchange, <<"">>)),
+ [_] = rabbit_binding:list_for_queue(
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
+ [_] = rabbit_binding:list_for_exchange_and_queue(
+ rabbit_misc:r(<<"/">>, exchange, <<"">>),
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
%% list connections
[#listener{host = H, port = P} | _] =
@@ -1063,67 +1071,23 @@ test_server_status() ->
%% cleanup
[{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
+
+ unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
passed.
-test_hooks() ->
- %% Firing of hooks calls all hooks in an isolated manner
- rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}),
- rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}),
- rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}),
- rabbit_hooks:trigger(test_hook, [arg1, arg2]),
- [arg1, arg2] = get(test_hook_test_fired),
- [arg1, arg2] = get(test_hook_test2_fired),
- undefined = get(test_hook2_test2_fired),
-
- %% Hook Deletion works
- put(test_hook_test_fired, undefined),
- put(test_hook_test2_fired, undefined),
- rabbit_hooks:unsubscribe(test_hook, test),
- rabbit_hooks:trigger(test_hook, [arg3, arg4]),
- undefined = get(test_hook_test_fired),
- [arg3, arg4] = get(test_hook_test2_fired),
- undefined = get(test_hook2_test2_fired),
-
- %% Catches exceptions from bad hooks
- rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}),
- ok = rabbit_hooks:trigger(test_hook3, []),
-
- %% Passing extra arguments to hooks
- rabbit_hooks:subscribe(arg_hook, test, {rabbit_tests, extra_arg_hook, [1, 3]}),
- rabbit_hooks:trigger(arg_hook, [arg1, arg2]),
- {[arg1, arg2], 1, 3} = get(arg_hook_test_fired),
-
- %% Invoking Pids
- Remote = fun () ->
- receive
- {rabbitmq_hook,[remote_test,test,[],Target]} ->
- Target ! invoked
- end
- end,
- P = spawn(Remote),
- rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}),
- rabbit_hooks:trigger(remote_test, []),
- receive
- invoked -> ok
- after 100 ->
- io:format("Remote hook not invoked"),
- throw(timeout)
- end,
- passed.
-
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>,
- <<"/">>, self()),
+ {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
+ <<"guest">>, <<"/">>, self(),
+ fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
- MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)
end,
- {Writer, Ch, MRef}.
+ {Writer, Ch}.
test_statistics_receiver(Pid) ->
receive
@@ -1162,7 +1126,7 @@ test_statistics() ->
%% by far the most complex code though.
%% Set up a channel and queue
- {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1),
+ {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1),
rabbit_channel:do(Ch, #'queue.declare'{}),
QName = receive #'queue.declare_ok'{queue = Q0} ->
Q0
@@ -1406,14 +1370,6 @@ delete_log_handlers(Handlers) ->
Handler <- Handlers],
ok.
-handle_hook(HookName, Handler, Args) ->
- A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired",
- put(list_to_atom(A), Args).
-bad_handle_hook(_, _, _) ->
- exit(bad_handle_hook_called).
-extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
- handle_hook(Hookname, Handler, {Args, Extra1, Extra2}).
-
test_supervisor_delayed_restart() ->
test_sup:test_supervisor_delayed_restart().
@@ -1514,7 +1470,7 @@ msg_store_remove(Guids) ->
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L)).
+ rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1577,7 +1533,7 @@ test_msg_store() ->
ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf),
%% read the second half again, just for fun (aka code coverage)
MSCState7 = msg_store_read(Guids2ndHalf, MSCState6),
- ok = rabbit_msg_store:client_terminate(MSCState7),
+ ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE),
%% stop and restart, preserving every other msg in 2nd half
ok = rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start_msg_store(
@@ -1602,7 +1558,7 @@ test_msg_store() ->
{ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
- msg_store_read(Guids1stHalf, MSCState9)),
+ msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE),
ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf),
%% restart empty
restart_msg_store_empty(), %% now safe to reuse guids
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 47e8bb01..0b6a15ec 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -38,7 +38,7 @@
-export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0,
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
- amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0,
+ amqp_error/0, r/1, r2/2, r3/3, listener/0,
binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2,
ok_pid_or_error/0, channel_exit/0, connection_exit/0]).
@@ -107,8 +107,6 @@
kind :: Kind,
name :: Name}).
--type(ssl_socket() :: #ssl_socket{}).
-
-type(listener() ::
#listener{node :: node(),
protocol :: atom(),
@@ -118,7 +116,8 @@
-type(binding() ::
#binding{exchange_name :: rabbit_exchange:name(),
queue_name :: rabbit_amqqueue:name(),
- key :: rabbit_exchange:binding_key()}).
+ key :: rabbit_binding:key(),
+ args :: rabbit_framing:amqp_table()}).
-type(amqqueue() ::
#amqqueue{name :: rabbit_amqqueue:name(),
@@ -141,7 +140,8 @@
-type(user() ::
#user{username :: rabbit_access_control:username(),
- password :: rabbit_access_control:password()}).
+ password :: rabbit_access_control:password(),
+ is_admin :: boolean()}).
-type(ok(A) :: {'ok', A}).
-type(error(A) :: {'error', A}).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0f52eee8..30d3a8ae 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -439,9 +439,10 @@ terminate(State) ->
remove_pending_ack(true, tx_commit_index(State)),
case MSCStateP of
undefined -> ok;
- _ -> rabbit_msg_store:client_terminate(MSCStateP)
+ _ -> rabbit_msg_store:client_terminate(
+ MSCStateP, ?PERSISTENT_MSG_STORE)
end,
- rabbit_msg_store:client_terminate(MSCStateT),
+ rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE),
Terms = [{persistent_ref, PRef},
{transient_ref, TRef},
{persistent_count, PCount}],
@@ -464,8 +465,7 @@ delete_and_terminate(State) ->
case MSCStateP of
undefined -> ok;
_ -> rabbit_msg_store:client_delete_and_terminate(
- MSCStateP, ?PERSISTENT_MSG_STORE, PRef),
- rabbit_msg_store:client_terminate(MSCStateP)
+ MSCStateP, ?PERSISTENT_MSG_STORE, PRef)
end,
rabbit_msg_store:client_delete_and_terminate(
MSCStateT, ?TRANSIENT_MSG_STORE, TRef),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f90ee734..aa986e54 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,9 +33,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/4, start_link/4, shutdown/1, mainloop/1]).
--export([send_command/2, send_command/3, send_command_and_signal_back/3,
- send_command_and_signal_back/4, send_command_and_notify/5]).
+-export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([send_command/2, send_command/3, send_command_sync/2,
+ send_command_sync/3, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
-import(gen_tcp).
@@ -48,24 +48,23 @@
-ifdef(use_specs).
--spec(start/4 ::
+-spec(start/5 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol())
+ non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
--spec(start_link/4 ::
+-spec(start_link/5 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol())
+ non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
-> 'ok').
--spec(send_command_and_signal_back/3 ::
- (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok').
--spec(send_command_and_signal_back/4 ::
- (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid())
- -> 'ok').
+-spec(send_command_sync/2 ::
+ (pid(), rabbit_framing:amqp_method()) -> 'ok').
+-spec(send_command_sync/3 ::
+ (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
@@ -84,68 +83,61 @@
%%----------------------------------------------------------------------------
-start(Sock, Channel, FrameMax, Protocol) ->
- {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}])}.
-
-start_link(Sock, Channel, FrameMax, Protocol) ->
- {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ {ok,
+ proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
+ #wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}])}.
-mainloop(State) ->
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ {ok,
+ proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
+ #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
+
+mainloop(ReaderPid, State) ->
+ try
+ mainloop1(ReaderPid, State)
+ catch
+ exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error}
+ end,
+ done.
+
+mainloop1(ReaderPid, State) ->
receive
- Message -> ?MODULE:mainloop(handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [State])
+ erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
end.
-handle_message({send_command, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
+handle_message({send_command, MethodRecord}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
State;
-handle_message({send_command, MethodRecord, Content},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
+handle_message({send_command, MethodRecord, Content}, State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
State;
-handle_message({send_command_and_signal_back, MethodRecord, Parent},
- State = #wstate{sock = Sock, channel = Channel,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
- Parent ! rabbit_writer_send_command_signal,
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
+ gen_server:reply(From, ok),
State;
-handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
- Parent ! rabbit_writer_send_command_signal,
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
+ State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
+ gen_server:reply(From, ok),
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
+ State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
-handle_message(shutdown, _State) ->
- exit(normal);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
@@ -159,29 +151,28 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
-send_command_and_signal_back(W, MethodRecord, Parent) ->
- W ! {send_command_and_signal_back, MethodRecord, Parent},
- ok.
+send_command_sync(W, MethodRecord) ->
+ call(W, {send_command_sync, MethodRecord}).
-send_command_and_signal_back(W, MethodRecord, Content, Parent) ->
- W ! {send_command_and_signal_back, MethodRecord, Content, Parent},
- ok.
+send_command_sync(W, MethodRecord, Content) ->
+ call(W, {send_command_sync, MethodRecord, Content}).
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
-shutdown(W) ->
- W ! shutdown,
- rabbit_misc:unlink_and_capture_exit(W),
- ok.
+%---------------------------------------------------------------------------
+
+call(Pid, Msg) ->
+ {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
+ Res.
%---------------------------------------------------------------------------
assemble_frames(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
- rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord,
- Protocol).
+ rabbit_binary_generator:build_simple_method_frame(
+ Channel, MethodRecord, Protocol).
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, Content),
@@ -223,12 +214,18 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(Sock, Channel, MethodRecord, Protocol) ->
+internal_send_command_async(MethodRecord,
+ #wstate{sock = Sock,
+ channel = Channel,
+ protocol = Protocol}) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
ok.
-internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax,
- Protocol) ->
+internal_send_command_async(MethodRecord, Content,
+ #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
Content, FrameMax, Protocol)),
ok.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 87883037..93adfcb1 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -34,7 +34,9 @@
%% 4) Added an 'intrinsic' restart type. Like the transient type, this
%% type means the child should only be restarted if the child exits
%% abnormally. Unlike the transient type, if the child exits
-%% normally, the supervisor itself also exits normally.
+%% normally, the supervisor itself also exits normally. If the
+%% child is a supervisor and it exits normally (i.e. with reason of
+%% 'shutdown') then the child's parent also exits normally.
%%
%% All modifications are (C) 2010 Rabbit Technologies Ltd.
%%
@@ -63,7 +65,7 @@
-export([start_link/2,start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
- which_children/1,
+ which_children/1, find_child/2,
check_childspecs/1]).
-export([behaviour_info/1]).
@@ -138,6 +140,10 @@ terminate_child(Supervisor, Name) ->
which_children(Supervisor) ->
call(Supervisor, which_children).
+find_child(Supervisor, Name) ->
+ [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor),
+ Name1 =:= Name].
+
call(Supervisor, Req) ->
gen_server:call(Supervisor, Req, infinity).
@@ -541,6 +547,9 @@ do_restart(permanent, Reason, Child, State) ->
restart(Child, State);
do_restart(intrinsic, normal, Child, State) ->
{shutdown, state_del_child(Child, State)};
+do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor},
+ State) ->
+ {shutdown, state_del_child(Child, State)};
do_restart(_, normal, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState};
diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl
index 1b785843..02d7e0e4 100644
--- a/src/tcp_client_sup.erl
+++ b/src/tcp_client_sup.erl
@@ -31,19 +31,19 @@
-module(tcp_client_sup).
--behaviour(supervisor).
+-behaviour(supervisor2).
-export([start_link/1, start_link/2]).
-export([init/1]).
start_link(Callback) ->
- supervisor:start_link(?MODULE, Callback).
+ supervisor2:start_link(?MODULE, Callback).
start_link(SupName, Callback) ->
- supervisor:start_link(SupName, ?MODULE, Callback).
+ supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one, 10, 10},
+ {ok, {{simple_one_for_one_terminate, 10, 10},
[{tcp_client, {M,F,A},
- temporary, brutal_kill, worker, [M]}]}}.
+ temporary, infinity, supervisor, [M]}]}}.